Browse Source

fix: sometimes hung when hostPush is not return

tags/v1.1.0
jonyguo 5 years ago
parent
commit
31d52aa20e
1 changed files with 27 additions and 0 deletions
  1. +27
    -0
      mindspore/ccsrc/minddata/dataset/util/task.cc

+ 27
- 0
mindspore/ccsrc/minddata/dataset/util/task.cc View File

@@ -20,6 +20,10 @@
#if defined(__ANDROID__) || defined(ANDROID) #if defined(__ANDROID__) || defined(ANDROID)
#include "minddata/dataset/util/services.h" #include "minddata/dataset/util/services.h"
#endif #endif
#ifdef ENABLE_TDTQUE
#include "tdt/tdt_host_interface.h"
#include "tdt/status.h"
#endif


namespace mindspore { namespace mindspore {
namespace dataset { namespace dataset {
@@ -135,6 +139,7 @@ Status Task::Join(WaitFlag blocking) {
// interrupt and becomes blocked on a conditional variable forever. As a result, calling // interrupt and becomes blocked on a conditional variable forever. As a result, calling
// join() will not come back. We need some timeout version of join such that if the thread // join() will not come back. We need some timeout version of join such that if the thread
// doesn't come back in a reasonable of time, we will send the interrupt again. // doesn't come back in a reasonable of time, we will send the interrupt again.
uint32_t wait_times = 0;
while (thrd_.wait_for(std::chrono::seconds(1)) != std::future_status::ready) { while (thrd_.wait_for(std::chrono::seconds(1)) != std::future_status::ready) {
// We can't tell which conditional_variable this thread is waiting on. So we may need // We can't tell which conditional_variable this thread is waiting on. So we may need
// to interrupt everything one more time. // to interrupt everything one more time.
@@ -142,6 +147,28 @@ Status Task::Join(WaitFlag blocking) {
ss << get_id(); ss << get_id();
MS_LOG(WARNING) << MyName() << " Thread ID " << ss.str() << " is not responding. Interrupt again"; MS_LOG(WARNING) << MyName() << " Thread ID " << ss.str() << " is not responding. Interrupt again";
interrupt_svc->InterruptAll(); interrupt_svc->InterruptAll();
wait_times++;
#ifdef ENABLE_TDTQUE
// Because hostPush hung in DeviceQueueOp, wait 5 seconds and destroy the tdt
if (wait_times > 5 && my_name_.find("DeviceQueueOp") != std::string::npos) {
MS_LOG(WARNING) << "Wait " << wait_times << " seconds, "
<< "the task: " << my_name_ << " will be destoryed by TdtHostDestory.";
int32_t destory_status = tdt::TdtHostDestroy();
if (destory_status != TDT_OK_CODE) {
MS_LOG(WARNING) << "Destory tsd failed, status = " << destory_status << ".";
} else {
MS_LOG(INFO) << "Destory tsd success.";
}

// just wait 30 seconds
// case1: cpu usage 100%, DeviceQueueOp thread may destory without thrd_ future
if (wait_times > 30) {
MS_LOG(WARNING) << MyName() << " Thread ID " << ss.str()
<< " is not responding. Maybe it's destoryed, task stop.";
break;
}
}
#endif
} }
} else { } else {
RETURN_STATUS_UNEXPECTED("Unknown WaitFlag"); RETURN_STATUS_UNEXPECTED("Unknown WaitFlag");


Loading…
Cancel
Save