From bcb30821d576be3cd9bf2ed08b9eb20b6daa5e6f Mon Sep 17 00:00:00 2001 From: zhangyinxia Date: Tue, 26 Jan 2021 18:17:28 +0800 Subject: [PATCH] add message --- .../{master/grpc => common}/grpc_client.cc | 5 +- .../{master/grpc => common}/grpc_client.h | 5 +- mindspore_serving/ccsrc/master/dispacther.h | 2 +- .../ccsrc/master/notify_worker/base_notify.h | 3 +- .../ccsrc/master/notify_worker/grpc_notify.cc | 1 - .../agent_process/agent_process.cc | 36 ++++++++ .../agent_process/agent_process.h | 41 +++++++++ .../distributed_process.cc | 37 ++++++++ .../distributed_process/distributed_process.h | 49 ++++++++++ .../notify_agent/base_notify_agent.h | 43 +++++++++ .../notify_agent/notify_agent.cc | 39 ++++++++ .../notify_agent/notify_agent.h | 48 ++++++++++ .../notify_distributed/base_notify_worker.h | 38 ++++++++ .../notify_distributed/notify_worker.cc | 91 +++++++++++++++++++ .../notify_distributed/notify_worker.h | 53 +++++++++++ mindspore_serving/proto/ms_agent.proto | 27 ++++++ mindspore_serving/proto/ms_distributed.proto | 28 ++++++ 17 files changed, 537 insertions(+), 9 deletions(-) rename mindspore_serving/ccsrc/{master/grpc => common}/grpc_client.cc (91%) rename mindspore_serving/ccsrc/{master/grpc => common}/grpc_client.h (91%) create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.cc create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.h create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.h create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/base_notify_agent.h create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.cc create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.h create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/base_notify_worker.h create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc create mode 100644 mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h create mode 100644 mindspore_serving/proto/ms_agent.proto create mode 100644 mindspore_serving/proto/ms_distributed.proto diff --git a/mindspore_serving/ccsrc/master/grpc/grpc_client.cc b/mindspore_serving/ccsrc/common/grpc_client.cc similarity index 91% rename from mindspore_serving/ccsrc/master/grpc/grpc_client.cc rename to mindspore_serving/ccsrc/common/grpc_client.cc index 85ad129..508da4e 100644 --- a/mindspore_serving/ccsrc/master/grpc/grpc_client.cc +++ b/mindspore_serving/ccsrc/common/grpc_client.cc @@ -1,5 +1,5 @@ /** - * Copyright 2020 Huawei Technologies Co., Ltd + * Copyright 2021 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,10 +14,9 @@ * limitations under the License. */ -#include "master/grpc/grpc_client.h" +#include "common/grpc_client.h" #include #include -#include "master/grpc/grpc_server.h" namespace mindspore { namespace serving { diff --git a/mindspore_serving/ccsrc/master/grpc/grpc_client.h b/mindspore_serving/ccsrc/common/grpc_client.h similarity index 91% rename from mindspore_serving/ccsrc/master/grpc/grpc_client.h rename to mindspore_serving/ccsrc/common/grpc_client.h index ca39a48..afd9a1e 100644 --- a/mindspore_serving/ccsrc/master/grpc/grpc_client.h +++ b/mindspore_serving/ccsrc/common/grpc_client.h @@ -1,5 +1,5 @@ /** - * Copyright 2020 Huawei Technologies Co., Ltd + * Copyright 2021 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ #include #include #include "common/serving_common.h" -#include "master/notify_worker/base_notify.h" #include "proto/ms_service.pb.h" #include "proto/ms_service.grpc.pb.h" #include "proto/ms_master.pb.h" @@ -38,6 +37,8 @@ extern std::unique_ptr client_; using PredictOnFinish = std::function; +using DispatchCallback = std::function; + class MSServiceClient { public: MSServiceClient() = default; diff --git a/mindspore_serving/ccsrc/master/dispacther.h b/mindspore_serving/ccsrc/master/dispacther.h index 95f632b..4e5e406 100644 --- a/mindspore_serving/ccsrc/master/dispacther.h +++ b/mindspore_serving/ccsrc/master/dispacther.h @@ -27,7 +27,7 @@ #include "common/instance.h" #include "common/servable.h" #include "master/notify_worker/base_notify.h" -#include "master/grpc/grpc_client.h" +#include "common/grpc_client.h" namespace mindspore::serving { diff --git a/mindspore_serving/ccsrc/master/notify_worker/base_notify.h b/mindspore_serving/ccsrc/master/notify_worker/base_notify.h index 5d8cb11..5ccb0c3 100644 --- a/mindspore_serving/ccsrc/master/notify_worker/base_notify.h +++ b/mindspore_serving/ccsrc/master/notify_worker/base_notify.h @@ -22,12 +22,11 @@ #include "common/serving_common.h" #include "common/servable.h" #include "proto/ms_service.pb.h" +#include "common/grpc_client.h" namespace mindspore { namespace serving { -using DispatchCallback = std::function; - class MS_API BaseNotifyWorker { public: BaseNotifyWorker() = default; diff --git a/mindspore_serving/ccsrc/master/notify_worker/grpc_notify.cc b/mindspore_serving/ccsrc/master/notify_worker/grpc_notify.cc index 4420d44..86ca8e7 100644 --- a/mindspore_serving/ccsrc/master/notify_worker/grpc_notify.cc +++ b/mindspore_serving/ccsrc/master/notify_worker/grpc_notify.cc @@ -20,7 +20,6 @@ #include #include "common/exit_handle.h" #include "common/grpc_server.h" -#include "master/grpc/grpc_client.h" namespace mindspore { namespace serving { diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.cc b/mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.cc new file mode 100644 index 0000000..474fe3a --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.cc @@ -0,0 +1,36 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "worker/distributed_worker/agent_process/agent_process.h" + +namespace mindspore { +namespace serving { +grpc::Status MSAgentImpl::Exit(grpc::ServerContext *context, const proto::ExitRequest *request, + proto::ExitReply *reply) { + MSI_LOG(INFO) << "Distributed Worker Exit"; + // to do : need WorkerAgent support stop funcition + return grpc::Status::OK; +} + +grpc::Status MSAgentImpl::Predict(grpc::ServerContext *context, const proto::PredictRequest *request, + proto::PredictReply *reply) { + MSI_LOG(INFO) << "Begin call service Eval"; + // to do : need WorkerAgent support run funcition + return grpc::Status::OK; +} + +} // namespace serving +} // namespace mindspore diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.h b/mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.h new file mode 100644 index 0000000..b04affe --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/agent_process/agent_process.h @@ -0,0 +1,41 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_SERVING_WORKER_AGENT_PROCESS_H +#define MINDSPORE_SERVING_WORKER_AGENT_PROCESS_H + +#include +#include +#include +#include "common/serving_common.h" +#include "proto/ms_worker.pb.h" +#include "proto/ms_worker.grpc.pb.h" + +namespace mindspore { +namespace serving { + +// Service Implement +class MSAgentImpl final : public proto::MSWorker::Service { + public: + grpc::Status Predict(grpc::ServerContext *context, const proto::PredictRequest *request, + proto::PredictReply *reply) override; + grpc::Status Exit(grpc::ServerContext *context, const proto::ExitRequest *request, proto::ExitReply *reply) override; +}; + +} // namespace serving +} // namespace mindspore + +#endif // MINDSPORE_SERVING_WORKER_AGENT_PROCESS_H diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc new file mode 100644 index 0000000..11b86d3 --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc @@ -0,0 +1,37 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "worker/distributed_worker/distributed_process/distributed_process.h" + +namespace mindspore { +namespace serving { + +grpc::Status MSDistributedImpl::Register(grpc::ServerContext *context, const proto::RegisterRequest *request, + proto::RegisterReply *reply) { + return grpc::Status::OK; +} + +grpc::Status MSDistributedImpl::Predict(grpc::ServerContext *context, const proto::PredictRequest *request, + proto::PredictReply *reply) { + return grpc::Status::OK; +} + +grpc::Status MSDistributedImpl::Exit(grpc::ServerContext *context, const proto::ExitRequest *request, + proto::ExitReply *reply) { + return grpc::Status::OK; +} +} // namespace serving +} // namespace mindspore diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.h b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.h new file mode 100644 index 0000000..0ba4f08 --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.h @@ -0,0 +1,49 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_SERVING_DISTRIBUTED_WORKER_WORKER_PROCESS_H +#define MINDSPORE_SERVING_DISTRIBUTED_WORKER_WORKER_PROCESS_H + +#include +#include +#include +#include "common/serving_common.h" +#include "proto/ms_worker.pb.h" +#include "proto/ms_worker.grpc.pb.h" +#include "proto/ms_service.pb.h" +#include "proto/ms_service.grpc.pb.h" +#include "proto/ms_master.pb.h" +#include "proto/ms_master.grpc.pb.h" + +namespace mindspore { +namespace serving { + +// Service Implement +class MSDistributedImpl final : public proto::MSMaster::Service, public proto::MSWorker::Service { + public: + MSDistributedImpl() {} + ~MSDistributedImpl() = default; + grpc::Status Register(grpc::ServerContext *context, const proto::RegisterRequest *request, + proto::RegisterReply *reply) override; + grpc::Status Predict(grpc::ServerContext *context, const proto::PredictRequest *request, + proto::PredictReply *reply) override; + grpc::Status Exit(grpc::ServerContext *context, const proto::ExitRequest *request, proto::ExitReply *reply) override; +}; + +} // namespace serving +} // namespace mindspore + +#endif // MINDSPORE_SERVING_DISTRIBUTED_WORKER_WORKER_PROCESS_H diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/base_notify_agent.h b/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/base_notify_agent.h new file mode 100644 index 0000000..861ea0d --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/base_notify_agent.h @@ -0,0 +1,43 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_SERVING_WORKER_BASE_NOTIFY_AGENT_H +#define MINDSPORE_SERVING_WORKER_BASE_NOTIFY_AGENT_H +#include +#include +#include +#include "common/serving_common.h" +#include "common/servable.h" +#include "proto/ms_service.pb.h" + +namespace mindspore { +namespace serving { + +using DistributeCallback = std::function; + +class MS_API BaseNotifyAgent { + public: + BaseNotifyAgent() = default; + virtual ~BaseNotifyAgent() = default; + virtual Status Exit() = 0; + virtual Status DispatchAsync(const proto::PredictRequest &request, proto::PredictReply *reply, + DistributeCallback callback) = 0; +}; + +} // namespace serving +} // namespace mindspore + +#endif // MINDSPORE_SERVING_WORKER_BASE_NOTIFY_AGENT_H diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.cc b/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.cc new file mode 100644 index 0000000..cbc3e50 --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.cc @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "worker/distributed_worker/notify_agent/notify_agent.h" +#include +#include +#include +#include +#include "common/exit_handle.h" +#include "common/grpc_server.h" + +namespace mindspore { +namespace serving { + +GrpcNotfiyAgent::GrpcNotfiyAgent(const std::string &worker_address) {} + +GrpcNotfiyAgent::~GrpcNotfiyAgent() = default; + +Status GrpcNotfiyAgent::Exit() { return SUCCESS; } + +Status GrpcNotfiyAgent::DispatchAsync(const proto::PredictRequest &request, proto::PredictReply *reply, + DistributeCallback callback) { + return SUCCESS; +} + +} // namespace serving +} // namespace mindspore diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.h b/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.h new file mode 100644 index 0000000..974c54a --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_agent/notify_agent.h @@ -0,0 +1,48 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_SERVING_WORKER_NOTIFY_AGENT_H +#define MINDSPORE_SERVING_WORKER_NOTIFY_AGENT_H +#include +#include +#include +#include +#include "worker/distributed_worker/notify_agent/base_notify_agent.h" +#include "proto/ms_agent.pb.h" +#include "proto/ms_agent.grpc.pb.h" + +namespace mindspore { +namespace serving { + +class MS_API GrpcNotfiyAgent : public BaseNotifyAgent { + public: + explicit GrpcNotfiyAgent(const std::string &worker_address); + ~GrpcNotfiyAgent() override; + + Status Exit() override; + + Status DispatchAsync(const proto::PredictRequest &request, proto::PredictReply *reply, + DistributeCallback callback) override; + + private: + std::string worker_address_; + std::shared_ptr stub_ = nullptr; +}; + +} // namespace serving +} // namespace mindspore + +#endif // MINDSPORE_SERVING_WORKER_NOTIFY_AGENT_H diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/base_notify_worker.h b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/base_notify_worker.h new file mode 100644 index 0000000..8e5e690 --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/base_notify_worker.h @@ -0,0 +1,38 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_SERVING_WORKER_BASE_NOTIFY_WORKER_H +#define MINDSPORE_SERVING_WORKER_BASE_NOTIFY_WORKER_H +#include +#include "common/serving_common.h" +#include "common/servable.h" +#include "worker/distributed_worker/common.h" + +namespace mindspore { +namespace serving { + +class MS_API BaseNotifyDistributeWorker { + public: + BaseNotifyDistributeWorker() = default; + virtual ~BaseNotifyDistributeWorker() = default; + virtual Status Register(const std::vector &worker_specs) = 0; + virtual Status Unregister() = 0; +}; + +} // namespace serving +} // namespace mindspore + +#endif // MINDSPORE_SERVING_WORKER_BASE_NOTIFY_WORKER_H diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc new file mode 100644 index 0000000..d0014d5 --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc @@ -0,0 +1,91 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "worker/distributed_worker/notify_distributed/notify_worker.h" +#include +#include +#include +#include +#include "common/exit_handle.h" +#include "common/grpc_server.h" + +namespace mindspore { +namespace serving { + +GrpcNotfiyDistributeWorker::GrpcNotfiyDistributeWorker(const std::string &distributed_worker_ip, + uint32_t distributed_worker_port, const std::string &host_ip, + uint32_t host_port) + : distributed_worker_ip_(distributed_worker_ip), + distributed_worker_port_(distributed_worker_port), + host_ip_(host_ip), + host_port_(host_port) { + distributed_worker_address_ = distributed_worker_ip + ":" + std::to_string(distributed_worker_port); + agent_address_ = host_ip_ + ":" + std::to_string(host_port_); + auto channel = GrpcServer::CreateChannel(distributed_worker_address_); + stub_ = proto::MSDistributedWorker::NewStub(channel); +} + +GrpcNotfiyDistributeWorker::~GrpcNotfiyDistributeWorker() = default; + +Status GrpcNotfiyDistributeWorker::Register(const std::vector &worker_specs) { + const int32_t REGISTER_TIME_OUT = 60; + const int32_t REGISTER_INTERVAL = 1; + auto loop = REGISTER_TIME_OUT; + while (loop-- && !ExitSignalHandle::Instance().HasStopped()) { + MSI_LOG(INFO) << "Register to " << distributed_worker_address_; + proto::RegisterRequest request; + request.set_address(agent_address_); + // to do set RegisterRequest message + proto::RegisterReply reply; + grpc::ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(REGISTER_INTERVAL); + context.set_deadline(deadline); + grpc::Status status = stub_->Register(&context, request, &reply); + if (status.ok()) { + MSI_LOG(INFO) << "Register SUCCESS "; + return SUCCESS; + } + MSI_LOG_INFO << "Grpc message: " << status.error_code() << ", " << status.error_message(); + std::this_thread::sleep_for(std::chrono::milliseconds(REGISTER_INTERVAL * 1000)); + } + if (ExitSignalHandle::Instance().HasStopped()) { + return INFER_STATUS_LOG_WARNING(FAILED) << "Worker exit, stop registration"; + } + return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Register TimeOut"; +} + +Status GrpcNotfiyDistributeWorker::Unregister() { + if (is_stoped_.load()) { + return SUCCESS; + } + is_stoped_ = true; + proto::ExitRequest request; + request.set_address(agent_address_); + proto::ExitReply reply; + grpc::ClientContext context; + const int32_t TIME_OUT = 1; + std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::seconds(TIME_OUT); + context.set_deadline(deadline); + grpc::Status status = stub_->Exit(&context, request, &reply); + if (status.ok()) { + MSI_LOG(INFO) << "Exit SUCCESS "; + return SUCCESS; + } + return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Exit Failed"; +} + +} // namespace serving +} // namespace mindspore diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h new file mode 100644 index 0000000..e698c56 --- /dev/null +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h @@ -0,0 +1,53 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_SERVING_WORKER_NOTIFY_WORKER_H +#define MINDSPORE_SERVING_WORKER_NOTIFY_WORKER_H +#include +#include +#include +#include "worker/distributed_worker/notify_distributed/base_notify_worker.h" +#include "proto/ms_master.pb.h" +#include "proto/ms_master.grpc.pb.h" +#include "proto/ms_distributed.pb.h" +#include "proto/ms_distributed.grpc.pb.h" +namespace mindspore { +namespace serving { + +class MS_API GrpcNotfiyDistributeWorker : public BaseNotifyDistributeWorker { + public: + GrpcNotfiyDistributeWorker(const std::string &master_ip, uint32_t master_port, const std::string &host_ip, + uint32_t host_port); + ~GrpcNotfiyDistributeWorker() override; + Status Register(const std::vector &worker_specs) override; + Status Unregister() override; + + private: + std::string distributed_worker_ip_; + uint32_t distributed_worker_port_; + std::string host_ip_; + uint32_t host_port_; + std::string agent_address_; + std::string distributed_worker_address_; + + std::unique_ptr stub_; + std::atomic is_stoped_{false}; +}; + +} // namespace serving +} // namespace mindspore + +#endif // MINDSPORE_SERVING_WORKER_NOTIFY_WORKER_H diff --git a/mindspore_serving/proto/ms_agent.proto b/mindspore_serving/proto/ms_agent.proto new file mode 100644 index 0000000..eb44638 --- /dev/null +++ b/mindspore_serving/proto/ms_agent.proto @@ -0,0 +1,27 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ms_manager.proto +syntax = "proto3"; + +package mindspore.serving.proto; +import "mindspore_serving/proto/ms_service.proto"; +import "mindspore_serving/proto/ms_master.proto"; + +service MSAgent { + rpc Predict(PredictRequest) returns (PredictReply) {} + rpc Exit(ExitRequest) returns (ExitReply) {} +} diff --git a/mindspore_serving/proto/ms_distributed.proto b/mindspore_serving/proto/ms_distributed.proto new file mode 100644 index 0000000..a53f126 --- /dev/null +++ b/mindspore_serving/proto/ms_distributed.proto @@ -0,0 +1,28 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ms_manager.proto +syntax = "proto3"; + +package mindspore.serving.proto; +import "mindspore_serving/proto/ms_service.proto"; +import "mindspore_serving/proto/ms_master.proto"; + +service MSDistributedWorker { + rpc Predict(PredictRequest) returns (PredictReply) {} + rpc Exit(ExitRequest) returns (ExitReply) {} + rpc Register(RegisterRequest) returns (RegisterReply) {} +} \ No newline at end of file