Browse Source

!15589 mindrt log use mslog

From: @ling_qiao_min
Reviewed-by: @zhang_xue_tong,@hangangqiang
Signed-off-by: @zhang_xue_tong
pull/15589/MERGE
mindspore-ci-bot Gitee 4 years ago
parent
commit
15c21b72c8
22 changed files with 145 additions and 181 deletions
  1. +11
    -17
      mindspore/core/mindrt/include/actor/actor.h
  2. +2
    -2
      mindspore/core/mindrt/include/actor/actorapp.h
  3. +6
    -6
      mindspore/core/mindrt/include/actor/aid.h
  4. +4
    -4
      mindspore/core/mindrt/include/actor/errcode.h
  5. +11
    -35
      mindspore/core/mindrt/include/actor/log.h
  6. +1
    -1
      mindspore/core/mindrt/include/actor/naught.h
  7. +39
    -39
      mindspore/core/mindrt/include/async/async.h
  8. +1
    -1
      mindspore/core/mindrt/include/async/collect.h
  9. +9
    -10
      mindspore/core/mindrt/include/async/future.h
  10. +1
    -1
      mindspore/core/mindrt/include/async/future_base.h
  11. +8
    -8
      mindspore/core/mindrt/include/async/option.h
  12. +1
    -1
      mindspore/core/mindrt/include/mindrt.hpp
  13. +5
    -8
      mindspore/core/mindrt/src/actor/actor.cc
  14. +6
    -6
      mindspore/core/mindrt/src/actor/actormgr.cc
  15. +1
    -1
      mindspore/core/mindrt/src/actor/actormgr.h
  16. +1
    -1
      mindspore/core/mindrt/src/actor/actorthread.cc
  17. +5
    -5
      mindspore/core/mindrt/src/actor/aid.cc
  18. +1
    -1
      mindspore/core/mindrt/src/actor/iomgr.h
  19. +1
    -1
      mindspore/core/mindrt/src/async/async.cc
  20. +12
    -12
      mindspore/core/mindrt/src/async/uuid_base.cc
  21. +18
    -20
      mindspore/core/mindrt/src/mindrt.cc
  22. +1
    -1
      mindspore/lite/src/lite_mindrt.cc

+ 11
- 17
mindspore/core/mindrt/include/actor/actor.h View File

@@ -50,8 +50,8 @@ class ActorBase {
inline void PrintMsgRecord() {
uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE;
for (uint32_t i = 0; i < MAX_ACTOR_RECORD_SIZE; i++) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "Actor message dumps:%s",
"actor:%s,msg:%s", id.Name().c_str(), msgRecords[startPoint].c_str());
MS_LOG(DEBUG) << "Actor message dumps:"
<< "actor:" << id.Name().c_str() << " msg:" << msgRecords[startPoint].c_str();
startPoint = (startPoint + MAX_ACTOR_RECORD_SIZE - 1) % MAX_ACTOR_RECORD_SIZE;
}
}
@@ -89,20 +89,17 @@ class ActorBase {

// KHTTPMsg handler
virtual void HandleHttp(std::unique_ptr<MessageBase> msg) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) HandleHttp() is not implemented", "a=%s", id.Name().c_str());
MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleHttp() is not implemented";
}

// KLOCALMsg handler
virtual void HandleLocalMsg(std::unique_ptr<MessageBase> msg) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) HandleLocalMsg() is not implemented.", "a=%s", id.Name().c_str());
MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") HandleLocalMsg() is not implemented.";
}

// The link is closed.
virtual void Exited(const AID &actor) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) Exited() is not implemented. ", "a=%s", id.Name().c_str());
MS_LOG(ERROR) << "ACTOR (" << id.Name().c_str() << ") Exited() is not implemented. ";
}

