浏览代码

litebus to mindrt

pull/15558/head
ling 4 年前
父节点
当前提交
a36b5a0a7a
共有 17 个文件被更改,包括 89 次插入243 次删除
  1. +0
    -2
      mindspore/core/mindrt/CMakeLists.txt
  2. +13
    -13
      mindspore/core/mindrt/include/actor/actor.h
  3. +2
    -2
      mindspore/core/mindrt/include/actor/actorapp.h
  4. +9
    -10
      mindspore/core/mindrt/include/actor/buslog.h
  5. +1
    -1
      mindspore/core/mindrt/include/actor/msg.h
  6. +1
    -3
      mindspore/core/mindrt/include/async/collect.h
  7. +8
    -11
      mindspore/core/mindrt/include/async/future.h
  8. +10
    -10
      mindspore/core/mindrt/include/mindrt.h
  9. +10
    -10
      mindspore/core/mindrt/include/mindrt.hpp
  10. +0
    -131
      mindspore/core/mindrt/src/CMakeLists.txt
  11. +0
    -8
      mindspore/core/mindrt/src/actor/CMakeLists.txt
  12. +3
    -3
      mindspore/core/mindrt/src/actor/actor.cc
  13. +3
    -3
      mindspore/core/mindrt/src/actor/actormgr.cc
  14. +0
    -7
      mindspore/core/mindrt/src/async/CMakeLists.txt
  15. +27
    -27
      mindspore/core/mindrt/src/mindrt.cc
  16. +1
    -1
      mindspore/lite/src/lite_mindrt.cc
  17. +1
    -1
      mindspore/lite/test/CMakeLists.txt

+ 0
- 2
mindspore/core/mindrt/CMakeLists.txt 查看文件

@@ -11,5 +11,3 @@ file(GLOB MINDRT_SRC
)

add_library(mindrt_mid OBJECT ${MINDRT_SRC})



+ 13
- 13
mindspore/core/mindrt/include/actor/actor.h 查看文件

@@ -50,7 +50,7 @@ class ActorBase {
inline void PrintMsgRecord() {
uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE;
for (uint32_t i = 0; i < MAX_ACTOR_RECORD_SIZE; i++) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "Actor message dumps:%s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "Actor message dumps:%s",
"actor:%s,msg:%s", id.Name().c_str(), msgRecords[startPoint].c_str());
startPoint = (startPoint + MAX_ACTOR_RECORD_SIZE - 1) % MAX_ACTOR_RECORD_SIZE;
}
@@ -79,7 +79,7 @@ class ActorBase {
void DelRuleUdp(const std::string &peer, bool outputLog);

protected:
using ActorFunction = std::function<void(std::unique_ptr<MessageBase> &msg)>;
using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>;

// install KMSG handler . This method will be called before the actor start to run.
virtual void Init() {}
@@ -89,19 +89,19 @@ class ActorBase {

// KHTTPMsg handler
virtual void HandleHttp(std::unique_ptr<MessageBase> msg) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) HandleHttp() is not implemented", "a=%s", id.Name().c_str());
}

// KLOCALMsg handler
virtual void HandleLocalMsg(std::unique_ptr<MessageBase> msg) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) HandleLocalMsg() is not implemented.", "a=%s", id.Name().c_str());
}

// The link is closed.
virtual void Exited(const AID &actor) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) Exited() is not implemented. ", "a=%s", id.Name().c_str());
}

@@ -152,13 +152,13 @@ class ActorBase {
friend class ActorThread;

// KMSG Msg Handler
virtual void HandlekMsg(std::unique_ptr<MessageBase> &msg);
virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg);

