|
|
|
@@ -23,7 +23,7 @@ |
|
|
|
|
|
|
|
#include "actor/actor.h" |
|
|
|
#include "actor/log.h" |
|
|
|
|
|
|
|
#include "actor/actormgr.h" |
|
|
|
#include "async/apply.h" |
|
|
|
#include "async/future.h" |
|
|
|
|
|
|
|
@@ -31,7 +31,15 @@ namespace mindspore { |
|
|
|
|
|
|
|
using MessageHandler = std::function<void(ActorBase *)>; |
|
|
|
|
|
|
|
void Async(const AID &aid, std::unique_ptr<MessageHandler> &&handler); |
|
|
|
class MessageAsync : public MessageBase { |
|
|
|
public: |
|
|
|
explicit MessageAsync(MessageHandler &h) : MessageBase("Async", Type::KASYNC), handler(h) {} |
|
|
|
~MessageAsync() override {} |
|
|
|
void Run(ActorBase *actor) override { (handler)(actor); } |
|
|
|
|
|
|
|
private: |
|
|
|
MessageHandler handler; |
|
|
|
}; |
|
|
|
|
|
|
|
namespace internal { |
|
|
|
|
|
|
|
@@ -43,10 +51,10 @@ template <> |
|
|
|
struct AsyncHelper<void> { |
|
|
|
template <typename F> |
|
|
|
void operator()(const AID &aid, F &&f) { |
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { f(); })); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [=](ActorBase *) { f(); }; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
@@ -58,10 +66,11 @@ struct AsyncHelper<Future<R>> { |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { promise->Associate(f()); })); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
Async(aid, std::move(handler)); |
|
|
|
MessageHandler handler = [=](ActorBase *) { promise->Associate(f()); }; |
|
|
|
|
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
}; |
|
|
|
@@ -74,10 +83,10 @@ struct AsyncHelper { |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { promise->SetValue(f()); })); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [=](ActorBase *) { promise->SetValue(f()); }; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
}; |
|
|
|
@@ -87,41 +96,41 @@ struct AsyncHelper { |
|
|
|
// return void |
|
|
|
template <typename T> |
|
|
|
void Async(const AID &aid, void (T::*method)()) { |
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([method](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
(t->*method)(); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [method](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
(t->*method)(); |
|
|
|
}; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
} |
|
|
|
|
|
|
|
template <typename T, typename Arg0, typename Arg1> |
|
|
|
void Async(const AID &aid, void (T::*method)(Arg0), Arg1 &&arg) { |
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([method, arg](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
(t->*method)(arg); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [method, arg](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
(t->*method)(arg); |
|
|
|
}; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
} |
|
|
|
|
|
|
|
template <typename T, typename... Args0, typename... Args1> |
|
|
|
void Async(const AID &aid, void (T::*method)(Args0...), std::tuple<Args1...> &&tuple) { |
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([method, tuple](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
Apply(t, method, tuple); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [method, tuple](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
Apply(t, method, tuple); |
|
|
|
}; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
} |
|
|
|
|
|
|
|
template <typename T, typename... Args0, typename... Args1> |
|
|
|
@@ -137,16 +146,15 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)()) { |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([promise, method](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->Associate((t->*method)()); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
|
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [promise, method](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->Associate((t->*method)()); |
|
|
|
}; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -156,16 +164,16 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Arg0), Arg1 &&arg) { |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, arg](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->Associate((t->*method)(arg)); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
std::function<void(ActorBase *)> handler = [promise, method, arg](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->Associate((t->*method)(arg)); |
|
|
|
}; |
|
|
|
|
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -175,16 +183,16 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Args0...), std::tuple<Arg |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, tuple](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->Associate(Apply(t, method, tuple)); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
std::function<void(ActorBase *)> handler = [promise, method, tuple](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->Associate(Apply(t, method, tuple)); |
|
|
|
}; |
|
|
|
|
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -202,16 +210,16 @@ Future<R> Async(const AID &aid, R (T::*method)()) { |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([promise, method](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->SetValue((t->*method)()); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
std::function<void(ActorBase *)> handler = [promise, method](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->SetValue((t->*method)()); |
|
|
|
}; |
|
|
|
|
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -223,16 +231,15 @@ Future<R> Async(const AID &aid, R (T::*method)(Arg0), Arg1 &&arg) { |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, arg](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->SetValue((t->*method)(arg)); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
|
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [promise, method, arg](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->SetValue((t->*method)(arg)); |
|
|
|
}; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -244,16 +251,15 @@ Future<R> Async(const AID &aid, R (T::*method)(Args0...), std::tuple<Args1...> & |
|
|
|
MINDRT_OOM_EXIT(promise); |
|
|
|
Future<R> future = promise->GetFuture(); |
|
|
|
|
|
|
|
std::unique_ptr<std::function<void(ActorBase *)>> handler( |
|
|
|
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, tuple](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->SetValue(Apply(t, method, tuple)); |
|
|
|
})); |
|
|
|
MINDRT_OOM_EXIT(handler); |
|
|
|
|
|
|
|
Async(aid, std::move(handler)); |
|
|
|
std::function<void(ActorBase *)> handler = [promise, method, tuple](ActorBase *actor) { |
|
|
|
MINDRT_ASSERT(actor != nullptr); |
|
|
|
T *t = static_cast<T *>(actor); |
|
|
|
MINDRT_ASSERT(t != nullptr); |
|
|
|
promise->SetValue(Apply(t, method, tuple)); |
|
|
|
}; |
|
|
|
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler)); |
|
|
|
MINDRT_OOM_EXIT(msg); |
|
|
|
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg)); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
|