Merge pull request !3041 from JesseKLee/pr2891_commentstags/v0.6.0-beta
| @@ -105,7 +105,7 @@ Status CacheService::CacheRow(const std::vector<const void *> &buf, row_id_type | |||||
| RETURN_IF_NOT_OK(cp_->Insert(all_data, &key)); | RETURN_IF_NOT_OK(cp_->Insert(all_data, &key)); | ||||
| Status rc = map_->DoInsert(*row_id_generated, key); | Status rc = map_->DoInsert(*row_id_generated, key); | ||||
| if (rc == Status(StatusCode::kDuplicateKey)) { | if (rc == Status(StatusCode::kDuplicateKey)) { | ||||
| MS_LOG(DEBUG) << "Ignoring duplicate key"; | |||||
| MS_LOG(DEBUG) << "Ignoring duplicate key."; | |||||
| } else { | } else { | ||||
| RETURN_IF_NOT_OK(rc); | RETURN_IF_NOT_OK(rc); | ||||
| } | } | ||||
| @@ -53,7 +53,7 @@ CacheBase::CacheBase(int32_t num_workers, int32_t op_connector_size, int32_t row | |||||
| cache_client_(cache_client), | cache_client_(cache_client), | ||||
| rows_per_buffer_(rows_per_buf), | rows_per_buffer_(rows_per_buf), | ||||
| // We can cause deadlock if this internal Connector size is too small. | // We can cause deadlock if this internal Connector size is too small. | ||||
| keys_miss_(num_workers_, 1, 1024) { | |||||
| keys_miss_(num_workers_, 1, connector_capacity_) { | |||||
| io_block_queues_.Init(num_workers, op_connector_size); | io_block_queues_.Init(num_workers, op_connector_size); | ||||
| } | } | ||||
| // Common function to fetch samples from the sampler and send them using the io_block_queues to | // Common function to fetch samples from the sampler and send them using the io_block_queues to | ||||
| @@ -48,8 +48,6 @@ class CacheBase : public ParallelOp { | |||||
| /// \brief Destructor | /// \brief Destructor | ||||
| ~CacheBase(); | ~CacheBase(); | ||||
| constexpr static int eoe_row_id = -1; | |||||
| /// \brief Overrides base class reset method. When an operator does a reset, it cleans up any state | /// \brief Overrides base class reset method. When an operator does a reset, it cleans up any state | ||||
| /// info from it's previous execution and then initializes itself so that it can be executed | /// info from it's previous execution and then initializes itself so that it can be executed | ||||
| /// again. | /// again. | ||||
| @@ -80,6 +78,7 @@ class CacheBase : public ParallelOp { | |||||
| virtual bool AllowCacheMiss() = 0; | virtual bool AllowCacheMiss() = 0; | ||||
| protected: | protected: | ||||
| constexpr static int32_t eoe_row_id = -1; | |||||
| std::shared_ptr<CacheClient> cache_client_; | std::shared_ptr<CacheClient> cache_client_; | ||||
| WaitPost epoch_sync_; | WaitPost epoch_sync_; | ||||
| int32_t rows_per_buffer_; | int32_t rows_per_buffer_; | ||||
| @@ -100,6 +99,7 @@ class CacheBase : public ParallelOp { | |||||
| Status UpdateColumnMapFromCache(); | Status UpdateColumnMapFromCache(); | ||||
| private: | private: | ||||
| constexpr static int32_t connector_capacity_ = 1024; | |||||
| QueueList<std::unique_ptr<IOBlock>> io_block_queues_; | QueueList<std::unique_ptr<IOBlock>> io_block_queues_; | ||||
| }; | }; | ||||
| } // namespace dataset | } // namespace dataset | ||||
| @@ -13,6 +13,7 @@ | |||||
| * See the License for the specific language governing permissions and | * See the License for the specific language governing permissions and | ||||
| * limitations under the License. | * limitations under the License. | ||||
| */ | */ | ||||
| #include "dataset/engine/datasetops/cache_merge_op.h" | |||||
| #include <algorithm> | #include <algorithm> | ||||
| #include <functional> | #include <functional> | ||||
| @@ -20,7 +21,6 @@ | |||||
| #include "dataset/core/config_manager.h" | #include "dataset/core/config_manager.h" | ||||
| #include "dataset/core/constants.h" | #include "dataset/core/constants.h" | ||||
| #include "dataset/core/global_context.h" | #include "dataset/core/global_context.h" | ||||
| #include "dataset/engine/datasetops/cache_merge_op.h" | |||||
| #include "dataset/engine/opt/pass.h" | #include "dataset/engine/opt/pass.h" | ||||
| #include "dataset/engine/execution_tree.h" | #include "dataset/engine/execution_tree.h" | ||||
| #include "dataset/util/task_manager.h" | #include "dataset/util/task_manager.h" | ||||
| @@ -50,7 +50,8 @@ Status CacheMergeOp::operator()() { | |||||
| // A queue of row id to let cleaner send cache miss rows to the cache server | // A queue of row id to let cleaner send cache miss rows to the cache server | ||||
| // We don't want a small queue as this will block the parallel op workers. | // We don't want a small queue as this will block the parallel op workers. | ||||
| // A row id is 8 byte integer. So bigger size doesn't consume a lot of memory. | // A row id is 8 byte integer. So bigger size doesn't consume a lot of memory. | ||||
| io_que_ = std::make_unique<Queue<row_id_type>>(512); | |||||
| static const int32_t queue_sz = 512; | |||||
| io_que_ = std::make_unique<Queue<row_id_type>>(queue_sz); | |||||
| RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks())); | RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks())); | ||||
| RETURN_IF_NOT_OK( | RETURN_IF_NOT_OK( | ||||
| tree_->LaunchWorkers(num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1))); | tree_->LaunchWorkers(num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1))); | ||||
| @@ -151,7 +152,7 @@ Status CacheMergeOp::Cleaner() { | |||||
| } | } | ||||
| TensorRow row; | TensorRow row; | ||||
| RETURN_IF_NOT_OK(rq->Release(&row)); | RETURN_IF_NOT_OK(rq->Release(&row)); | ||||
| CHECK_FAIL_RETURN_UNEXPECTED(!row.empty(), "Programming error"); | |||||
| CHECK_FAIL_RETURN_UNEXPECTED(!row.empty(), "Programming error."); | |||||
| Status rc = cache_client_->WriteRow(row); | Status rc = cache_client_->WriteRow(row); | ||||
| // Bad rc should not bring down the pipeline | // Bad rc should not bring down the pipeline | ||||
| if (rc.IsError()) { | if (rc.IsError()) { | ||||