template <typename T>
static void BehaviorBase(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
std::unique_ptr<MessageBase> &msg) {
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KMSG) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Drop non-tcp message: %s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-tcp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
return;
@@ -169,9 +169,9 @@ class ActorBase {
// register the message handle. It will be discarded.
template <typename T>
static void BehaviorBase1(T *t, void (T::*method)(mindspore::AID, std::string &&, std::string &&),
std::unique_ptr<MessageBase> &msg) {
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KMSG) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Drop non-tcp message: %s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-tcp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
return;
@@ -182,9 +182,9 @@ class ActorBase {
// register the udp message handle. Use this closure function to drop non-udp messages
template <typename T>
static void BehaviorBaseForUdp(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
std::unique_ptr<MessageBase> &msg) {
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KUDP) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Drop non-udp message: %s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-udp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
return;
@@ -196,7 +196,7 @@ class ActorBase {
void Quit();
int EnqueMessage(std::unique_ptr<MessageBase> msg);

void Spawn(std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> actorThread);
void Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> actorThread);
void SetRunningStatus(bool start);

std::unique_ptr<ActorPolicy> actorThread;


+ 2
- 2
mindspore/core/mindrt/include/actor/actorapp.h 查看文件

@@ -56,7 +56,7 @@ class AppActor : public ActorBase {
APPBehavior behavior = std::bind(&BehaviorBase<T, M>, static_cast<T *>(this), method, std::placeholders::_1);

if (appBehaviors.find(msgName) != appBehaviors.end()) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "ACTOR msgName conflict:%s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "ACTOR msgName conflict:%s",
"a=%s,msg=%s", GetAID().Name().c_str(), msgName.c_str());
BUS_EXIT("msgName conflicts.");
return;
@@ -80,7 +80,7 @@ class AppActor : public ActorBase {
if (it != appBehaviors.end()) {
it->second(std::move(msg));
} else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "ACTOR can not finds handler:%s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "ACTOR can not finds handler:%s",
"a=%s,msg=%s,hdlno=%zd", GetAID().Name().c_str(), msg->Name().c_str(), appBehaviors.size());
}
}


+ 9
- 10
mindspore/core/mindrt/include/actor/buslog.h 查看文件

@@ -35,11 +35,11 @@ namespace mindspore {
#define BUS_DLOG(verboselevel) // VLOG(verboselevel)

#define HARES_LOG_PID int // GetLogPID();
#define PID_LITEBUS_LOG
#define PID_MINDRT_LOG

#define ICTSBASE_LOG_COMMON_CODE
#define HLOG_LEVEL_INFO
#define PID_LITEBUS_LOG
#define PID_MINDRT_LOG
#define HLOG_LEVEL_DEBUG 1
#define ICTSBASE_LOG0(logig, level, pid, format)
#define ICTSBASE_LOG1(logig, level, pid, format, para)
@@ -50,8 +50,7 @@ namespace mindspore {
#define FlushHLogCache()
// Kill the process for safe exiting.
inline void KillProcess(const std::string &ret) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "BUS Exit Tip: %s", "%s",
ret.c_str());
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "BUS Exit Tip: %s", "%s", ret.c_str());
// flush the log in cache to disk before exiting.
FlushHLogCache();
}
@@ -80,12 +79,12 @@ constexpr int DLEVEL0 = 0;
mindspore::KillProcess(ss.str()); \
} while (0)

#define BUS_OOM_EXIT(ptr) \
{ \
if (ptr == nullptr) { \
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "new failed, will exit"); \
BUS_EXIT("Exit for OOM."); \
} \
#define BUS_OOM_EXIT(ptr) \
{ \
if (ptr == nullptr) { \
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "new failed, will exit"); \
BUS_EXIT("Exit for OOM."); \
} \
}

constexpr int LOG_CHECK_EVERY_FIRSTNUM = 10;


+ 1
- 1
mindspore/core/mindrt/include/actor/msg.h 查看文件

@@ -83,4 +83,4 @@ class MessageBase {

} // namespace mindspore

#endif // __LITEBUS_MESSAGE_HPP__
#endif

+ 1
- 3
mindspore/core/mindrt/include/async/collect.h 查看文件

