You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

taskexecutor.py 3.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. """
  2. /**
  3. * Copyright 2020 Zhejiang Lab. All Rights Reserved.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. * =============================================================
  17. */
  18. """
  19. # coding:utf-8
  20. import codecs
  21. import sched
  22. import sys
  23. import json
  24. import logging
  25. import time
  26. import common.RedisUtil as f
  27. import common.config as config
  28. import annotation as annotation
  29. from datetime import datetime
  30. import luascript.delaytaskscript as delay_script
  31. logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
  32. level=logging.DEBUG)
  33. schedule = sched.scheduler(time.time, time.sleep)
  34. sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
  35. delayId = ""
  36. def annotationExecutor(redisClient, key):
  37. """Annotation task method.
  38. Args:
  39. redisClient: redis client.
  40. key: annotation task key.
  41. """
  42. global delayId
  43. print('-------------process one-----------------')
  44. try:
  45. delayId = "\"" + eval(str(key, encoding="utf-8")) + "\""
  46. logging.info('get element is {0}'.format(key))
  47. key = key.decode()
  48. jsonStr = f.getByKey(redisClient, key.replace('"', ''));
  49. print(jsonStr)
  50. jsonObject = json.loads(jsonStr.decode('utf-8'));
  51. image_path_list = []
  52. id_list = []
  53. label_list = []
  54. label_list = jsonObject['labels']
  55. for fileObject in jsonObject['files']:
  56. image_path_list.append('/nfs/' + fileObject['url'])
  57. id_list.append(fileObject['id'])
  58. print(image_path_list)
  59. print(id_list)
  60. print(label_list)
  61. coco_flag = 0
  62. if "labelType" in jsonObject:
  63. label_type = jsonObject['labelType']
  64. if label_type == 3:
  65. coco_flag = 80
  66. annotations = annotation._annotation(0, image_path_list, id_list, label_list, coco_flag);
  67. result = {"task": key, "annotations": annotations}
  68. f.pushToQueue(redisClient, config.annotationFinishQueue, json.dumps(result))
  69. redisClient.zrem(config.annotationStartQueue, key)
  70. except Exception as e:
  71. print(e)
  72. def delaySchduled(inc, redisClient):
  73. """Delay task method.
  74. Args:
  75. inc: scheduled task time.
  76. redisClient: redis client.
  77. """
  78. try:
  79. print("delay:" + datetime.now().strftime("B%Y-%m-%d %H:%M:%S"))
  80. redisClient.eval(delay_script.delayTaskLua, 1, config.annotationStartQueue, delayId, int(time.time()))
  81. schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
  82. except Exception as e:
  83. print("delay error" + e)
  84. def delayKeyThread(redisClient):
  85. """Delay task thread.
  86. Args:
  87. redisClient: redis client.
  88. """
  89. schedule.enter(0, 0, delaySchduled, (5, redisClient))
  90. schedule.run()

一站式算法开发平台、高性能分布式深度学习框架、先进算法模型库、视觉模型炼知平台、数据可视化分析平台等一系列平台及工具,在模型高效分布式训练、数据处理和可视分析、模型炼知和轻量化等技术上形成独特优势,目前已在产学研等各领域近千家单位及个人提供AI应用赋能

Contributors (1)