// Filter the KMSG
@@ -158,9 +155,8 @@ class ActorBase {
static void BehaviorBase(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KMSG) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-tcp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str()
<< ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
return;
}
(t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
@@ -171,9 +167,8 @@ class ActorBase {
static void BehaviorBase1(T *t, void (T::*method)(mindspore::AID, std::string &&, std::string &&),
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KMSG) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-tcp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
MS_LOG(ERROR) << "Drop non-tcp message: from:" << std::string(msg->from).c_str()
<< ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
return;
}
(t->*method)(msg->from, std::move(msg->name), std::move(msg->body));
@@ -184,9 +179,8 @@ class ActorBase {
static void BehaviorBaseForUdp(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KUDP) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-udp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
MS_LOG(ERROR) << "Drop non-udp message: from:" << std::string(msg->from).c_str()
<< ",to:" << std::string(msg->to).c_str() << ",name:" << msg->name.c_str();
return;
}
(t->*method)(msg->from, std::move(msg->name), std::move(msg->body));


+ 2
- 2
mindspore/core/mindrt/include/actor/actorapp.h View File

@@ -46,7 +46,7 @@ class AppActor : public ActorBase {
template <typename M>
int Send(const std::string &to, const std::string &msgName, std::unique_ptr<M> msg) {
std::unique_ptr<MessageLocal> localMsg(new (std::nothrow) MessageLocal(GetAID(), to, msgName, msg.release()));
BUS_OOM_EXIT(localMsg);
MINDRT_OOM_EXIT(localMsg);
return Send(to, std::move(localMsg));
}

@@ -58,7 +58,7 @@ class AppActor : public ActorBase {
if (appBehaviors.find(msgName) != appBehaviors.end()) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "ACTOR msgName conflict:%s",
"a=%s,msg=%s", GetAID().Name().c_str(), msgName.c_str());
BUS_EXIT("msgName conflicts.");
MINDRT_EXIT("msgName conflicts.");
return;
}



+ 6
- 6
mindspore/core/mindrt/include/actor/aid.h View File

@@ -19,12 +19,12 @@

#include <string>

#include "actor/buslog.h"
#include "actor/log.h"

namespace mindspore {

constexpr auto BUS_TCP = "tcp";
constexpr auto BUS_UDP = "udp";
constexpr auto MINDRT_TCP = "tcp";
constexpr auto MINDRT_UDP = "udp";

class AID {
public:
@@ -32,8 +32,8 @@ class AID {

~AID() {}

AID(const char *name);
AID(const std::string &name);
explicit AID(const char *name);
explicit AID(const std::string &name);

AID(const std::string &tmpName, const std::string &sUrl) : name(tmpName), url(sUrl) { SetUnfixUrl(); }

@@ -86,7 +86,7 @@ inline std::ostream &operator<<(std::ostream &os, const AID &aid) {
}

inline bool operator==(const AID &aid1, const AID &aid2) {
if (aid1.GetProtocol() == BUS_TCP && aid2.GetProtocol() == BUS_TCP) {
if (aid1.GetProtocol() == MINDRT_TCP && aid2.GetProtocol() == MINDRT_TCP) {
// NOTE : By default, http has no protocol filed, so we use 'UnfixUrl' to compare aids here
return ((aid1.Name() == aid2.Name()) && (aid1.UnfixUrl() == aid2.UnfixUrl()));
} else {


mindspore/core/mindrt/include/actor/buserrcode.h → mindspore/core/mindrt/include/actor/errcode.h View File

@@ -14,12 +14,12 @@
* limitations under the License.
*/

#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_BUSERRCODE_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_BUSERRCODE_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ERRCODE_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_ERRCODE_H_

// common err code -1 ~ -100
constexpr int BUS_ERROR = -1;
constexpr int BUS_OK = 0;
constexpr int MINDRT_ERROR = -1;
constexpr int MINDRT_OK = 0;
constexpr int COMM_NULL_PTR = -1;
constexpr int ERRORCODE_SUCCESS = 1;


mindspore/core/mindrt/include/actor/buslog.h → mindspore/core/mindrt/include/actor/log.h View File

@@ -14,8 +14,8 @@
* limitations under the License.
*/

#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_BUSLOG_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_BUSLOG_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_LOG_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_ACTOR_LOG_H_

#include <signal.h>
#include <iostream>
@@ -23,47 +23,24 @@
#include <sstream>
#include <string>

#include "actor/buserrcode.h"
#include "actor/errcode.h"
#ifdef USE_GLOG
#include "utils/log_adapter.h"
#else
#include "common/log_adapter.h"
#endif
namespace mindspore {

#define BUS_LOG(severity) // LOG(severity)
#define BUS_DLOG(verboselevel) // VLOG(verboselevel)

#define HARES_LOG_PID int // GetLogPID();
#define PID_MINDRT_LOG

#define ICTSBASE_LOG_COMMON_CODE
#define HLOG_LEVEL_INFO
#define PID_MINDRT_LOG
#define HLOG_LEVEL_DEBUG 1
#define ICTSBASE_LOG0(logig, level, pid, format)
#define ICTSBASE_LOG1(logig, level, pid, format, para)
#define ICTSBASE_LOG2(logig, level, pid, format, para1, para2)
#define ICTSBASE_LOG3(logig, level, pid, format, para1, para2, para3)
#define ICTSBASE_LOG4(logig, level, pid, format, para1, para2, para3, para4)
#define ICTSBASE_LOG_STRING(logig, level, pid, preformat, format...)
#define FlushHLogCache()
// Kill the process for safe exiting.
inline void KillProcess(const std::string &ret) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "BUS Exit Tip: %s", "%s", ret.c_str());
MS_LOG(DEBUG) << "MINDRT Exit Tip:" << ret.c_str();
// flush the log in cache to disk before exiting.
FlushHLogCache();
}

} // namespace mindspore

constexpr int DLEVEL4 = 1000;
constexpr int DLEVEL3 = 3;
constexpr int DLEVEL2 = 2;
constexpr int DLEVEL1 = 1;
constexpr int DLEVEL0 = 0;

#define BUS_ASSERT(expression) \
#define MINDRT_ASSERT(expression) \
do { \
if (!(expression)) { \
std::stringstream ss; \
@@ -72,19 +49,18 @@ constexpr int DLEVEL0 = 0;
} \
} while (0)

#define BUS_EXIT(ret) \
#define MINDRT_EXIT(ret) \
do { \
std::stringstream ss; \
ss << (ret) << " ( file: " << __FILE__ << ", line: " << __LINE__ << " )."; \
mindspore::KillProcess(ss.str()); \
} while (0)

#define BUS_OOM_EXIT(ptr) \
{ \
if (ptr == nullptr) { \
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "new failed, will exit"); \
BUS_EXIT("Exit for OOM."); \
} \
#define MINDRT_OOM_EXIT(ptr) \
{ \
if (ptr == nullptr) { \
MINDRT_EXIT("Exit for OOM."); \
} \
}

constexpr int LOG_CHECK_EVERY_FIRSTNUM = 10;

+ 1
- 1
mindspore/core/mindrt/include/actor/naught.h View File

@@ -27,7 +27,7 @@ class ActorBase;

typedef std::shared_ptr<Naught> UniqueNaught;
typedef std::shared_ptr<Naught> SharedNaught;
typedef std::string BusString;
typedef std::string MindrtString;

// Lite , start from Naught
class Naught {


+ 39
- 39
mindspore/core/mindrt/include/async/async.h View File

@@ -22,7 +22,7 @@
#include <utility>

#include "actor/actor.h"
#include "actor/buslog.h"
#include "actor/log.h"

#include "async/apply.h"
#include "async/future.h"
@@ -45,7 +45,7 @@ struct AsyncHelper<void> {
void operator()(const AID &aid, F &&f) {
std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { f(); }));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);
Async(aid, std::move(handler));
}
};
@@ -55,12 +55,12 @@ struct AsyncHelper<Future<R>> {
template <typename F>
Future<R> operator()(const AID &aid, F &&f) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { promise->Associate(f()); }));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);
Async(aid, std::move(handler));
return future;
}
@@ -71,12 +71,12 @@ struct AsyncHelper {
template <typename F>
Future<R> operator()(const AID &aid, F &&f) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { promise->SetValue(f()); }));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);
Async(aid, std::move(handler));
return future;
}
@@ -89,12 +89,12 @@ template <typename T>
void Async(const AID &aid, void (T::*method)()) {
std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([method](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
(t->*method)();
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);
Async(aid, std::move(handler));
}

@@ -102,12 +102,12 @@ template <typename T, typename Arg0, typename Arg1>
void Async(const AID &aid, void (T::*method)(Arg0), Arg1 &&arg) {
std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([method, arg](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
(t->*method)(arg);
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);
Async(aid, std::move(handler));
}

@@ -115,12 +115,12 @@ template <typename T, typename... Args0, typename... Args1>
void Async(const AID &aid, void (T::*method)(Args0...), std::tuple<Args1...> &&tuple) {
std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([method, tuple](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
Apply(t, method, tuple);
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);
Async(aid, std::move(handler));
}

@@ -134,17 +134,17 @@ void Async(const AID &aid, void (T::*method)(Args0...), Args1 &&... args) {
template <typename R, typename T>
Future<R> Async(const AID &aid, Future<R> (T::*method)()) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([promise, method](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
promise->Associate((t->*method)());
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);

Async(aid, std::move(handler));
return future;
@@ -153,17 +153,17 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)()) {
template <typename R, typename T, typename Arg0, typename Arg1>
Future<R> Async(const AID &aid, Future<R> (T::*method)(Arg0), Arg1 &&arg) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, arg](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
promise->Associate((t->*method)(arg));
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);

Async(aid, std::move(handler));
return future;
@@ -172,17 +172,17 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Arg0), Arg1 &&arg) {
template <typename R, typename T, typename... Args0, typename... Args1>
Future<R> Async(const AID &aid, Future<R> (T::*method)(Args0...), std::tuple<Args1...> &&tuple) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, tuple](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
promise->Associate(Apply(t, method, tuple));
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);

Async(aid, std::move(handler));
return future;
@@ -199,17 +199,17 @@ template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int
typename std::enable_if<!internal::IsFuture<R>::value, int>::type = 0, typename T>
Future<R> Async(const AID &aid, R (T::*method)()) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([promise, method](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
promise->SetValue((t->*method)());
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);

Async(aid, std::move(handler));
return future;
@@ -220,17 +220,17 @@ template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int
typename Arg1>
Future<R> Async(const AID &aid, R (T::*method)(Arg0), Arg1 &&arg) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, arg](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
promise->SetValue((t->*method)(arg));
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);

Async(aid, std::move(handler));
return future;
@@ -241,17 +241,17 @@ template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int
typename... Args1>
Future<R> Async(const AID &aid, R (T::*method)(Args0...), std::tuple<Args1...> &&tuple) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::unique_ptr<std::function<void(ActorBase *)>> handler(
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, tuple](ActorBase *actor) {
BUS_ASSERT(actor != nullptr);
MINDRT_ASSERT(actor != nullptr);
T *t = static_cast<T *>(actor);
BUS_ASSERT(t != nullptr);
MINDRT_ASSERT(t != nullptr);
promise->SetValue(Apply(t, method, tuple));
}));
BUS_OOM_EXIT(handler);
MINDRT_OOM_EXIT(handler);