@@ -26,10 +26,8 @@
#include "async/future.h"
#include "async/defer.h"
#include "async/spinlock.h"

#include "actor/actor.h"

#include "litebus.hpp"
#include "mindrt/include/mindrt.hpp"

namespace mindspore {



+ 8
- 11
mindspore/core/mindrt/include/async/future.h 查看文件

@@ -14,24 +14,21 @@
* limitations under the License.
*/

#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_

#include <memory>
#include <utility>
#include <future>
#include <iostream>
#include <list>

#include "actor/actor.h"
#include "actor/buslog.h"

#include "async/spinlock.h"
#include "async/status.h"
#include "async/uuid_generator.h"

#include "litebus.hpp"

#include "async/future_base.h"
#include "mindrt/include/mindrt.hpp"

namespace mindspore {

@@ -90,7 +87,7 @@ class Future : public FutureBase {

const T &Get() const {
if (data->status.IsError()) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_LITEBUS_LOG,
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_MINDRT_LOG,
"Future::Get() but status == Error: %d", GetErrorCode());
return data->t;
}
@@ -103,15 +100,15 @@ class Future : public FutureBase {
data->t = data->future.get();
data->gotten = true;
// } catch (std::future_error const &e) {
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Future error: %s",
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Future error: %s",
// "%s",
// e.what());
// } catch (std::exception const &e) {
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Standard exception:
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Standard exception:
// %s",
// "%s", e.what());
// } catch (...) {
// ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Unknown exception.");
// ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Unknown exception.");
// }

return data->t;


mindspore/core/mindrt/include/litebus.h → mindspore/core/mindrt/include/mindrt.h 查看文件

@@ -14,8 +14,8 @@
* limitations under the License.
*/

#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_H_

#ifdef __cplusplus
#include <string>
@@ -23,20 +23,20 @@
extern "C" {
#endif

#define LITEBUS_URL_MAX_LEN 138
#define MINDRT_URL_MAX_LEN 138

struct LitebusConfig {
char tcpUrl[LITEBUS_URL_MAX_LEN];
char tcpUrlAdv[LITEBUS_URL_MAX_LEN];
char udpUrl[LITEBUS_URL_MAX_LEN];
char udpUrlAdv[LITEBUS_URL_MAX_LEN];
struct MindrtConfig {
char tcpUrl[MINDRT_URL_MAX_LEN];
char tcpUrlAdv[MINDRT_URL_MAX_LEN];
char udpUrl[MINDRT_URL_MAX_LEN];
char udpUrlAdv[MINDRT_URL_MAX_LEN];
int threadCount;
int httpKmsgFlag;
};

int LitebusInitializeC(const struct LitebusConfig *config);
int MindrtInitializeC(const struct MindrtConfig *config);

void LitebusFinalizeC();
void MindrtFinalizeC();

#ifdef __cplusplus
}

mindspore/core/mindrt/include/litebus.hpp → mindspore/core/mindrt/include/mindrt.hpp 查看文件

@@ -14,8 +14,8 @@
* limitations under the License.
*/

#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_HPP_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_HPP_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_HPP_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_HPP_H_

#include <string>
#include "mindrt/include/actor/actor.h"
@@ -23,10 +23,10 @@
// brief provide an asynchronous programming framework as Actor model
namespace mindspore {

struct LitebusAddress {
std::string scheme;
std::string ip;
uint16_t port;
struct MindrtAddress {
std::string scheme;
std::string ip;
uint16_t port;
};

// brief initialize the library
@@ -60,11 +60,11 @@ void Finalize();
// brief set the delegate of restful
void SetDelegate(const std::string &delegate);

// set log pid of the process use litebus
// set log pid of the process use mindrt
void SetLogPID(HARES_LOG_PID pid);

// get global litebus address
const LitebusAddress &GetLitebusAddress();
// get global mindrt address
const MindrtAddress &GetMindrtAddress();

// get flag of http message format
int GetHttpKmsgFlag();
@@ -72,5 +72,5 @@ int GetHttpKmsgFlag();
// brief set flag of http message format
void SetHttpKmsgFlag(int flag);

} // namespace mindspore
} // namespace mindspore
#endif

+ 0
- 131
mindspore/core/mindrt/src/CMakeLists.txt 查看文件

@@ -1,131 +0,0 @@
# include DIRECTIVES FOR LITEBUS LIBRARY (generates, e.g., -I/path/to/thing on Linux).
######################################################################################
if(${HTTP_ENABLED} STREQUAL "on")
set(LITEBUS_INCLUDE_DIR ${LITEBUS_INCLUDE_DIR} ${HTTP_PARSER_INCLUDE_DIR})
endif()

if(${SSL_ENABLED} STREQUAL "on")
set(LITEBUS_INCLUDE_DIR ${LITEBUS_INCLUDE_DIR}
${LITEBUS_OSSCRYPTO_INCLUDE_DIR}
${LITEBUS_HARESCRYPTO_INCLUDE_DIR}
)
endif()

link_directories(${SECUREC_LIB_DIR})
link_directories(${GLOG_LIB_DIR})
link_directories(${HARES_LOG_LIB_DIR})
link_directories(${PROTOBUF_C_LIB_DIR})

if(${HTTP_ENABLED} STREQUAL "on")
link_directories(${HTTP_PARSER_LIB_DIR})
endif()

# LINK libgcov.a
#######################################################################
if(${CODE_COVERAGE} STREQUAL "on")
set(LINK_LIBS ${LINK_LIBS} gcov)
endif()

# add object lib to avoid duplicate compile for a single source file
#######################################################################
add_library(litebus_obj OBJECT litebus.cc)
target_include_directories(litebus_obj PUBLIC ${LITEBUS_INCLUDE_DIR} ${3RDPARTY_LITEBUS_INCLUDE_DIRS})
#add_library(decrypt_obj OBJECT)

# THE LITEBUS LIBRARY (generates, e.g., liblitebus.so, etc., on Linux).
#######################################################################
if(${STATIC_LIB} STREQUAL "on")

#######################################################################
add_library(${LITEBUS_TARGET}_static STATIC $<TARGET_OBJECTS:litebus_obj>)
target_link_libraries(${LITEBUS_TARGET}_static ${LINK_LIBS} ${LITEBUS_HARES_DECRYPT_SLIB}
${OPENSSL_SSL_LIB_A} ${OPENSSL_CRYPTO_LIB_A})
set_target_properties(${LITEBUS_TARGET}_static PROPERTIES OUTPUT_NAME ${LITEBUS_TARGET})

if(DEFINED DEPEND_PATH)
add_custom_command(TARGET ${LITEBUS_TARGET}_static POST_BUILD
COMMAND ${CMAKE_COMMAND} -E create_symlink lib${LITEBUS_TARGET}.a liblitebus.a
)
endif()

#######################################################################
endif()

set(LINK_LIBS ${LINK_LIBS})

if(${HTTP_ENABLED} STREQUAL "on")
set(LINK_LIBS ${LINK_LIBS} ${HTTP_PARSER_DFLAG})
endif()

add_library(litebus_shared SHARED $<TARGET_OBJECTS:litebus_obj>)
target_link_libraries(litebus_shared ${LINK_LIBS})
target_include_directories(litebus_shared PUBLIC ${LITEBUS_INCLUDE_DIR} ${3RDPARTY_LITEBUS_INCLUDE_DIRS})

set_target_properties(
litebus_shared PROPERTIES
OUTPUT_NAME litebus
VERSION ${LITEBUS_PACKAGE_VERSION}
SOVERSION ${LITEBUS_PACKAGE_VERSION}
LINK_FLAGS -s
)

#copy lib to depend path (internal use)
#set(DEPEND_PATH "${PROJECT_SOURCE_DIR}/output1")
if(DEFINED DEPEND_PATH)
set(DEPEND_LIB_PATH ${DEPEND_PATH}/LITEBUS/lib)
set(DEPEND_INCLUDE_PATH ${DEPEND_PATH}/LITEBUS/include)

add_custom_target(litebus_all ALL COMMENT "================= litebus_all =====================")
if(${STATIC_LIB} STREQUAL "on")
add_dependencies(litebus_all litebus_shared ${LITEBUS_TARGET}_static)
endif()
add_dependencies(litebus_all litebus_shared)
add_custom_command(TARGET litebus_all POST_BUILD
COMMAND ${CMAKE_COMMAND} -P ${PROJECT_SOURCE_DIR}/cmake/MakeDirectory.cmake
${DEPEND_LIB_PATH} ${DEPEND_INCLUDE_PATH}
COMMAND ${CMAKE_COMMAND} -DLITEBUS_COPYTO="${DEPEND_LIB_PATH}" -P
${PROJECT_SOURCE_DIR}/cmake/CopyLibToPath.cmake
COMMAND ${CMAKE_COMMAND} -E copy ${PROJECT_SOURCE_DIR}/include/litebus.hpp
${DEPEND_INCLUDE_PATH}
COMMAND ${CMAKE_COMMAND} -E copy ${PROJECT_SOURCE_DIR}/include/litebus.h
${DEPEND_INCLUDE_PATH}
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/actor
${DEPEND_INCLUDE_PATH}/actor
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/async
${DEPEND_INCLUDE_PATH}/async
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/timer
${DEPEND_INCLUDE_PATH}/timer
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/exec
${DEPEND_INCLUDE_PATH}/exec
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/utils
${DEPEND_INCLUDE_PATH}/utils)
if(${HTTP_ENABLED} STREQUAL "on")
add_custom_command(TARGET litebus_all POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
${PROJECT_SOURCE_DIR}/include/httpd ${DEPEND_INCLUDE_PATH}/httpd)
endif()
if(${SSL_ENABLED} STREQUAL "on")
add_custom_command(TARGET litebus_all POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
${PROJECT_SOURCE_DIR}/include/ssl ${DEPEND_INCLUDE_PATH}/ssl)
endif()

endif()

#install lib to package path
if("${PROJECT_HARES}" STREQUAL "cloudcore")
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_LS_Master/lib PERMISSIONS OWNER_READ)
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_LS_Slave/lib PERMISSIONS OWNER_READ)
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_HASEN_Common/lib PERMISSIONS OWNER_READ)
elseif("${PROJECT_HARES}" STREQUAL "hasen_cloudcore_csp")
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_Common/lib PERMISSIONS OWNER_READ)
else()
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_Common/lib PERMISSIONS OWNER_READ)
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_Slave/lib PERMISSIONS OWNER_READ)
endif()

