You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

prac.py 5.4 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import random
  2. import sys, os, queue
  3. import time
  4. environment = 'pre'
  5. sys.path.append('environment tag:{}'.format(environment))
  6. real_path = os.path.split(os.path.realpath(__file__))[0]
  7. sys.path.append('{}{}..'.format(real_path, os.sep))
  8. from locust import HttpUser, between, task, TaskSet, events
  9. from locust.runners import WorkerRunner, MasterRunner
  10. from config.common_config import *
  11. from biz import home_page, performance_biz
  12. from lib.thread_logger import get_logger
  13. from lib.edu_http_session import EduHttpSession
  14. from lib.data_convert import string_to_base64
  15. from api.educoder import practices as _api_practices
  16. from api.educoder import mypractices as _api_mypractices
  17. logger.to_html = False
  18. stu_queue = queue.Queue()
  19. @events.init.add_listener
  20. def on_locust_init_testdata(environment, **_kwargs):
  21. """worker启动时, 注册消息类型(stu_accounts), 用于接收master分发的学生账号"""
  22. if isinstance(environment.runner, WorkerRunner):
  23. environment.runner.register_message("stu_accounts", recv_stu_accounts)
  24. @events.test_start.add_listener
  25. def on_test_start(environment, **_kwargs):
  26. """master开始测试时,根据worker数量将所有学生账号分发"""
  27. if isinstance(environment.runner, WorkerRunner):
  28. return
  29. worker_count = environment.runner.worker_count
  30. chunk_size = int(performance_biz.thread_num / worker_count)
  31. # environment.runner.clients 是一个列表,里面放的是每个worker的ID
  32. for i, worker in enumerate(environment.runner.clients):
  33. # 根据数据拿到的下标,截取数据
  34. data = performance_biz.student_accounts[i * chunk_size:i * chunk_size + chunk_size]
  35. # 发送消息给test_users,并且指定worker
  36. index_info = {"worker": worker, 'data': data}
  37. # 发送详情信息
  38. environment.runner.send_message("stu_accounts", index_info, worker)
  39. def recv_stu_accounts(environment, msg, **kwargs):
  40. """接收master分发的学生账号"""
  41. global stu_list
  42. for stu in msg.data['data']:
  43. stu_queue.put(stu)
  44. class TaskTest(TaskSet):
  45. def on_start(self):
  46. self.user_name = stu_queue.get()
  47. self.client.logger = get_logger('{}'.format(self.user_name))
  48. self.client.logger.info('start test: {}'.format(self.user_name))
  49. self.my_code = string_to_base64(data=performance_biz.answer_code)
  50. def on_stop(self):
  51. self.client.logger.info('stop test: {}'.format(self.user_name))
  52. stu_queue.put(self.user_name)
  53. @task
  54. def test_practices(self):
  55. # 登录
  56. my_header = default_headers.copy()
  57. my_header['client'] = self.client
  58. home_page.login_educoder(username=self.user_name, password='12345678', headers=my_header)
  59. # 打开刷题页面(获取 identifier)
  60. res = _api_practices.start(practices_identifier=performance_biz.practices_identifier, headers=my_header)
  61. mypractices_identifier = res.get('identifier')
  62. assert res.get('status') == 0 and mypractices_identifier, f"开始刷题失败:{res}"
  63. mypractices_info = _api_mypractices.get_mypractices(headers=my_header, mypractices_identifier=mypractices_identifier, hidePopLogin=True)
  64. assert mypractices_info.get('practice'), f"在线刷题-获取题目信息失败:{mypractices_info}"
  65. # 提交代码、开始评测
  66. res = _api_mypractices.update_code(headers=my_header,
  67. mypractices_identifier=mypractices_identifier,
  68. language=performance_biz.language,
  69. code=self.my_code)
  70. assert res.get('status') == 0, f'{self.user_name} 提交代码失败:{res} ,{mypractices_identifier}, {time.time()}'
  71. res = _api_mypractices.code_submit(headers=my_header, mypractices_identifier=mypractices_identifier)
  72. assert res.get('status') == 0, f'{self.user_name} 开始评测失败:{res} ,{mypractices_identifier}, {time.time()}'
  73. # 获取评测结果
  74. t_start = time.time()
  75. flag = False
  76. result = {}
  77. while time.time() - t_start <= performance_biz.limit_time and flag is False:
  78. time.sleep(1)
  79. result = _api_mypractices.result(headers=my_header, mypractices_identifier=mypractices_identifier)
  80. assert result.get('status') in [0, 1], f'获取评测状态异常:{result}'
  81. if result.get('status') == 0:
  82. flag = True
  83. result_data = result.get('data')
  84. assert result_data, f'{self.user_name} 获取评测结果异常:{result} ,{mypractices_identifier}, {time.time()}'
  85. passed = result_data.get('passed')
  86. assert passed is True, f'{self.user_name} 评测未通过:{result} ,{mypractices_identifier}, {time.time()}'
  87. class ExercisePerformance(HttpUser):
  88. host = host
  89. wait_time = between(performance_biz.task_wait, performance_biz.task_wait) # 任务间的时间间隔
  90. tasks = [TaskTest]
  91. def __init__(self, *args, **kwargs):
  92. super().__init__(*args, **kwargs)
  93. self.client = EduHttpSession(
  94. base_url=self.host,
  95. request_event=self.environment.events.request,
  96. user=self,
  97. pool_manager=self.pool_manager,
  98. rest_time=performance_biz.request_wait # 请求间的时间间隔
  99. )

test

Contributors (3)