Browse Source

!9685 Fix a core dump in TreeConsumer::Terminate() plus minor cache fixes

From: @lixiachen
Reviewed-by: @mikef,@nsyca
Signed-off-by: @nsyca
tags/v1.1.0
mindspore-ci-bot Gitee 5 years ago
parent
commit
a2c80435ce
6 changed files with 104 additions and 24 deletions
  1. +11
    -19
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc
  2. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc
  3. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc
  4. +4
    -1
      mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc
  5. +82
    -1
      tests/ut/cpp/dataset/c_api_cache_test.cc
  6. +4
    -0
      tests/ut/python/dataset/test_cache_map.py

+ 11
- 19
mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc View File

@@ -355,7 +355,7 @@ Status CacheAdminArgHandler::RunCommand() {
// The server will send a message back and remove the queue and we will then wake up. But on the safe // The server will send a message back and remove the queue and we will then wake up. But on the safe
// side, we will also set up an alarm and kill this process if we hang on // side, we will also set up an alarm and kill this process if we hang on
// the message queue. // the message queue.
alarm(30);
alarm(60);
Status dummy_rc; Status dummy_rc;
(void)msg.ReceiveStatus(&dummy_rc); (void)msg.ReceiveStatus(&dummy_rc);
std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl; std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl;
@@ -533,24 +533,16 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) {


void CacheAdminArgHandler::Help() { void CacheAdminArgHandler::Help() {
std::cerr << "Syntax:\n"; std::cerr << "Syntax:\n";
std::cerr << " cache_admin [--start | --stop]\n";
std::cerr << " [ [-h | --hostname] <hostname> ]\n";
std::cerr << " Default is " << kCfgDefaultCacheHost << ".\n";
std::cerr << " [ [-p | --port] <port number> ]\n";
std::cerr << " Possible values are in range [1025..65535].\n";
std::cerr << " Default is " << kCfgDefaultCachePort << ".\n";
std::cerr << " [ [-g | --generate_session] ]\n";
std::cerr << " [ [-d | --destroy_session] <session id> ]\n";
std::cerr << " [ [-w | --workers] <number of workers> ]\n";
std::cerr << " Possible values are in range [1...max(100, Number of CPU)].\n";
std::cerr << " Default is " << kDefaultNumWorkers << ".\n";
std::cerr << " [ [-s | --spilldir] <spilling directory> ]\n";
std::cerr << " Default is " << DefaultSpillDir() << ".\n";
std::cerr << " [ [-l | --loglevel] <log level> ]\n";
std::cerr << " Possible values are 0, 1, 2 and 3.\n";
std::cerr << " Default is 1 (info level).\n";
std::cerr << " [--list_sessions]\n";
std::cerr << " [--help]" << std::endl;
std::cerr << "cache_admin [--start | --stop]\n";
std::cerr << " [[-h | --hostname] <hostname>] Default is " << kCfgDefaultCacheHost << ".\n";
std::cerr << " [[-p | --port] <port number>] Default is " << kCfgDefaultCachePort << ".\n";
std::cerr << " [[-w | --workers] <number of workers>] Default is " << kDefaultNumWorkers << ".\n";
std::cerr << " [[-s | --spilldir] <spilling directory>] Default is " << DefaultSpillDir() << ".\n";
std::cerr << " [[-l | --loglevel] <log level>] Default is 1 (warning level).\n";
std::cerr << " [--destroy_session | -d] <session id>\n";
std::cerr << " [--generate_session | -g]\n";
std::cerr << " [--list_sessions]\n";
std::cerr << " [--help]" << std::endl;
// Do not expose these option to the user via help or documentation, but the options do exist to aid with // Do not expose these option to the user via help or documentation, but the options do exist to aid with
// development and tuning. // development and tuning.
// [ [-m | --shared_memory_size] <shared memory size> ] // [ [-m | --shared_memory_size] <shared memory size> ]


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc View File

@@ -119,8 +119,8 @@ ds::Status StartServer(int argc, char **argv) {
} }


// Dump the summary // Dump the summary
MS_LOG(INFO) << "Logging services started with log level: " << argv[5];
MS_LOG(INFO) << builder << std::endl;
MS_LOG(WARNING) << "Logging services started with log level: " << argv[5];
MS_LOG(WARNING) << builder << std::endl;
// Create the instance with some sanity checks built in // Create the instance with some sanity checks built in
rc = builder.Build(); rc = builder.Build();
if (rc.IsOk()) { if (rc.IsOk()) {


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc View File

@@ -635,7 +635,7 @@ Status CacheServer::GetCacheMissKeys(CacheRequest *rq, CacheReply *reply) {


inline Status GenerateClientSessionID(session_id_type session_id, CacheReply *reply) { inline Status GenerateClientSessionID(session_id_type session_id, CacheReply *reply) {
reply->set_result(std::to_string(session_id)); reply->set_result(std::to_string(session_id));
MS_LOG(INFO) << "Server generated new session id " << session_id;
MS_LOG(WARNING) << "Server generated new session id " << session_id;
return Status::OK(); return Status::OK();
} }




+ 4
- 1
mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc View File

@@ -36,7 +36,10 @@ namespace dataset {
TreeConsumer::TreeConsumer() { tree_adapter_ = std::make_unique<TreeAdapter>(); } TreeConsumer::TreeConsumer() { tree_adapter_ = std::make_unique<TreeAdapter>(); }


Status TreeConsumer::Init(std::shared_ptr<DatasetNode> d) { return tree_adapter_->Compile(std::move(d)); } Status TreeConsumer::Init(std::shared_ptr<DatasetNode> d) { return tree_adapter_->Compile(std::move(d)); }
Status TreeConsumer::Terminate() { return tree_adapter_->AllTasks()->ServiceStop(); }
// Stop all tasks belonging to this consumer's execution tree.
// Fix for a core dump: Terminate() can be called before Init()/Compile() ever
// built the tree, in which case AllTasks() is nullptr and the old one-liner
// dereferenced it. Fail with an explicit status instead of crashing.
Status TreeConsumer::Terminate() {
// NOTE(review): a null task group means the execution tree was never built (Compile not run or failed).
CHECK_FAIL_RETURN_UNEXPECTED(tree_adapter_->AllTasks() != nullptr, " Execution tree has not been built");
return tree_adapter_->AllTasks()->ServiceStop();
}


// IteratorConsumer // IteratorConsumer
Status IteratorConsumer::Init(std::shared_ptr<DatasetNode> d) { Status IteratorConsumer::Init(std::shared_ptr<DatasetNode> d) {


+ 82
- 1
tests/ut/cpp/dataset/c_api_cache_test.cc View File

@@ -15,6 +15,7 @@
*/ */
#include "common/common.h" #include "common/common.h"
#include "minddata/dataset/include/datasets.h" #include "minddata/dataset/include/datasets.h"
#include "minddata/dataset/include/vision.h"


#include "minddata/dataset/engine/ir/datasetops/source/csv_node.h" #include "minddata/dataset/engine/ir/datasetops/source/csv_node.h"


@@ -51,6 +52,34 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiSamplerNull) {
EXPECT_EQ(iter, nullptr); EXPECT_EQ(iter, nullptr);
} }


// Verify that nested caches are rejected: the same cache is attached to both the
// ImageFolder source and the Map op above it. Pipeline compilation (the cache
// error pass) must fail, so CreateIterator() is expected to return nullptr
// rather than crash — this exercises the Terminate() null-tree fix.
// Requires a running cache server; session id is taken from the environment.
TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiNestedCache) {
session_id_type env_session;
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());

std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
EXPECT_NE(some_cache, nullptr);

// Create an ImageFolder Dataset, this folder_path only has 2 images in it
std::string folder_path = datasets_root_path_ + "/testImageNetData/train/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, false, RandomSampler(), {}, {}, some_cache);
EXPECT_NE(ds, nullptr);

// Create objects for the tensor ops
std::shared_ptr<TensorOperation> decode_op = vision::Decode();
EXPECT_NE(decode_op, nullptr);

// Create a Map operation on ds
// Attaching the SAME cache here makes the cache nested — this is the invalid setup.
ds = ds->Map({decode_op}, {}, {}, {"image"}, some_cache);
EXPECT_NE(ds, nullptr);

// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
// Now in the cache_error_pass would fail and we would end up with a nullptr iter.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_EQ(iter, nullptr);
}

TEST_F(MindDataTestCacheOp, DISABLED_TestCacheImageFolderCApi) { TEST_F(MindDataTestCacheOp, DISABLED_TestCacheImageFolderCApi) {
session_id_type env_session; session_id_type env_session;
Status s = GetSessionFromEnv(&env_session); Status s = GetSessionFromEnv(&env_session);
@@ -736,7 +765,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheClueCApi) {
iter->Stop(); iter->Stop();
} }


TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare) {
TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare1) {
session_id_type env_session; session_id_type env_session;
Status s = GetSessionFromEnv(&env_session); Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK()); EXPECT_EQ(s, Status::OK());
@@ -788,6 +817,58 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare) {
iter2->Stop(); iter2->Stop();
} }


// Verify that two pipelines reading the same source with DIFFERENT samplers
// (RandomSampler vs SequentialSampler) may legitimately share one cache:
// sampling order does not change the cached source data. Each pipeline is run
// to completion and must yield exactly the 2 images in the test folder.
// Requires a running cache server; session id is taken from the environment.
TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare2) {
session_id_type env_session;
Status s = GetSessionFromEnv(&env_session);
EXPECT_EQ(s, Status::OK());

std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
EXPECT_NE(some_cache, nullptr);

// Create an ImageFolder Dataset, this folder_path only has 2 images in it
std::string folder_path = datasets_root_path_ + "/testImageNetData/train/";
// The first pipeline is ImageFolder with RandomSampler, the second pipeline is ImageFolder with SequentialSampler
// Since sampler does not influence the data in the source, these two pipelines can share a common cache.
std::shared_ptr<Dataset> ds1 = ImageFolder(folder_path, true, RandomSampler(), {}, {}, some_cache);
EXPECT_NE(ds1, nullptr);
std::shared_ptr<Dataset> ds2 = ImageFolder(folder_path, true, SequentialSampler(), {}, {}, some_cache);
EXPECT_NE(ds2, nullptr);

// Create and launch the Execution Tree for ds1
std::shared_ptr<Iterator> iter1 = ds1->CreateIterator();
EXPECT_NE(iter1, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter1->GetNextRow(&row);

uint64_t i = 0;
while (row.size() != 0) {
i++;
// Touch the "image" column to make sure each cached row materializes.
auto image = row["image"];
iter1->GetNextRow(&row);
}
// Folder holds exactly 2 images, so 2 rows are expected.
EXPECT_EQ(i, 2);
// Manually terminate the pipeline
iter1->Stop();

// Create and launch the Execution Tree for ds2
std::shared_ptr<Iterator> iter2 = ds2->CreateIterator();
EXPECT_NE(iter2, nullptr);
// Iterate the dataset and get each row
iter2->GetNextRow(&row);

i = 0;
while (row.size() != 0) {
i++;
auto image = row["image"];
iter2->GetNextRow(&row);
}
// Second pipeline (served from the shared cache) must also see both rows.
EXPECT_EQ(i, 2);

// Manually terminate the pipeline
iter2->Stop();
}

TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShareFailure1) { TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShareFailure1) {
session_id_type env_session; session_id_type env_session;
Status s = GetSessionFromEnv(&env_session); Status s = GetSessionFromEnv(&env_session);


+ 4
- 0
tests/ut/python/dataset/test_cache_map.py View File

@@ -233,6 +233,10 @@ def test_cache_map_failure1():
ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
ds1 = ds1.repeat(4) ds1 = ds1.repeat(4)


with pytest.raises(RuntimeError) as e:
ds1.get_batch_size()
assert "Nested cache operations" in str(e.value)

with pytest.raises(RuntimeError) as e: with pytest.raises(RuntimeError) as e:
num_iter = 0 num_iter = 0
for _ in ds1.create_dict_iterator(num_epochs=1): for _ in ds1.create_dict_iterator(num_epochs=1):


Loading…
Cancel
Save