# Build pbjson.so.
add_subdirectory(actor)
add_subdirectory(async)
add_subdirectory(evloop)
add_subdirectory(timer)

+ 0
- 8
mindspore/core/mindrt/src/actor/CMakeLists.txt 查看文件

@@ -1,8 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/actor.cc
${CMAKE_CURRENT_SOURCE_DIR}/actormgr.cc
${CMAKE_CURRENT_SOURCE_DIR}/actorthread.cc
${CMAKE_CURRENT_SOURCE_DIR}/actorpolicy.cc
${CMAKE_CURRENT_SOURCE_DIR}/aid.cc
${CMAKE_CURRENT_SOURCE_DIR}/sysmgr_actor.cc
)

+ 3
- 3
mindspore/core/mindrt/src/actor/actor.cc 查看文件

@@ -26,7 +26,7 @@ ActorBase::ActorBase(const std::string &name)

ActorBase::~ActorBase() {}

void ActorBase::Spawn(std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> thread) {
void ActorBase::Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> thread) {
// lock here or await(). and unlock at Quit() or at aweit.
waiterLock.lock();

@@ -49,13 +49,13 @@ void ActorBase::Terminate() {
(void)EnqueMessage(std::move(msg));
}

void ActorBase::HandlekMsg(std::unique_ptr<MessageBase> &msg) {
void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
auto it = actionFunctions.find(msg->Name());
if (it != actionFunctions.end()) {
ActorFunction &func = it->second;
func(msg);
} else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_MINDRT_LOG,
"ACTOR can not find function for message (%s)", "a=%s,m=%s", id.Name().c_str(),
msg->Name().c_str());
MS_LOG(WARNING) << "ACTOR can not find function for message, a=" << id.Name().c_str()


+ 3
- 3
mindspore/core/mindrt/src/actor/actormgr.cc 查看文件

@@ -102,11 +102,11 @@ void ActorMgr::Initialize(int threadCount) { threadPool.AddThread(threadCount);

void ActorMgr::Finalize() {
this->TerminateAll();
MS_LOG(INFO) << "litebus Actors finish exiting.";
MS_LOG(INFO) << "mindrt Actors finish exiting.";

// stop all actor threads;
threadPool.Finalize();
MS_LOG(INFO) << "litebus Threads finish exiting.";
MS_LOG(INFO) << "mindrt Threads finish exiting.";

// stop iomgr thread
for (auto mgrIt = ioMgrs.begin(); mgrIt != ioMgrs.end(); ++mgrIt) {
@@ -114,7 +114,7 @@ void ActorMgr::Finalize() {
mgrIt->second->Finish();
}

MS_LOG(INFO) << "litebus IOMGRS finish exiting.";
MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
}

ActorReference ActorMgr::GetActor(const AID &id) {


+ 0
- 7
mindspore/core/mindrt/src/async/CMakeLists.txt 查看文件

@@ -1,7 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/async.cc
${CMAKE_CURRENT_SOURCE_DIR}/future.cc
${CMAKE_CURRENT_SOURCE_DIR}/uuid_base.cc
${CMAKE_CURRENT_SOURCE_DIR}/uuid_generator.cc
# ${CMAKE_CURRENT_SOURCE_DIR}/flag_parser_impl.cpp
)

mindspore/core/mindrt/src/litebus.cc → mindspore/core/mindrt/src/mindrt.cc 查看文件

@@ -16,13 +16,13 @@

#include <cstdlib>
#include <atomic>
#include "mindrt/src/actor/actormgr.h"
#include "mindrt/src/actor/iomgr.h"
#include "litebus.hpp"
#include "include/litebus.h"
#include "src/actor/actormgr.h"
#include "src/actor/iomgr.h"
#include "include/mindrt.hpp"
#include "include/mindrt.h"

extern "C" {
int LitebusInitializeC(const struct LitebusConfig *config) {
int MindrtInitializeC(const struct MindrtConfig *config) {
if (config == nullptr) {
return -1;
}
@@ -40,44 +40,44 @@ int LitebusInitializeC(const struct LitebusConfig *config) {
std::string(config->udpUrlAdv), config->threadCount);
}

void LitebusFinalizeC() { mindspore::Finalize(); }
void MindrtFinalizeC() { mindspore::Finalize(); }
}

namespace mindspore {

namespace local {

static LitebusAddress *g_litebusAddress = new (std::nothrow) LitebusAddress();
static std::atomic_bool g_finalizeLitebusStatus(false);
static MindrtAddress *g_mindrtAddress = new (std::nothrow) MindrtAddress();
static std::atomic_bool g_finalizeMindrtStatus(false);

} // namespace local

const LitebusAddress &GetLitebusAddress() {
BUS_OOM_EXIT(local::g_litebusAddress);
return *local::g_litebusAddress;
const MindrtAddress &GetMindrtAddress() {
BUS_OOM_EXIT(local::g_mindrtAddress);
return *local::g_mindrtAddress;
}

void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); }

class LiteBusExit {
class MindrtExit {
public:
LiteBusExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "trace: enter LiteBusExit()---------");
MindrtExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "trace: enter MindrtExit()---------");
}
~LiteBusExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "trace: enter ~LiteBusExit()---------");
~MindrtExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "trace: enter ~MindrtExit()---------");
mindspore::Finalize();
}
};

