Browse Source

implement lock-free queue

tags/v1.3.0
zhaizhiqiang 4 years ago
parent
commit
722fc9fd5b
5 changed files with 179 additions and 7 deletions
  1. +1
    -1
      mindspore/core/mindrt/include/actor/actor.h
  2. +6
    -6
      mindspore/core/mindrt/src/actor/actor.cc
  3. +79
    -0
      mindspore/core/mindrt/src/thread/hqueue.h
  4. +1
    -0
      mindspore/lite/test/CMakeLists.txt
  5. +92
    -0
      mindspore/lite/test/ut/src/lite_mindrt_test.cc

+ 1
- 1
mindspore/core/mindrt/include/actor/actor.h View File

@@ -196,7 +196,7 @@ class ActorBase {
void Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> actorThread);
void SetRunningStatus(bool start);

std::unique_ptr<ActorPolicy> actorThread;
std::unique_ptr<ActorPolicy> actorPolicy;
InterThreadPool *pool_{nullptr};

AID id;


+ 6
- 6
mindspore/core/mindrt/src/actor/actor.cc View File

@@ -22,7 +22,7 @@
namespace mindspore {

ActorBase::ActorBase(const std::string &name)
: actorThread(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
: actorPolicy(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}

ActorBase::~ActorBase() {}

@@ -30,9 +30,9 @@ void ActorBase::Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<A
// lock here or await(). and unlock at Quit() or at aweit.
waiterLock.lock();

actorThread = std::move(thread);
actorPolicy = std::move(thread);
}
void ActorBase::SetRunningStatus(bool start) { actorThread->SetRunningStatus(start); }
void ActorBase::SetRunningStatus(bool start) { actorPolicy->SetRunningStatus(start); }

void ActorBase::Await() {
std::string actorName = id.Name();
@@ -59,18 +59,18 @@ void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
<< ",m=" << msg->Name().c_str();
}
}
int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> &&msg) { return actorThread->EnqueMessage(std::move(msg)); }
int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> &&msg) { return actorPolicy->EnqueMessage(std::move(msg)); }

void ActorBase::Quit() {
Finalize();
actorThread->Terminate(this);
actorPolicy->Terminate(this);
// lock at spawn(), unlock here.
waiterLock.unlock();
}

void ActorBase::Run() {
for (;;) {
auto msgs = actorThread->GetMsgs();
auto msgs = actorPolicy->GetMsgs();
if (msgs == nullptr) {
return;
}


+ 79
- 0
mindspore/core/mindrt/src/thread/hqueue.h View File

@@ -0,0 +1,79 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_
#define MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_
#include <atomic>
#include <vector>

namespace mindspore {
// implement a lock-free queue
template <class T>
class HQueue {
public:
HQueue(const HQueue &) = delete;
HQueue &operator=(const HQueue &) = delete;
explicit HQueue(size_t queue_size) : freeHead(0), usedHead(0) { cache.resize(queue_size); }
virtual ~HQueue() {
freeHead = 0;
usedHead = 0;
}

bool Enqueue(T *t) {
size_t curPos = freeHead.load(std::memory_order_relaxed);
size_t nextPos = curPos + 1;
if (nextPos == cache.size()) {
nextPos = 0;
}

size_t usedIndex = usedHead.load(std::memory_order_acquire);
if (nextPos != usedIndex) {
cache[curPos] = t;
// move free head to new position
freeHead.store(nextPos, std::memory_order_release);
return true;
}

// cache is full
return false;
}

T *Dequeue() {
size_t usedIndex = usedHead.load(std::memory_order_relaxed);
size_t freeIndex = freeHead.load(std::memory_order_acquire);

if (freeIndex == usedHead) { // empty
return nullptr;
}

T *ret = cache[usedIndex];
usedIndex++;
if (usedIndex == cache.size()) {
usedIndex = 0;
}
usedHead.store(usedIndex, std::memory_order_release);
return ret;
}

private:
std::vector<T *> cache;
std::atomic<size_t> freeHead;
std::atomic<size_t> usedHead;
};

} // namespace mindspore

#endif // MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_

+ 1
- 0
mindspore/lite/test/CMakeLists.txt View File

@@ -291,6 +291,7 @@ set(TEST_SRC
${TEST_DIR}/ut/src/utils_test.cc
${TEST_DIR}/ut/src/dynamic_library_loader_test.cc
${TEST_DIR}/ut/src/scheduler_test.cc
${TEST_DIR}/ut/src/lite_mindrt_test.cc
)

if(ENABLE_CONVERTER)


+ 92
- 0
mindspore/lite/test/ut/src/lite_mindrt_test.cc View File

@@ -0,0 +1,92 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "actor/actor.h"
#include "actor/op_actor.h"
#include "async/uuid_base.h"
#include "async/future.h"
#include "src/lite_mindrt.h"
#include "thread/hqueue.h"
#include "common/common_test.h"

namespace mindspore {
class LiteMindRtTest : public mindspore::CommonTest {
public:
LiteMindRtTest() {}
};

TEST_F(LiteMindRtTest, HQueueTest) {
HQueue<int> hq(10000);
std::vector<int *> v1(2000);
int d1 = 1;
for (size_t s = 0; s < v1.size(); s++) {
v1[s] = new int(d1);
}
std::vector<int *> v2(2000);
int d2 = 2;
for (size_t s = 0; s < v2.size(); s++) {
v2[s] = new int(d2);
}

std::thread t1([&]() {
for (size_t s = 0; s < v1.size(); s++) {
ASSERT_EQ(hq.Enqueue(v1[s]), true);
}
});
std::thread t2([&]() {
for (size_t s = 0; s < v2.size(); s++) {
ASSERT_EQ(hq.Enqueue(v2[s]), true);
}
});

int c1 = 0;
int c2 = 0;

std::thread t3([&]() {
size_t loop = v1.size() + v2.size();
while (loop) {
int *val = hq.Dequeue();
if (val == nullptr) {
continue;
}
loop--;
if (*val == d1) {
c1++;
} else if (*val == d2) {
c2++;
} else {
// should never come here
ASSERT_EQ(0, 1);
}
}
});

t1.join();
t2.join();
t3.join();

ASSERT_EQ(c1, v1.size());
ASSERT_EQ(c2, v2.size());
ASSERT_EQ(hq.Dequeue(), nullptr);

for (size_t s = 0; s < v1.size(); s++) {
delete v1[s];
}

for (size_t s = 0; s < v2.size(); s++) {
delete v2[s];
}
}
} // namespace mindspore

Loading…
Cancel
Save