|
|
|
@@ -125,13 +125,10 @@ MSRStatus ShardReader::Open() { |
|
|
|
|
|
|
|
for (const auto &file : file_paths_) { |
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); |
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::binary); |
|
|
|
if (fs->fail()) { |
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary); |
|
|
|
if (fs->fail()) { |
|
|
|
MS_LOG(ERROR) << "File could not opened"; |
|
|
|
return FAILED; |
|
|
|
} |
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary); |
|
|
|
if (!fs->good()) { |
|
|
|
MS_LOG(ERROR) << "File could not opened"; |
|
|
|
return FAILED; |
|
|
|
} |
|
|
|
MS_LOG(INFO) << "Open shard file successfully."; |
|
|
|
file_streams_.push_back(fs); |
|
|
|
@@ -146,13 +143,10 @@ MSRStatus ShardReader::Open(int n_consumer) { |
|
|
|
for (const auto &file : file_paths_) { |
|
|
|
for (int j = 0; j < n_consumer; ++j) { |
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); |
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::binary); |
|
|
|
if (fs->fail()) { |
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary); |
|
|
|
if (fs->fail()) { |
|
|
|
MS_LOG(ERROR) << "File could not opened"; |
|
|
|
return FAILED; |
|
|
|
} |
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary); |
|
|
|
if (!fs->good()) { |
|
|
|
MS_LOG(ERROR) << "File could not opened"; |
|
|
|
return FAILED; |
|
|
|
} |
|
|
|
file_streams_random_[j].push_back(fs); |
|
|
|
} |
|
|
|
@@ -311,12 +305,10 @@ MSRStatus ShardReader::ReadAllRowsInShard(int shard_id, const std::string &sql, |
|
|
|
std::string file_name = file_paths_[shard_id]; |
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); |
|
|
|
if (!all_in_index_) { |
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::out | std::ios::binary); |
|
|
|
if (fs->fail()) { |
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary); |
|
|
|
if (fs->fail()) { |
|
|
|
MS_LOG(ERROR) << "File could not opened"; |
|
|
|
} |
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary); |
|
|
|
if (!fs->good()) { |
|
|
|
MS_LOG(ERROR) << "File could not opened"; |
|
|
|
return FAILED; |
|
|
|
} |
|
|
|
} |
|
|
|
sqlite3_free(errmsg); |
|
|
|
@@ -520,8 +512,8 @@ std::pair<MSRStatus, std::vector<json>> ShardReader::GetLabelsFromBinaryFile( |
|
|
|
std::string file_name = file_paths_[shard_id]; |
|
|
|
std::vector<json> res; |
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); |
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::out | std::ios::binary); |
|
|
|
if (fs->fail()) { |
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary); |
|
|
|
if (!fs->good()) { |
|
|
|
MS_LOG(ERROR) << "File could not opened"; |
|
|
|
return {FAILED, {}}; |
|
|
|
} |
|
|
|
|