"""Locust performance test for the online-practice ("刷题") flow.

Each simulated user logs in with a student account, starts a practice,
submits code, triggers the evaluation and polls for its result.

In distributed mode the master slices ``performance_biz.student_accounts``
and sends each worker its share through the custom ``stu_accounts`` message;
workers store the received accounts in ``stu_queue`` for their users to pick up.
"""
import random
import sys
import os
import queue
import time

environment = 'pre'
# NOTE(review): this appends a label, not a directory, to sys.path —
# presumably a visible marker of the target environment; confirm intent.
sys.path.append('environment tag:{}'.format(environment))
# Make the parent directory of this file importable so the project packages
# (config, biz, lib, api) resolve.
real_path = os.path.split(os.path.realpath(__file__))[0]
sys.path.append('{}{}..'.format(real_path, os.sep))

from locust import HttpUser, between, task, TaskSet, events
from locust.runners import WorkerRunner, MasterRunner
from config.common_config import *
from biz import home_page, performance_biz
from lib.thread_logger import get_logger
from lib.edu_http_session import EduHttpSession
from lib.data_convert import string_to_base64
from api.educoder import practices as _api_practices
from api.educoder import mypractices as _api_mypractices

logger.to_html = False

# Student accounts available to the users running in this process.
stu_queue = queue.Queue()


@events.init.add_listener
def on_locust_init_testdata(environment, **_kwargs):
    """On worker start-up, register the ``stu_accounts`` message type.

    The message is used to receive the student accounts distributed by the
    master; nothing is registered on non-worker runners.
    """
    runner = environment.runner
    if isinstance(runner, WorkerRunner):
        runner.register_message("stu_accounts", recv_stu_accounts)


@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    """When the test starts, distribute student accounts to the workers.

    Runs on every runner that is not a ``WorkerRunner``. Each worker receives
    a contiguous slice of ``performance_biz.student_accounts`` whose length is
    ``performance_biz.thread_num // worker_count``.
    """
    runner = environment.runner
    if isinstance(runner, WorkerRunner):
        return

    worker_count = runner.worker_count
    if not worker_count:
        # Without this guard the chunk-size division below raises
        # ZeroDivisionError when no worker is connected.
        import logging
        logging.getLogger(__name__).warning(
            "no workers connected; student accounts were not distributed")
        return

    # NOTE(review): the division truncates, so when thread_num is not a
    # multiple of worker_count the remaining accounts are not sent to anyone.
    chunk_size = int(performance_biz.thread_num / worker_count)

    # Iterating runner.clients yields the ID of each worker.
    for i, worker in enumerate(runner.clients):
        # Slice this worker's share using its index.
        first = i * chunk_size
        data = performance_biz.student_accounts[first:first + chunk_size]
        # Build the payload and address it to this specific worker.
        index_info = {"worker": worker, 'data': data}
        runner.send_message("stu_accounts", index_info, worker)


def recv_stu_accounts(environment, msg, **kwargs):
    """Receive the student accounts distributed by the master.

    Every account found under ``msg.data['data']`` is put on ``stu_queue``.
    """
    for stu in msg.data['data']:
        stu_queue.put(stu)


class TaskTest(TaskSet):
    """Task set that runs the practice flow with one student account."""

    def on_start(self):
        """Take an account from the queue and prepare the logger and code payload."""
        # Blocks until an account is available in stu_queue.
        self.user_name = stu_queue.get()
        self.client.logger = get_logger('{}'.format(self.user_name))
        self.client.logger.info('start test: {}'.format(self.user_name))
        self.my_code = string_to_base64(data=performance_biz.answer_code)

    def on_stop(self):
        """Log the stop and return the account to the queue for reuse."""
        self.client.logger.info('stop test: {}'.format(self.user_name))
        stu_queue.put(self.user_name)

    @task
    def test_practices(self):
        """Log in, start a practice, submit code and wait for the evaluation.

        Raises:
            AssertionError: when any step returns an unexpected status, no
                result data is available, or the evaluation did not pass.
                NOTE(review): ``assert`` statements are skipped under
                ``python -O``.
        """
        # Log in.
        my_header = default_headers.copy()
        my_header['client'] = self.client
        home_page.login_educoder(username=self.user_name, password='12345678', headers=my_header)

        # Open the practice page (obtain the identifier).
        res = _api_practices.start(practices_identifier=performance_biz.practices_identifier,
                                   headers=my_header)
        mypractices_identifier = res.get('identifier')
        assert res.get('status') == 0 and mypractices_identifier, f"开始刷题失败:{res}"
        mypractices_info = _api_mypractices.get_mypractices(headers=my_header,
                                                            mypractices_identifier=mypractices_identifier,
                                                            hidePopLogin=True)
        assert mypractices_info.get('practice'), f"在线刷题-获取题目信息失败:{mypractices_info}"

        # Submit the code and start the evaluation.
        res = _api_mypractices.update_code(headers=my_header,
                                           mypractices_identifier=mypractices_identifier,
                                           language=performance_biz.language,
                                           code=self.my_code)
        assert res.get('status') == 0, f'{self.user_name} 提交代码失败:{res} ,{mypractices_identifier}, {time.time()}'
        res = _api_mypractices.code_submit(headers=my_header, mypractices_identifier=mypractices_identifier)
        assert res.get('status') == 0, f'{self.user_name} 开始评测失败:{res} ,{mypractices_identifier}, {time.time()}'

        # Fetch the evaluation result: poll once per second until status 0 is
        # returned or performance_biz.limit_time seconds have elapsed.
        t_start = time.time()
        result = {}
        while time.time() - t_start <= performance_biz.limit_time:
            time.sleep(1)
            result = _api_mypractices.result(headers=my_header, mypractices_identifier=mypractices_identifier)
            status = result.get('status')
            assert status in [0, 1], f'获取评测状态异常:{result}'
            if status == 0:
                break

        # The last response (possibly still status 1 after a timeout) is what
        # gets validated here.
        result_data = result.get('data')
        assert result_data, f'{self.user_name} 获取评测结果异常:{result} ,{mypractices_identifier}, {time.time()}'
        passed = result_data.get('passed')
        assert passed is True, f'{self.user_name} 评测未通过:{result} ,{mypractices_identifier}, {time.time()}'


class ExercisePerformance(HttpUser):
    """Locust user that runs ``TaskTest`` through an ``EduHttpSession`` client."""

    host = host
    wait_time = between(performance_biz.task_wait, performance_biz.task_wait)  # interval between tasks
    tasks = [TaskTest]

    def __init__(self, *args, **kwargs):
        """Initialise the user, then replace ``self.client`` with an ``EduHttpSession``."""
        super().__init__(*args, **kwargs)
        self.client = EduHttpSession(
            base_url=self.host,
            request_event=self.environment.events.request,
            user=self,
            pool_manager=self.pool_manager,
            rest_time=performance_biz.request_wait  # interval between requests
        )