int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl,
const std::string &udpUrlAdv, int threadCount) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts ......");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt starts ......");

// start actor's thread
SetThreadCount(threadCount);

ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has started.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has started.");
return BUS_OK;
}

@@ -85,18 +85,18 @@ int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv, const st
const std::string &udpUrlAdv, int threadCount) {
/* support repeat initialize */
int result = InitializeImp(tcpUrl, tcpUrlAdv, udpUrl, udpUrlAdv, threadCount);
static LiteBusExit busExit;
static MindrtExit busExit;

return result;
}

AID Spawn(ActorReference actor, bool sharedThread, bool start) {
if (actor == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Actor is nullptr.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Actor is nullptr.");
BUS_EXIT("Actor is nullptr.");
}

if (local::g_finalizeLitebusStatus.load() == true) {
if (local::g_finalizeMindrtStatus.load() == true) {
return actor->GetAID();
} else {
return ActorMgr::GetActorMgrRef()->Spawn(actor, sharedThread, start);
@@ -117,15 +117,15 @@ void TerminateAll() { mindspore::ActorMgr::GetActorMgrRef()->TerminateAll(); }

void Finalize() {
bool inite = false;
if (local::g_finalizeLitebusStatus.compare_exchange_strong(inite, true) == false) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has been Finalized.");
if (local::g_finalizeMindrtStatus.compare_exchange_strong(inite, true) == false) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has been Finalized.");
return;
}

ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts to finalize.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt starts to finalize.");
mindspore::ActorMgr::GetActorMgrRef()->Finalize();

ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has been finalized.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has been finalized.");
// flush the log in cache to disk before exiting.
FlushHLogCache();
}
@@ -134,14 +134,14 @@ void SetDelegate(const std::string &delegate) { mindspore::ActorMgr::GetActorMgr

static HARES_LOG_PID g_busLogPid = 1;
void SetLogPID(HARES_LOG_PID pid) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, pid, "Set LiteBus log PID: %u", pid);
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, pid, "Set Mindrt log PID: %u", pid);
g_busLogPid = pid;
}
HARES_LOG_PID GetLogPID() { return g_busLogPid; }