Async(aid, std::move(handler));
return future;


+ 1
- 1
mindspore/core/mindrt/include/async/collect.h View File

@@ -93,7 +93,7 @@ inline Future<std::list<T>> Collect(const std::list<Future<T>> &futures) {
}

Promise<std::list<T>> *promise = new (std::nothrow) Promise<std::list<T>>();
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
using CollectType = Collected<T>;
std::shared_ptr<CollectType> collect = std::make_shared<CollectType>(futures, promise);



+ 9
- 10
mindspore/core/mindrt/include/async/future.h View File

@@ -23,7 +23,7 @@
#include <iostream>
#include <list>
#include "actor/actor.h"
#include "actor/buslog.h"
#include "actor/log.h"
#include "async/spinlock.h"
#include "async/status.h"
#include "async/uuid_generator.h"
@@ -46,7 +46,7 @@ class Future : public FutureBase {
typedef typename FutureData<T>::AbandonedCallback AbandonedCallback;
typedef FutureData<T> Data;
Future() : data(new (std::nothrow) Data()) {
BUS_OOM_EXIT(data);
MINDRT_OOM_EXIT(data);
data->abandoned = true;
}

@@ -55,18 +55,18 @@ class Future : public FutureBase {
Future(Future<T> &&f) : data(std::move(f.data)) {}

explicit Future(const T &t) : data(new (std::nothrow) Data()) {
BUS_OOM_EXIT(data);
MINDRT_OOM_EXIT(data);
SetValue(std::move(t));
}

template <typename V>
explicit Future(const V &value) : data(new (std::nothrow) Data()) {
BUS_OOM_EXIT(data);
MINDRT_OOM_EXIT(data);
SetValue(value);
}

explicit Future(const MindrtStatus &s) : data(new (std::nothrow) Data()) {
BUS_OOM_EXIT(data);
MINDRT_OOM_EXIT(data);
SetFailed(s.GetCode());
}

@@ -87,8 +87,7 @@ class Future : public FutureBase {

const T &Get() const {
if (data->status.IsError()) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_MINDRT_LOG,
"Future::Get() but status == Error: %d", GetErrorCode());
MS_LOG(WARNING) << "Future::Get() but status == Error: " << GetErrorCode();
return data->t;
}

@@ -210,7 +209,7 @@ class Future : public FutureBase {
}

void SetFailed(int32_t errCode) const {
BUS_ASSERT(errCode != MindrtStatus::KINIT && errCode != MindrtStatus::KOK);
MINDRT_ASSERT(errCode != MindrtStatus::KINIT && errCode != MindrtStatus::KOK);

bool call = false;

@@ -249,7 +248,7 @@ class Future : public FutureBase {
template <typename R>
Future<R> Then(const std::function<Future<R>(const T &)> &f) const {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::function<void(const Future<T> &)> handler =
@@ -263,7 +262,7 @@ class Future : public FutureBase {
template <typename R>
Future<R> Then(const std::function<R(const T &)> &f) const {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
BUS_OOM_EXIT(promise);
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();

std::function<void(const Future<T> &)> handler =


+ 1
- 1
mindspore/core/mindrt/include/async/future_base.h View File

@@ -25,7 +25,7 @@
#include <list>

#include "actor/actor.h"
#include "actor/buslog.h"
#include "actor/log.h"
#include "async/spinlock.h"
#include "async/status.h"



+ 8
- 8
mindspore/core/mindrt/include/async/option.h View File

@@ -20,7 +20,7 @@
#include <type_traits>
#include <utility>

#include "actor/buslog.h"
#include "actor/log.h"

namespace mindspore {

@@ -42,13 +42,13 @@ class Option {
public:
Option() : data(), state(NONE) {}

Option(const T &t) : data(t), state(SOME) {}
explicit Option(const T &t) : data(t), state(SOME) {}

Option(T &&t) : data(std::move(t)), state(SOME) {}
explicit Option(T &&t) : data(std::move(t)), state(SOME) {}

Option(const InnerSome<T> &some) : data(some._t), state(SOME) {}
explicit Option(const InnerSome<T> &some) : data(some._t), state(SOME) {}

Option(const MindrtNone &none) : data(), state(NONE) {}
explicit Option(const MindrtNone &none) : data(), state(NONE) {}

Option(const Option<T> &that) : data(), state(that.state) {
if (that.IsSome()) {
@@ -63,17 +63,17 @@ class Option {
bool IsSome() const { return state == SOME; }

const T &Get() const & {
BUS_ASSERT(IsSome());
MINDRT_ASSERT(IsSome());
return data;
}

T &&Get() && {
BUS_ASSERT(IsSome());
MINDRT_ASSERT(IsSome());
return std::move(data);
}

const T &&Get() const && {
BUS_ASSERT(IsSome());
MINDRT_ASSERT(IsSome());
return std::move(data);
}



+ 1
- 1
mindspore/core/mindrt/include/mindrt.hpp View File

@@ -61,7 +61,7 @@ void Finalize();
void SetDelegate(const std::string &delegate);

// set log pid of the process use mindrt
void SetLogPID(HARES_LOG_PID pid);
void SetLogPID(int pid);

// get global mindrt address
const MindrtAddress &GetMindrtAddress();


+ 5
- 8
mindspore/core/mindrt/src/actor/actor.cc View File

@@ -45,7 +45,7 @@ void ActorBase::Await() {
}
void ActorBase::Terminate() {
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
BUS_OOM_EXIT(msg);
MINDRT_OOM_EXIT(msg);
(void)EnqueMessage(std::move(msg));
}

@@ -55,9 +55,6 @@ void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
ActorFunction &func = it->second;
func(msg);
} else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_MINDRT_LOG,
"ACTOR can not find function for message (%s)", "a=%s,m=%s", id.Name().c_str(),
msg->Name().c_str());
MS_LOG(WARNING) << "ACTOR can not find function for message, a=" << id.Name().c_str()
<< ",m=" << msg->Name().c_str();
}
@@ -126,7 +123,7 @@ int ActorBase::Send(const AID &to, std::unique_ptr<MessageBase> msg) {
int ActorBase::Send(const AID &to, std::string &&name, std::string &&strMsg, bool remoteLink, bool isExactNotRemote) {
std::unique_ptr<MessageBase> msg(
new (std::nothrow) MessageBase(this->id, to, std::move(name), std::move(strMsg), MessageBase::Type::KMSG));
BUS_OOM_EXIT(msg);
MINDRT_OOM_EXIT(msg);
return ActorMgr::GetActorMgrRef()->Send(to, std::move(msg), remoteLink, isExactNotRemote);
}

@@ -134,7 +131,7 @@ int ActorBase::Send(const AID &to, std::string &&name, std::string &&strMsg, boo
void ActorBase::Receive(const std::string &msgName, ActorFunction &&func) {
if (actionFunctions.find(msgName) != actionFunctions.end()) {
MS_LOG(ERROR) << "ACTOR function's name conflicts, a=" << id.Name().c_str() << ",f=" << msgName.c_str();
BUS_EXIT("function's name conflicts");
MINDRT_EXIT("function's name conflicts");
return;
}
actionFunctions.emplace(msgName, std::move(func));
@@ -201,7 +198,7 @@ uint64_t ActorBase::GetInBufSize(const AID &to) {
}

int ActorBase::AddRuleUdp(const std::string &peer, int recordNum) {
const std::string udp = BUS_UDP;
const std::string udp = MINDRT_UDP;
auto io = ActorMgr::GetIOMgrRef(udp);
if (io != nullptr) {
return io->AddRuleUdp(peer, recordNum);
@@ -211,7 +208,7 @@ int ActorBase::AddRuleUdp(const std::string &peer, int recordNum) {
}

void ActorBase::DelRuleUdp(const std::string &peer, bool outputLog) {
const std::string udp = BUS_UDP;
const std::string udp = MINDRT_UDP;
auto io = ActorMgr::GetIOMgrRef(udp);
if (io != nullptr) {
io->DelRuleUdp(peer, outputLog);


+ 6
- 6
mindspore/core/mindrt/src/actor/actormgr.cc View File

@@ -87,7 +87,7 @@ void ActorMgr::TerminateAll() {
// send terminal msg to all actors.
for (auto actorIt = actorsWaiting.begin(); actorIt != actorsWaiting.end(); ++actorIt) {
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
BUS_OOM_EXIT(msg);
MINDRT_OOM_EXIT(msg);
(void)(*actorIt)->EnqueMessage(std::move(msg));
(*actorIt)->SetRunningStatus(true);
}
@@ -136,7 +136,7 @@ int ActorMgr::Send(const AID &to, std::unique_ptr<MessageBase> msg, bool remoteL
if (IsLocalAddres(to)) {
auto actor = GetActor(to);
if (actor != nullptr) {
if (to.GetProtocol() == BUS_UDP && msg->GetType() == MessageBase::Type::KMSG) {
if (to.GetProtocol() == MINDRT_UDP && msg->GetType() == MessageBase::Type::KMSG) {
msg->type = MessageBase::Type::KUDP;
}
return actor->EnqueMessage(std::move(msg));
@@ -170,7 +170,7 @@ AID ActorMgr::Spawn(ActorReference &actor, bool shareThread, bool start) {
if (actors.find(actor->GetAID().Name()) != actors.end()) {
actorsMutex.unlock();
MS_LOG(ERROR) << "The actor's name conflicts,name:" << actor->GetAID().Name().c_str();
BUS_EXIT("Actor name conflicts.");
MINDRT_EXIT("Actor name conflicts.");
}

MS_LOG(DEBUG) << "ACTOR was spawned,a=" << actor->GetAID().Name().c_str();
@@ -179,12 +179,12 @@ AID ActorMgr::Spawn(ActorReference &actor, bool shareThread, bool start) {

if (shareThread) {
threadPolicy.reset(new (std::nothrow) ShardedThread(actor));
BUS_OOM_EXIT(threadPolicy);
MINDRT_OOM_EXIT(threadPolicy);
actor->Spawn(actor, std::move(threadPolicy));

} else {
threadPolicy.reset(new (std::nothrow) SingleThread());
BUS_OOM_EXIT(threadPolicy);
MINDRT_OOM_EXIT(threadPolicy);
actor->Spawn(actor, std::move(threadPolicy));
ActorMgr::GetActorMgrRef()->SetActorReady(actor);
}
@@ -204,7 +204,7 @@ void ActorMgr::Terminate(const AID &id) {
auto actor = GetActor(id);
if (actor != nullptr) {
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageBase("Terminate", MessageBase::Type::KTERMINATE));
BUS_OOM_EXIT(msg);
MINDRT_OOM_EXIT(msg);
(void)actor->EnqueMessage(std::move(msg));
actor->SetRunningStatus(true);
}


+ 1
- 1
mindspore/core/mindrt/src/actor/actormgr.h View File

@@ -40,7 +40,7 @@ class ActorMgr {

static void Receive(std::unique_ptr<MessageBase> &&msg) {
auto to = msg->To().Name();
(void)ActorMgr::GetActorMgrRef()->Send(to, std::move(msg));
(void)ActorMgr::GetActorMgrRef()->Send(AID(to), std::move(msg));
}

ActorMgr();


+ 1
- 1
mindspore/core/mindrt/src/actor/actorthread.cc View File

@@ -65,7 +65,7 @@ void ActorThread::AddThread(int threadCount) {
break;
}
std::unique_ptr<std::thread> worker(new (std::nothrow) std::thread(&ActorThread::Run, this));
BUS_OOM_EXIT(worker);
MINDRT_OOM_EXIT(worker)

workers.push_back(std::move(worker));
}


+ 5
- 5
mindspore/core/mindrt/src/actor/aid.cc View File

@@ -25,7 +25,7 @@ constexpr int PROCOLLEN = 3; // strlen("://");
void AID::SetUnfixUrl() {
size_t index = url.find("://");
if (index != std::string::npos) {
if (url.substr(0, index) == BUS_TCP) {
if (url.substr(0, index) == MINDRT_TCP) {
url = url.substr(index + PROCOLLEN);
}
}
@@ -59,9 +59,9 @@ AID::AID(const std::string &tmpName) {
bool AID::OK() const {
std::string proto = GetProtocol();
#ifdef UDP_ENABLED
bool protoOK = (proto == BUS_TCP) || (proto == BUS_UDP);
bool protoOK = (proto == MINDRT_TCP) || (proto == MINDRT_UDP);
#else
bool protoOK = (proto == BUS_TCP);
bool protoOK = (proto == MINDRT_TCP);
#endif
int port = GetPort();
bool portOK = port > PORTMINNUMBER && port < PORTMAXNUMBER;
@@ -78,14 +78,14 @@ AID &AID::operator=(const AID &id) {
void AID::SetProtocol(const std::string &protocol) {
size_t index = url.find("://");
if (index != std::string::npos) {
if (protocol == BUS_TCP) {
if (protocol == MINDRT_TCP) {
url = url.substr(index + PROCOLLEN);
} else {
url = protocol + url.substr(index);
}

} else {
if (protocol == BUS_TCP) {
if (protocol == MINDRT_TCP) {
// url = url;
} else {
url = protocol + "://" + url;


+ 1
- 1
mindspore/core/mindrt/src/actor/iomgr.h View File

@@ -42,7 +42,7 @@ static const int SOCKET_KEEPINTERVAL = 5;
// probes without getting a reply.
static const int SOCKET_KEEPCOUNT = 3;

static const char BUS_MAGICID[] = "BUS0";
static const char MINDRT_MAGICID[] = "MINDRT0";

static const char URL_PROTOCOL_IP_SEPARATOR[] = "://";



+ 1
- 1
mindspore/core/mindrt/src/async/async.cc View File

@@ -34,7 +34,7 @@ class MessageAsync : public MessageBase {

void Async(const AID &aid, std::unique_ptr<std::function<void(ActorBase *)>> handler) {
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(std::move(handler)));
BUS_OOM_EXIT(msg);
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
}



+ 12
- 12
mindspore/core/mindrt/src/async/uuid_base.cc View File

@@ -35,18 +35,18 @@ const uint8_t *uuid::EndAddress() const { return uuidData + UUID_SIZE; }
std::size_t uuid::Size() { return UUID_SIZE; }

std::string uuid::ToBytes(const uuid &u) {
BUS_ASSERT(sizeof(u) == UUID_SIZE);
MINDRT_ASSERT(sizeof(u) == UUID_SIZE);
return std::string(reinterpret_cast<const char *>(u.uuidData), sizeof(u.uuidData));
}

Option<uuid> uuid::FromBytes(const std::string &s) {
if (s.size() != UUID_SIZE) {
return MindrtNone();
return Option<uuid>(MindrtNone());
}
uuid u;
memcpy(&u.uuidData, s.data(), s.size());

return u;
return Option(u);
}

Option<unsigned char> uuid::GetValue(char c) {
@@ -57,15 +57,15 @@ Option<unsigned char> uuid::GetValue(char c) {
size_t pos = std::find(digitsBegin, digitsEnd, c) - digitsBegin;
if (pos >= digitsLen) {
MS_LOG(ERROR) << "invalid char";
return MindrtNone();
return Option<unsigned char>(MindrtNone());
}
return values[pos];
return Option<unsigned char>(values[pos]);
}

Option<uuid> uuid::FromString(const std::string &s) {
auto sBegin = s.begin();
if (sBegin == s.end()) {
return MindrtNone();
return Option<uuid>(MindrtNone());
}
auto c = *sBegin;
bool hasOpenBrace = (c == '{');
@@ -84,12 +84,12 @@ Option<uuid> uuid::FromString(const std::string &s) {
c = *(sBegin++);
} else {
MS_LOG(ERROR) << "str invalid";
return MindrtNone();
return Option<uuid>(MindrtNone());
}
}
Option<unsigned char> oc1 = GetValue(c);
if (oc1.IsNone()) {
return MindrtNone();
return Option<uuid>(MindrtNone());
}
u.uuidData[i] = oc1.Get();
if (sBegin != s.end()) {
@@ -98,15 +98,15 @@ Option<uuid> uuid::FromString(const std::string &s) {
u.uuidData[i] <<= SHIFT_BIT;
Option<unsigned char> oc2 = GetValue(c);
if (oc2.IsNone()) {
return MindrtNone();
return Option<uuid>(MindrtNone());
}
u.uuidData[i] |= oc2.Get();
}
if ((hasOpenBrace && (c != '}')) || (sBegin != s.end())) {
MS_LOG(ERROR) << "No } end or leng invalid";
return MindrtNone();
return Option<uuid>(MindrtNone());
}
return u;
return Option(u);
}

// To check whether uuid looks like 0000000-000-000-000-000000000000000
@@ -162,7 +162,7 @@ uuid RandomBasedGenerator::GenerateRandomUuid() {
auto ret = memcpy(tmpUUID.BeginAddress() + offSet, &lCount, sizeof(lCount));
if (ret != 0) {
MS_LOG(ERROR) << "memcpy_s error.";
BUS_OOM_EXIT(tmpUUID.BeginAddress());
MINDRT_OOM_EXIT(tmpUUID.BeginAddress());
}

// set the variant


+ 18
- 20
mindspore/core/mindrt/src/mindrt.cc View File

@@ -53,7 +53,7 @@ static std::atomic_bool g_finalizeMindrtStatus(false);
} // namespace local

const MindrtAddress &GetMindrtAddress() {
BUS_OOM_EXIT(local::g_mindrtAddress);
MINDRT_OOM_EXIT(local::g_mindrtAddress);
return *local::g_mindrtAddress;
}

@@ -61,39 +61,37 @@ void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(th

class MindrtExit {
public:
MindrtExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "trace: enter MindrtExit()---------");
}
MindrtExit() { MS_LOG(DEBUG) << "trace: enter MindrtExit()---------"; }
~MindrtExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "trace: enter ~MindrtExit()---------");
MS_LOG(DEBUG) << "trace: enter ~MindrtExit()---------";
mindspore::Finalize();
}
};

int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl,
const std::string &udpUrlAdv, int threadCount) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt starts ......");
MS_LOG(DEBUG) << "mindrt starts ......";

// start actor's thread
SetThreadCount(threadCount);

ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has started.");
return BUS_OK;
MS_LOG(DEBUG) << "mindrt has started.";
return MINDRT_OK;
}

int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl,
const std::string &udpUrlAdv, int threadCount) {
/* support repeat initialize */
int result = InitializeImp(tcpUrl, tcpUrlAdv, udpUrl, udpUrlAdv, threadCount);
static MindrtExit busExit;
static MindrtExit mindrtExit;

return result;
}

AID Spawn(ActorReference actor, bool sharedThread, bool start) {
if (actor == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Actor is nullptr.");
BUS_EXIT("Actor is nullptr.");
MS_LOG(DEBUG) << "Actor is nullptr.";
MINDRT_EXIT("Actor is nullptr.");
}

if (local::g_finalizeMindrtStatus.load() == true) {
@@ -118,30 +116,30 @@ void TerminateAll() { mindspore::ActorMgr::GetActorMgrRef()->TerminateAll(); }
void Finalize() {
bool inite = false;
if (local::g_finalizeMindrtStatus.compare_exchange_strong(inite, true) == false) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has been Finalized.");
MS_LOG(DEBUG) << "mindrt has been Finalized.";
return;
}

ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt starts to finalize.");
MS_LOG(DEBUG) << "mindrt starts to finalize.";
mindspore::ActorMgr::GetActorMgrRef()->Finalize();

ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has been finalized.");
MS_LOG(DEBUG) << "mindrt has been finalized.";
// flush the log in cache to disk before exiting.
FlushHLogCache();
}

void SetDelegate(const std::string &delegate) { mindspore::ActorMgr::GetActorMgrRef()->SetDelegate(delegate); }

static HARES_LOG_PID g_busLogPid = 1;
void SetLogPID(HARES_LOG_PID pid) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, pid, "Set Mindrt log PID: %u", pid);
g_busLogPid = pid;
static int g_mindrtLogPid = 1;
void SetLogPID(int pid) {
MS_LOG(DEBUG) << "Set Mindrt log PID:" << pid;
g_mindrtLogPid = pid;
}
HARES_LOG_PID GetLogPID() { return g_busLogPid; }
int GetLogPID() { return g_mindrtLogPid; }

static int g_httpKmsgEnable = -1;
void SetHttpKmsgFlag(int flag) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "Set Mindrt http message format:%d", flag);
MS_LOG(DEBUG) << "Set Mindrt http message format:" << flag;
g_httpKmsgEnable = flag;
}



+ 1
- 1
mindspore/lite/src/lite_mindrt.cc View File

@@ -35,7 +35,7 @@ int LiteOpActor::CompileArrow() {
continue;
}
auto id = out->name() + this->GetAID().Url();
auto arrow = std::make_shared<OpArrow>(i, id, to_input_index);
auto arrow = std::make_shared<OpArrow>(i, AID(id), to_input_index);
if (arrow == nullptr) {
MS_LOG(ERROR) << "create OpArrow failed, out kernel: " << out->name();
return RET_ERROR;


Loading…
Cancel
Save