| @@ -102,8 +102,10 @@ class Connector { | |||
| RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; })); | |||
| RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result)); | |||
| pop_from_ = (pop_from_ + 1) % num_producers_; | |||
| out_buffers_count_++; | |||
| expect_consumer_ = (expect_consumer_ + 1) % num_consumers_; | |||
| } | |||
| cv_.NotifyAll(); | |||
| return Status::OK(); | |||
| } | |||
| @@ -119,6 +121,8 @@ class Connector { | |||
| return (queues_[worker_id]->Add(el)); | |||
| } | |||
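| // Getter for the total number of elements popped from this Connector so far (read by the profiler). | |||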
| auto out_buffers_count() const { return out_buffers_count_.load(); } | |||
| // Add an element into the DbConnector without the overhead of synchronization. | |||
| // It may block when the internal queue is full. | |||
| // The element passed to this function will be forwarded into the internal queue. | |||
| @@ -138,6 +142,7 @@ class Connector { | |||
| } | |||
| expect_consumer_ = 0; | |||
| pop_from_ = 0; | |||
| out_buffers_count_ = 0; | |||
| MS_LOG(DEBUG) << "Connector counters reset."; | |||
| } | |||
| @@ -198,6 +203,7 @@ class Connector { | |||
| // Used in Pop(), when a thread calls Pop() but it is not the expect_consumer_. | |||
| std::mutex m_; | |||
| CondVar cv_; | |||
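| // Counts the number of elements popped from this Connector; sampled by ConnectorThroughput. | |||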
| std::atomic<std::int64_t> out_buffers_count_ = 0; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -222,6 +222,7 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> { | |||
| // Getter function | |||
| // @return connector size of current op | |||
| int32_t ConnectorSize() const { | |||
| if (!inlined()) { | |||
| return out_connector_->size(); | |||
| @@ -230,6 +231,10 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> { | |||
| return ChildOpConnectorSize(); | |||
| } | |||
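| // Getter function | |||
| // @return number of buffers popped from this op's output connector, or -1 if the op has no output connector | |||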
| int64_t ConnectorOutBufferCount() const { | |||
| return out_connector_ == nullptr ? int64_t(-1) : static_cast<int64_t>(out_connector_->out_buffers_count()); | |||
| } | |||
| // Getter function | |||
| // @return connector size of current op | |||
| int32_t ConnectorCapacity() const { | |||
| @@ -83,6 +83,7 @@ class DbConnector : public Connector<std::unique_ptr<DataBuffer>> { | |||
| expect_consumer_ = (expect_consumer_ + 1) % num_consumers_; | |||
| } | |||
| } | |||
| out_buffers_count_++; | |||
| cv_.NotifyAll(); | |||
| return Status::OK(); | |||
| } | |||
| @@ -88,8 +88,10 @@ class ExecutionTree { | |||
| bool operator!=(const Iterator &rhs) { return nodes_[ind_] != rhs.nodes_[rhs.ind_]; } | |||
| int32_t NumNodes() { return nodes_.size(); } | |||
| private: | |||
| int ind_; // the cur node our Iterator points to | |||
| int32_t ind_; // the cur node our Iterator points to | |||
| std::vector<std::shared_ptr<DatasetOp>> nodes_; // store the nodes in post order | |||
| void PostOrderTraverse(const std::shared_ptr<DatasetOp> &); | |||
| }; | |||
| @@ -3,4 +3,6 @@ add_library(engine-perf OBJECT | |||
| monitor.cc | |||
| device_queue_tracing.cc | |||
| connector_size.cc | |||
| dataset_iterator_tracing.cc) | |||
| dataset_iterator_tracing.cc | |||
| connector_throughput.cc | |||
| ) | |||
| @@ -14,7 +14,6 @@ | |||
| * limitations under the License. | |||
| */ | |||
| #include "dataset/engine/perf/connector_size.h" | |||
| #include <algorithm> | |||
| #include <fstream> | |||
| #include <memory> | |||
| @@ -13,8 +13,8 @@ | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef MINDSPORE_QUEUE_DEPTH_H | |||
| #define MINDSPORE_QUEUE_DEPTH_H | |||
| #ifndef DATASET_CONNECTOR_SIZE_H | |||
| #define DATASET_CONNECTOR_SIZE_H | |||
| #include <string> | |||
| #include <vector> | |||
| @@ -50,7 +50,7 @@ class ConnectorSize : public Sampling { | |||
| // This function samples the connector size of every node within the ExecutionTree | |||
| Status Sample() override; | |||
| std::string Name() const override { return kDeviceQueueTracingName; }; | |||
| std::string Name() const override { return kConnectorSizeSamplingName; } | |||
| // Save sampling data to file | |||
| // @return Status - The error code return | |||
| @@ -65,6 +65,8 @@ class ConnectorSize : public Sampling { | |||
| ExecutionTree *tree_ = nullptr; // ExecutionTree pointer | |||
| ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // MINDSPORE_QUEUE_DEPTH_H | |||
| #endif // DATASET_CONNECTOR_SIZE_H | |||
| @@ -0,0 +1,109 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include <fstream> | |||
| #include <iterator> | |||
| #include <algorithm> | |||
| #include <memory> | |||
| #include <string> | |||
| #include <nlohmann/json.hpp> | |||
| #include "dataset/engine/perf/connector_throughput.h" | |||
| #include "dataset/engine/execution_tree.h" | |||
| #include "dataset/util/path.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| // temporary helper | |||
| int ConnectorThroughput::InitNodes() { | |||
| auto it = (*tree_).begin(); | |||
| return it.NumNodes(); | |||
| } | |||
| // Sample action | |||
| Status ConnectorThroughput::Sample() { | |||
| std::vector<int64_t> out_buffer_count_row(n_nodes_); | |||
| std::vector<double> throughput_row(n_nodes_); | |||
| TimePoint cur_time; // initialised inside the loop, used outside the loop to update prev sample time. | |||
| auto col = 0; | |||
| for (const auto &node : *tree_) { | |||
| auto cur_out_buffer_count = node.ConnectorOutBufferCount(); | |||
| out_buffer_count_row[col] = cur_out_buffer_count; | |||
| auto sz = timestamps_.size(); | |||
| cur_time = std::chrono::steady_clock::now(); | |||
| auto _dt = std::chrono::duration_cast<std::chrono::microseconds>(timestamps_[0][sz - 1] - timestamps_[0][sz - 2]); | |||
| auto dt = std::chrono::duration<double>(_dt).count(); | |||
| auto prev_out_buffer_count = out_buffer_count_table_[col][out_buffer_count_table_.size() - 1]; | |||
| if (dt != 0) { | |||
| auto thr = (cur_out_buffer_count - prev_out_buffer_count) / (1000 * dt); | |||
| throughput_row[col] = thr; | |||
| } else { | |||
| throughput_row[col] = -1; | |||
| } | |||
| col++; | |||
| } | |||
| std::vector<TimePoint> v = {cur_time}; // temporary fix | |||
| timestamps_.AddSample(v); | |||
| // Push new row of sample | |||
| out_buffer_count_table_.AddSample(out_buffer_count_row); | |||
| throughput_.AddSample(throughput_row); | |||
| return Status::OK(); | |||
| } | |||
| json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<double> &thr) { | |||
| auto children = node.Children(); | |||
| std::vector<int32_t> children_id; | |||
| std::transform(children.begin(), children.end(), std::back_inserter(children_id), | |||
| [](std::shared_ptr<DatasetOp> op) -> int32_t { return op->id(); }); | |||
| json json_node; | |||
| json_node["op_id"] = node.id(); | |||
| json_node["op_type"] = node.Name(); | |||
| json_node["num_workers"] = node.num_workers(); | |||
| json metrics; | |||
| metrics["output_queue"] = {{"throughput", thr}}; | |||
| json_node["metrics"] = metrics; | |||
| if (!children_id.empty()) { | |||
| json_node["children"] = children_id; | |||
| } | |||
| return json_node; | |||
| } | |||
| // Save profiling data to file | |||
| Status ConnectorThroughput::SaveToFile() { | |||
| std::ofstream os(file_path_); | |||
| json output; | |||
| output["sampling_interval"] = 10; | |||
| // Traverse the ExecutionTree for JSON node generation | |||
| int col = 0; | |||
| for (auto &node : *tree_) { | |||
| std::vector<double> throughput; | |||
| for (auto i = 0; i < throughput_.size(); i++) { | |||
| throughput.push_back(throughput_[col][i]); | |||
| } | |||
| json json_node = ParseOpInfo(node, throughput); | |||
| output["op_info"].push_back(json_node); | |||
| col++; | |||
| } | |||
| os << output; | |||
| return Status::OK(); | |||
| } | |||
| Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) { | |||
| file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + Name() + "_" + device_id + ".json")).toString(); | |||
| return Status::OK(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,100 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef DATASET_CONNECTOR_THROUGHPUT_H | |||
| #define DATASET_CONNECTOR_THROUGHPUT_H | |||
| #include <vector> | |||
| #include <chrono> | |||
| #include <fstream> | |||
| #include <string> | |||
| #include <nlohmann/json.hpp> | |||
| #include "dataset/engine/perf/profiling.h" | |||
| #include "dataset/engine/perf/perf_data.h" | |||
| #include "dataset/engine/perf/cyclic_array.h" | |||
| #include "dataset/engine/datasetops/dataset_op.h" | |||
| using json = nlohmann::json; | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| class ExecutionTree; | |||
| // ConnectorThroughput samples the out-buffer count of each op's output connector in the pipeline | |||
| // and derives a throughput estimate from consecutive samples. | |||
| // For the description of the underlying data structure see perf_data.h. | |||
| // It supports JSON serialization for external usage. | |||
| class ConnectorThroughput : public Sampling { | |||
| using OutBufferCount = PerfData<CyclicArray<int64_t>>; | |||
| using Throughput = PerfData<CyclicArray<double>>; | |||
| using TimePoint = std::chrono::time_point<std::chrono::steady_clock>; | |||
| using TimeStamps = PerfData<CyclicArray<TimePoint>>; | |||
| public: | |||
| explicit ConnectorThroughput(ExecutionTree *tree, int64_t max_rows = 1000000) | |||
| : tree_(tree), | |||
| max_rows_(max_rows), | |||
| n_nodes_(InitNodes()), | |||
| out_buffer_count_table_(OutBufferCount(max_rows_, n_nodes_)), | |||
| throughput_(Throughput(max_rows_, n_nodes_)), | |||
| timestamps_(TimeStamps(max_rows_, 1)) { | |||
| timestamps_.AddSample(std::vector<TimePoint>(1)); | |||
| out_buffer_count_table_.AddSample(std::vector<int64_t>(n_nodes_)); | |||
| } | |||
| // Driver function for connector throughput sampling. | |||
| // This function samples the out-buffer count of every node within the ExecutionTree | |||
| // and computes the per-node throughput since the previous sample. | |||
| Status Sample() override; | |||
| /* Status TestPrint() override { | |||
| std::ofstream os("performance_monitor.txt"); | |||
| if (throughput_.size() == 0) { | |||
| os << "data is empty" << std::endl; | |||
| return Status::OK(); | |||
| } | |||
| for (int i = 0; i < throughput_.size(); i++) { | |||
| for (int j = 0; j < n_nodes_; j++) { | |||
| os << throughput_[j][i] << " "; | |||
| } | |||
| os << std::endl; | |||
| } | |||
| return Status::OK(); | |||
| };*/ | |||
| // Traverse the tree nodes and count them | |||
| int InitNodes(); | |||
| std::string Name() const override { return name_; }; | |||
| // Save sampling data to file | |||
| // @return Status - The error code return | |||
| Status SaveToFile() override; | |||
| Status Init(const std::string &dir_path, const std::string &device_id); | |||
| json ParseOpInfo(const DatasetOp &node, const std::vector<double> &thr); | |||
| private: | |||
| ExecutionTree *tree_ = nullptr; // ExecutionTree pointer | |||
| int64_t max_rows_; | |||
| int32_t n_nodes_; | |||
| OutBufferCount out_buffer_count_table_; | |||
| Throughput throughput_; | |||
| TimeStamps timestamps_; | |||
| std::string name_ = kConnectorThroughputSamplingName; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // DATASET_CONNECTOR_THROUGHPUT_H | |||
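For reference, the throughput figure recorded by Sample() is simply the change in out-buffer count divided by the time elapsed between two consecutive samples. A minimal sketch of that arithmetic, with dt in seconds as in Sample(); the helper name ComputeThroughput is illustrative and not part of the patch:

// Illustrative only: mirrors the arithmetic in ConnectorThroughput::Sample().
// cur_count and prev_count are cumulative out-buffer counts; dt is the elapsed time in seconds.
double ComputeThroughput(int64_t cur_count, int64_t prev_count, double dt) {
  if (dt == 0) {
    return -1;  // Sample() likewise records -1 when no time has elapsed
  }
  return (cur_count - prev_count) / (1000 * dt);  // buffers per millisecond
}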
| @@ -0,0 +1,197 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef DATASET_CYCLIC_ARRAY_H | |||
| #define DATASET_CYCLIC_ARRAY_H | |||
| #include <memory> | |||
| #include <algorithm> | |||
| #include <cstring> | |||
| #include <type_traits> | |||
| #include "dataset/core/constants.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| /// \class CyclicArray "include/cyclic_array.h" | |||
| /// \brief This is a container with a contiguous memory layout that only keeps the N last entries | |||
| /// when the number of entries exceeds the capacity. | |||
| /// Must be preallocated. | |||
| template <typename T> | |||
| class CyclicArray { | |||
| public: | |||
| using value_type = T; | |||
| class Iterator { | |||
| // TODO: add operator[] and make fully compliant with random access iterator; | |||
| // add a const iterator; | |||
| // add resize(), empty() | |||
| public: | |||
| using iterator_category = std::random_access_iterator_tag; | |||
| using value_type = CyclicArray::value_type; | |||
| using difference_type = std::ptrdiff_t; | |||
| using pointer = CyclicArray::value_type *; | |||
| using reference = CyclicArray::value_type &; | |||
| Iterator() = default; | |||
| Iterator(dsize_t idx, pointer ptr, dsize_t capacity, dsize_t head) | |||
| : cur_idx_(idx), ptr_(ptr), capacity_(capacity), head_(head) {} | |||
| Iterator(const Iterator &rhs) = default; | |||
| ~Iterator() = default; | |||
| Iterator &operator++() { | |||
| cur_idx_ = (cur_idx_ + 1) % (capacity_ + 1); | |||
| return *this; | |||
| } | |||
| Iterator operator++(int) { | |||
| Iterator tmp(*this); | |||
| cur_idx_ = (cur_idx_ + 1) % (capacity_ + 1); | |||
| return tmp; | |||
| } | |||
| Iterator &operator--() { | |||
| cur_idx_ = (cur_idx_ + capacity_) % (capacity_ + 1); | |||
| return *this; | |||
| } | |||
| Iterator operator--(int) { | |||
| Iterator tmp(*this); | |||
| cur_idx_ = (cur_idx_ + capacity_) % (capacity_ + 1); | |||
| return tmp; | |||
| } | |||
| Iterator operator+(dsize_t x) { return Iterator((cur_idx_ + x) % (capacity_ + 1), ptr_, capacity_, head_); } | |||
| Iterator operator-(dsize_t x) { | |||
| return Iterator((cur_idx_ + (capacity_ + 1 - x)) % (capacity_ + 1), ptr_, capacity_, head_); | |||
| } | |||
| bool operator<(const Iterator &rhs) { | |||
| return (head_ + cur_idx_) % (capacity_ + 1) < (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); | |||
| } | |||
| bool operator>(const Iterator &rhs) { | |||
| return (head_ + cur_idx_) % (capacity_ + 1) > (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); | |||
| } | |||
| bool operator>=(const Iterator &rhs) { | |||
| return (head_ + cur_idx_) % (capacity_ + 1) >= (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); | |||
| } | |||
| bool operator<=(const Iterator &rhs) { | |||
| return (head_ + cur_idx_) % (capacity_ + 1) <= (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); | |||
| } | |||
| difference_type operator-(const Iterator &rhs) { | |||
| return (cur_idx_ - rhs.cur_idx_ + capacity_ + 1) % (capacity_ + 1); | |||
| } | |||
| reference operator*() { return ptr_[cur_idx_]; } | |||
| pointer operator->() { return &(ptr_[cur_idx_]); } | |||
| bool operator==(const Iterator &rhs) { return cur_idx_ == rhs.cur_idx_; } | |||
| bool operator!=(const Iterator &rhs) { return cur_idx_ != rhs.cur_idx_; } | |||
| private: | |||
| dsize_t cur_idx_; | |||
| pointer ptr_; | |||
| dsize_t capacity_; | |||
| dsize_t head_; | |||
| }; | |||
| /// \brief Default constructor | |||
| CyclicArray() : buf_(nullptr), head_(0), tail_(0), size_(0), capacity_(0) {} | |||
| /// \brief Constructor | |||
| /// \param[in] capacity | |||
| explicit CyclicArray(dsize_t capacity) | |||
| : buf_(std::make_unique<T[]>(capacity + 1)), head_(0), tail_(0), size_(0), capacity_(capacity) {} | |||
| CyclicArray(const CyclicArray<T> &rhs) | |||
| : buf_(std::make_unique<T[]>(rhs.capacity_ + 1)), | |||
| head_(rhs.head_), | |||
| tail_(rhs.tail_), | |||
| size_(rhs.size_), | |||
| capacity_(rhs.capacity_) { | |||
| std::copy(rhs.begin(), rhs.end(), begin()); | |||
| } | |||
| CyclicArray(CyclicArray &&rhs) = default; | |||
| ~CyclicArray() = default; | |||
| /// \brief Iterator begin() | |||
| Iterator begin() { return Iterator(head_, buf_.get(), capacity_, head_); } | |||
| /// \brief Iterator end() | |||
| Iterator end() { return Iterator(tail_, buf_.get(), capacity_, head_); } | |||
| // Note: not truly const; these return a mutable Iterator over the underlying buffer. | |||
| Iterator begin() const { return Iterator(head_, buf_.get(), capacity_, head_); } | |||
| Iterator end() const { return Iterator(tail_, buf_.get(), capacity_, head_); } | |||
| /// \brief clear the array. Does not deallocate memory, capacity remains the same | |||
| void clear() { | |||
| head_ = 0; | |||
| tail_ = 0; | |||
| size_ = 0; | |||
| } | |||
| /// \brief returns current size | |||
| dsize_t size() { return size_; } | |||
| /// \brief returns capacity | |||
| dsize_t capacity() { return capacity_; } | |||
| /// \brief pushes a value | |||
| /// \param[in] val value | |||
| void push_back(T val) { | |||
| buf_[tail_] = val; | |||
| if (size_ >= capacity_) { | |||
| (tail_ != capacity_) ? tail_++ : tail_ = 0; | |||
| (head_ != capacity_) ? head_++ : head_ = 0; | |||
| } else { | |||
| tail_++; | |||
| size_++; | |||
| } | |||
| } | |||
| /// \brief returns const reference to an element of the array | |||
| /// \param[in] idx index of the element | |||
| /// \return const T& reference to an element of the array | |||
| const T &operator[](dsize_t idx) const { return buf_[(head_ + idx) % (capacity_ + 1)]; } | |||
| /// \brief returns non-const reference to an element of the array | |||
| /// \param[in] idx index of the element | |||
| /// \return T& reference to an element of the array | |||
| T &operator[](dsize_t idx) { return buf_[(head_ + idx) % (capacity_ + 1)]; } | |||
| private: | |||
| std::unique_ptr<T[]> buf_; | |||
| dsize_t head_; | |||
| dsize_t tail_; | |||
| dsize_t size_; | |||
| dsize_t capacity_; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // DATASET_CYCLIC_ARRAY_H | |||
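A minimal usage sketch of CyclicArray, showing the wrap-around behaviour described above once the capacity is exceeded (values are illustrative):

CyclicArray<int> arr(3);   // capacity of 3 entries
arr.push_back(1);
arr.push_back(2);
arr.push_back(3);
arr.push_back(4);          // the array is full, so the oldest entry (1) is dropped
// Now arr[0] == 2, arr[1] == 3, arr[2] == 4, and size() is still 3.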
| @@ -13,6 +13,7 @@ | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef MINDSPORE_DATASET_ITERATOR_TRACING_H | |||
| #define MINDSPORE_DATASET_ITERATOR_TRACING_H | |||
| @@ -28,7 +28,6 @@ Monitor::Monitor(ExecutionTree *tree) : tree_(tree) { | |||
| max_samples_ = 0; | |||
| cur_row_ = 0; | |||
| } | |||
| Status Monitor::operator()() { | |||
| // Register this thread with TaskManager to receive proper interrupt signal. | |||
| TaskManager::FindMe()->Post(); | |||
| @@ -29,6 +29,7 @@ class ExecutionTree; | |||
| class Monitor { | |||
| public: | |||
| // Monitor object constructor | |||
| explicit Monitor(ExecutionTree *tree); | |||
| Monitor() = default; | |||
| @@ -0,0 +1,88 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef DATASET_PERF_DATA_H | |||
| #define DATASET_PERF_DATA_H | |||
| #include <vector> | |||
| #include "dataset/core/constants.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| // PerfData is a convenience class to record and store the data produced by Monitor. | |||
| // It represents a 2D column-major table in which every column stores the samples | |||
| // for one operator. The number of rows equals the number of samples, | |||
| // and the number of columns equals the number of operators. | |||
| // The capacity is determined on construction and cannot be changed. | |||
| // ColumnType can be std::vector or CyclicArray. In the case of the latter, data can be added | |||
| // indefinitely without the risk of overflowing; otherwise the capacity must not be exceeded. | |||
| // Given PerfData pd(n_rows, n_cols), the element in column i and row j can be accessed as | |||
| // pd[i][j] | |||
| template <typename ColumnType> | |||
| class PerfData { | |||
| public: | |||
| PerfData() = default; | |||
| ~PerfData() = default; | |||
| PerfData(dsize_t max_rows, dsize_t n_cols) : counter_(0), max_rows_(max_rows), n_cols_(n_cols) { | |||
| for (auto i = 0; i < n_cols_; i++) { | |||
| data_.push_back(ColumnType(max_rows_)); | |||
| } | |||
| } | |||
| PerfData(const PerfData &rhs) = default; | |||
| PerfData(PerfData &&rhs) = default; | |||
| // Adds a row of data | |||
| // T can be any container that works with range-based for loops | |||
| template <typename T> | |||
| void AddSample(const T &row) { | |||
| auto i = 0; | |||
| for (const auto &e : row) { | |||
| data_[i++].push_back(e); | |||
| } | |||
| counter_++; | |||
| } | |||
| // Fetches a row of data by copy | |||
| template <typename V = typename ColumnType::value_type> | |||
| auto Row(dsize_t idx) { | |||
| std::vector<V> row(n_cols_); | |||
| for (auto i = 0; i < n_cols_; i++) { | |||
| row[i] = data_[i][idx]; | |||
| } | |||
| return row; | |||
| } | |||
| // returns a column of data | |||
| ColumnType &operator[](size_t idx) { return data_[idx]; } | |||
| const ColumnType &operator[](size_t idx) const { return data_[idx]; } | |||
| dsize_t size() { return counter_ < max_rows_ ? counter_ : max_rows_; } | |||
| dsize_t capacity() { return max_rows_; } | |||
| private: | |||
| std::vector<ColumnType> data_; | |||
| dsize_t counter_; | |||
| dsize_t max_rows_; | |||
| int n_cols_; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // DATASET_PERF_DATA_H | |||
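A minimal usage sketch of PerfData with a CyclicArray column type, following the description above (the sample values are illustrative):

PerfData<CyclicArray<int64_t>> pd(1000, 2);    // room for 1000 samples of 2 operators
pd.AddSample(std::vector<int64_t>{10, 20});    // one value per operator
pd.AddSample(std::vector<int64_t>{15, 25});
int64_t v = pd[0][1];                          // column-major access: operator 0, sample 1 -> 15
auto row = pd.Row(1);                          // whole second sample -> {15, 25}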
| @@ -14,7 +14,6 @@ | |||
| * limitations under the License. | |||
| */ | |||
| #include "dataset/engine/perf/profiling.h" | |||
| #include <sys/time.h> | |||
| #include <cstdlib> | |||
| #include <fstream> | |||
| @@ -23,6 +22,7 @@ | |||
| #include "dataset/engine/perf/monitor.h" | |||
| #include "dataset/engine/perf/device_queue_tracing.h" | |||
| #include "dataset/engine/perf/connector_size.h" | |||
| #include "dataset/engine/perf/connector_throughput.h" | |||
| #include "dataset/engine/perf/dataset_iterator_tracing.h" | |||
| #include "utils/log_adapter.h" | |||
| @@ -72,9 +72,11 @@ Status ProfilingManager::Initialize() { | |||
| std::shared_ptr<Tracing> dataset_iterator_tracing = std::make_shared<DatasetIteratorTracing>(); | |||
| RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing)); | |||
| std::shared_ptr<Sampling> monitor_sampling = std::make_shared<ConnectorSize>(tree_); | |||
| RETURN_IF_NOT_OK(RegisterSamplingNode(monitor_sampling)); | |||
| std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_); | |||
| RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling)); | |||
| std::shared_ptr<Sampling> connector_thr_sampling = std::make_shared<ConnectorThroughput>(tree_); | |||
| RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling)); | |||
| return Status::OK(); | |||
| } | |||
| @@ -140,14 +142,15 @@ Status ProfilingManager::SaveProfilingData() { | |||
| RETURN_IF_NOT_OK(node.second->SaveToFile()); | |||
| } | |||
| MS_LOG(INFO) << "Save profiling data end."; | |||
| return Status::OK(); | |||
| } | |||
| double ProfilingTime::GetCurMilliSecond() { | |||
| struct timeval tv = {0, 0}; | |||
| (void)gettimeofday(&tv, nullptr); | |||
| return tv.tv_sec * 1000 + tv.tv_usec / 1000; | |||
| int64_t ProfilingTime::GetCurMilliSecond() { | |||
| // because cpplint does not allow using namespace | |||
| using std::chrono::duration_cast; | |||
| using std::chrono::milliseconds; | |||
| using std::chrono::steady_clock; | |||
| return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -20,6 +20,7 @@ | |||
| #include <vector> | |||
| #include <unordered_map> | |||
| #include <memory> | |||
| #include <chrono> | |||
| #include "dataset/util/status.h" | |||
| namespace mindspore { | |||
| @@ -28,9 +29,10 @@ namespace dataset { | |||
| class Monitor; | |||
| class ExecutionTree; | |||
| const char kDeviceQueueTracingName[] = "Device Queue Tracing"; | |||
| const char kDatasetIteratorTracingName[] = "Dataset Iterator Tracing"; | |||
| const char kConnectorSizeSamplingName[] = "Connector Size Sampling"; | |||
| const char kDeviceQueueTracingName[] = "Device_Queue_Tracing"; | |||
| const char kDatasetIteratorTracingName[] = "Dataset_Iterator_Tracing"; | |||
| const char kConnectorSizeSamplingName[] = "Connector_Size_Sampling"; | |||
| const char kConnectorThroughputSamplingName[] = "Connector_Throughput_Sampling"; | |||
| // Profiling is the base class for a basic unit of profiling action. | |||
| // This base class encapsulates the serialization output logic. | |||
| @@ -59,6 +61,8 @@ class Sampling : public Profiling { | |||
| public: | |||
| // Sampling action function. This function will be invoked by the performance monitor thread. | |||
| virtual Status Sample() = 0; | |||
| // virtual Status TestPrint() = 0; | |||
| virtual ~Sampling() = default; | |||
| }; | |||
| // Tracing is a class of profiling which records samples upon request. | |||
| @@ -132,7 +136,7 @@ enum ProfilingTimeSubType { | |||
| class ProfilingTime { | |||
| public: | |||
| static double GetCurMilliSecond(); | |||
| static int64_t GetCurMilliSecond(); | |||
| }; | |||
| } // namespace dataset | |||
| @@ -79,6 +79,8 @@ SET(DE_UT_SRCS | |||
| mask_test.cc | |||
| trucate_pair_test.cc | |||
| concatenate_op_test.cc | |||
| cyclic_array_test.cc | |||
| perf_data_test.cc | |||
| ) | |||
| add_executable(de_ut_tests ${DE_UT_SRCS}) | |||
| @@ -0,0 +1,128 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include <iterator> | |||
| #include <algorithm> | |||
| #include "common/common.h" | |||
| #include "common/cvop_common.h" | |||
| #include "gtest/gtest.h" | |||
| #include "securec.h" | |||
| #include "dataset/util/de_error.h" | |||
| #include "dataset/engine/perf/cyclic_array.h" | |||
| #include <chrono> | |||
| using namespace mindspore::dataset; | |||
| class MindDataTestCyclicArray : public UT::Common { | |||
| public: | |||
| MindDataTestCyclicArray() {} | |||
| }; | |||
| TEST_F(MindDataTestCyclicArray, Test1) { | |||
| CyclicArray<int> arr(5); | |||
| EXPECT_EQ(5, arr.capacity()); | |||
| EXPECT_EQ(0, arr.size()); | |||
| arr.push_back(0); | |||
| EXPECT_EQ(5, arr.capacity()); | |||
| EXPECT_EQ(1, arr.size()); | |||
| EXPECT_EQ(arr[0], 0); | |||
| arr.push_back(1); | |||
| EXPECT_EQ(arr[1], 1); | |||
| for (auto i = 2; i < 5; i++) { | |||
| arr.push_back(i); | |||
| } | |||
| EXPECT_EQ(arr.capacity(), arr.size()); | |||
| EXPECT_EQ(1, arr[1]); | |||
| EXPECT_EQ(4, arr[4]); | |||
| arr[4] = 42; | |||
| EXPECT_EQ(arr[4], 42); | |||
| auto a = arr[4]; | |||
| EXPECT_EQ(a, 42); | |||
| arr.push_back(5); | |||
| EXPECT_EQ(arr[0], 1); | |||
| EXPECT_EQ(arr[4], 5); | |||
| CyclicArray<int> arr2 = arr; | |||
| EXPECT_EQ(arr2.capacity(), arr.capacity()); | |||
| EXPECT_EQ(arr2.size(), arr.size()); | |||
| auto last = arr2.end(); | |||
| auto first = arr2.begin(); | |||
| for (auto i = 0; i < arr.size(); i++) { | |||
| EXPECT_EQ(arr2[i], arr[i]); | |||
| } | |||
| arr.clear(); | |||
| EXPECT_EQ(arr.size(), 0); | |||
| arr.push_back(42); | |||
| arr.push_back(43); | |||
| EXPECT_EQ(arr.size(), 2); | |||
| EXPECT_EQ(arr.capacity(), 5); | |||
| EXPECT_EQ(arr[0], 42); | |||
| EXPECT_EQ(arr[1], 43); | |||
| auto arr3 = arr; | |||
| EXPECT_EQ(arr3.size(), 2); | |||
| EXPECT_EQ(arr3.capacity(), 5); | |||
| EXPECT_EQ(arr.size(), 2); | |||
| EXPECT_EQ(arr.capacity(), 5); | |||
| EXPECT_EQ(arr[0], arr3[0]); | |||
| EXPECT_EQ(arr[1], arr3[1]); | |||
| arr.clear(); | |||
| arr.push_back(21); | |||
| arr.push_back(22); | |||
| EXPECT_EQ(arr[arr.size() - 1], 22); | |||
| for (auto i = 23; i < 27; i++) { | |||
| arr.push_back(i); | |||
| } | |||
| EXPECT_EQ(arr[0], 22); | |||
| EXPECT_EQ(arr[arr.size() - 1], 26); | |||
| } | |||
| TEST_F(MindDataTestCyclicArray, TestIterator) { | |||
| CyclicArray<int> arr(5); | |||
| for (auto i = 0; i < arr.capacity(); i++) { | |||
| arr.push_back(i); | |||
| } | |||
| arr.push_back(6); | |||
| arr.push_back(7); | |||
| auto i = 0; | |||
| for (auto it = arr.begin(); it != arr.end(); ++it) { | |||
| EXPECT_EQ(*it, arr[i++]); | |||
| } | |||
| std::iota(arr.begin(), arr.end(), -4); | |||
| EXPECT_EQ(arr[0], -4); | |||
| EXPECT_EQ(arr[4], 0); | |||
| const auto sz = 1000000; | |||
| CyclicArray<int> arr2(sz); | |||
| for (auto i = 0; i < sz - 1; i++) { | |||
| arr.push_back(0); | |||
| } | |||
| const auto val = -500000; | |||
| std::iota(arr2.begin(), arr2.end() + sz, val); | |||
| EXPECT_EQ(*arr2.begin(), val); | |||
| std::random_device rd; | |||
| std::mt19937 g(rd()); | |||
| std::shuffle(arr2.begin(), arr2.end(), g); | |||
| std::sort(arr2.begin(), arr2.end(), [](const auto a, const auto b) { return a > b; }); | |||
| EXPECT_EQ(*arr2.begin(), val); | |||
| const auto new_val = -600000; | |||
| for (auto i = 0; i < 100; i++) { | |||
| arr2.push_back(new_val); | |||
| } | |||
| EXPECT_EQ(*(--arr2.end()), new_val); | |||
| std::sort(arr2.begin(), arr2.end(), [](const auto a, const auto b) { return a > b; }); | |||
| EXPECT_EQ(*arr2.begin(), new_val); | |||
| } | |||
| @@ -0,0 +1,71 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include "common/common.h" | |||
| #include "common/cvop_common.h" | |||
| #include "gtest/gtest.h" | |||
| #include "securec.h" | |||
| #include "dataset/util/de_error.h" | |||
| #include "dataset/engine/perf/cyclic_array.h" | |||
| #include "dataset/engine/perf/perf_data.h" | |||
| using namespace mindspore::dataset; | |||
| class MindDataTestPerfData : public UT::Common { | |||
| public: | |||
| MindDataTestPerfData() {} | |||
| }; | |||
| TEST_F(MindDataTestPerfData, Test1) { | |||
| PerfData<std::vector<int>> p1(2, 3); | |||
| PerfData<CyclicArray<int>> p2(2, 3); | |||
| EXPECT_EQ(p1.capacity(), p2.capacity()); | |||
| std::vector<int> row = {1, 2, 3}; | |||
| p1.AddSample(row); | |||
| p2.AddSample(row); | |||
| EXPECT_EQ(p1.size(), p2.size()); | |||
| p1.AddSample(row); | |||
| p2.AddSample(row); | |||
| EXPECT_EQ(p1.size(), p2.size()); | |||
| row = {4, 5, 6}; | |||
| p2.AddSample(row); | |||
| auto r1 = p2.Row<int>(static_cast<int64_t>(0)); | |||
| for (auto i = 0; i < 3; i++) { | |||
| EXPECT_EQ(r1[i], i + 1); | |||
| } | |||
| auto r2 = p2.Row<int>(1); | |||
| for (auto i = 0; i < 3; i++) { | |||
| EXPECT_EQ(r2[i], i + 4); | |||
| } | |||
| EXPECT_EQ(p2[0][1], 4); | |||
| EXPECT_EQ(p2[1][1], 5); | |||
| EXPECT_EQ(p2[2][1], 6); | |||
| } | |||
| TEST_F(MindDataTestPerfData, Test2) { | |||
| auto pd = PerfData<CyclicArray<int>>(1000000, 3); | |||
| auto row = {1, 2, 3}; | |||
| pd.AddSample(row); | |||
| EXPECT_EQ(pd[0][0], 1); | |||
| EXPECT_EQ(pd[1][0], 2); | |||
| EXPECT_EQ(pd[2][0], 3); | |||
| row = {4, 5, 6}; | |||
| pd.AddSample(row); | |||
| EXPECT_EQ(pd[0][0], 1); | |||
| EXPECT_EQ(pd[1][0], 2); | |||
| EXPECT_EQ(pd[2][0], 3); | |||
| } | |||
| @@ -23,7 +23,8 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"] | |||
| DATASET_ROOT = "../data/dataset/testTFTestAllTypes/" | |||
| SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json" | |||
| PIPELINE_FILE = "./pipeline_profiling_1.json" | |||
| PIPELINE_FILE_SIZE = "./pipeline_profiling_1.json" | |||
| PIPELINE_FILE_THR = "./pipeline_profiling_Connector_Throughput_Sampling_1.json" | |||
| DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt" | |||
| @@ -43,8 +44,10 @@ def test_profiling_simple_pipeline(): | |||
| for _ in data1: | |||
| pass | |||
| assert os.path.exists(PIPELINE_FILE) is True | |||
| os.remove(PIPELINE_FILE) | |||
| assert os.path.exists(PIPELINE_FILE_SIZE) is True | |||
| os.remove(PIPELINE_FILE_SIZE) | |||
| assert os.path.exists(PIPELINE_FILE_THR) is True | |||
| os.remove(PIPELINE_FILE_THR) | |||
| assert os.path.exists(DATASET_ITERATOR_FILE) is True | |||
| os.remove(DATASET_ITERATOR_FILE) | |||
| del os.environ['PROFILING_MODE'] | |||
| @@ -74,8 +77,10 @@ def test_profiling_complex_pipeline(): | |||
| for _ in data3: | |||
| pass | |||
| assert os.path.exists(PIPELINE_FILE) is True | |||
| os.remove(PIPELINE_FILE) | |||
| assert os.path.exists(PIPELINE_FILE_SIZE) is True | |||
| os.remove(PIPELINE_FILE_SIZE) | |||
| assert os.path.exists(PIPELINE_FILE_THR) is True | |||
| os.remove(PIPELINE_FILE_THR) | |||
| assert os.path.exists(DATASET_ITERATOR_FILE) is True | |||
| os.remove(DATASET_ITERATOR_FILE) | |||
| del os.environ['PROFILING_MODE'] | |||
| @@ -103,8 +108,10 @@ def test_profiling_sampling_iterval(): | |||
| for _ in data1: | |||
| pass | |||
| assert os.path.exists(PIPELINE_FILE) is True | |||
| os.remove(PIPELINE_FILE) | |||
| assert os.path.exists(PIPELINE_FILE_SIZE) is True | |||
| os.remove(PIPELINE_FILE_SIZE) | |||
| assert os.path.exists(PIPELINE_FILE_THR) is True | |||
| os.remove(PIPELINE_FILE_THR) | |||
| assert os.path.exists(DATASET_ITERATOR_FILE) is True | |||
| os.remove(DATASET_ITERATOR_FILE) | |||