static int g_httpKmsgEnable = -1;
void SetHttpKmsgFlag(int flag) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "Set LiteBus http message format:%d", flag);
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "Set Mindrt http message format:%d", flag);
g_httpKmsgEnable = flag;
}


+ 1
- 1
mindspore/lite/src/lite_mindrt.cc 查看文件

@@ -16,7 +16,7 @@

#include <utility>
#include "src/lite_mindrt.h"
#include "mindrt/include/litebus.hpp"
#include "mindrt/include/mindrt.hpp"

namespace mindspore::lite {
int LiteOpActor::CompileArrow() {


+ 1
- 1
mindspore/lite/test/CMakeLists.txt 查看文件

@@ -194,7 +194,7 @@ if(ENABLE_MINDRT)
set(TEST_LITE_SRC ${TEST_LITE_SRC}
${LITE_DIR}/src/lite_mindrt.cc
${LITE_DIR}/src/mindrt_executor.cc
${CORE_DIR}/mindrt/src/litebus.cc
${CORE_DIR}/mindrt/src/mindrt.cc
${CORE_DIR}/mindrt/src/actor/actor.cc
${CORE_DIR}/mindrt/src/actor/actormgr.cc
${CORE_DIR}/mindrt/src/actor/actorpolicy.cc


正在加载...
取消
保存