网站多语言包,制作图片的软件ppt,永久免费观看不收费的软件app,手机版免费个人简历模板一、背景
这还是2个月前做的一次接口性能测试#xff0c;关于locust脚本的单机多核运行#xff0c;以及主从节点之间的数据通信。
先简单交代下背景#xff0c;在APP上线之前#xff0c;需要对登录接口进行性能测试。经过评估#xff0c;我还是优先选择了locust来进行脚… 一、背景
这还是2个月前做的一次接口性能测试关于locust脚本的单机多核运行以及主从节点之间的数据通信。
先简单交代下背景在APP上线之前需要对登录接口进行性能测试。经过评估我还是优先选择了locust来进行脚本开发本次用到了locust的单机多核运行能力只不过这里还涉及到主从节点之间数据通信。现成的可参考的有效文档甚少所以还是自己摸着官方文档过河比较靠谱。
顺带提一下学习框架这种东西最好的教程其实还得是官方文档以及框架源码了这里贴上locust官方文档链接需要的可以自行学习https://docs.locust.io/en/stable/what-is-locust.html
二、代码编写
其实脚本代码的编写一大重点就是如何处理测试数据不同的测试需求对于测试数据的处理是不同的。比如这次的需求手机号不能重复。另外考虑到长时间的负载压力数据量还得足够。
现在我也找了很多测试的朋友做了一个分享技术的交流群共享了很多我们收集的技术文档和视频教程。
如果你不想再体验自学时找不到资源没人解答问题坚持几天便放弃的感受
可以加入我们一起交流。而且还有很多在自动化性能安全测试开发等等方面有一定建树的技术大牛
分享他们的经验还会分享很多直播讲座和技术沙龙
可以免费学习划重点开源的
qq群号110685036【暗号csdn999】 最后测试数据还需要处理那么我使用的测试号段是非真实号码段测试结束后可以查询对应号段内的手机号进行相关业务数据的清理。
1. 代码概览
还是老样子先附上全部代码然后对其结构进行拆分讲解。
import random
import time
from collections import dequefrom locust import HttpUser, task, run_single_user, TaskSet, events
from locust.runners import WorkerRunner, MasterRunnerCURRENT_TIMESTAMP str(round(time.time() * 1000))
RANDOM str(random.randint(10000000, 99999999))
MOBILE_HEADER {skip-request-expired: true,skip-auth: true,skip-sign: true,os: IOS,device-id: 198EA6A4677649018708B400F3DF69FB,nonce: RANDOM,sign: 12333,version: 1.2.0,timestamp: CURRENT_TIMESTAMP,Content-Type: application/json
}last_mobile
worker_mobile_deque deque()# 13300120000, 13300160000 新用户注册号段events.test_start.add_listener
def on_test_start(environment, **_kwargs):if not isinstance(environment.runner, WorkerRunner):mobile_list []for i in range(13300120000, 13300160000):mobile_list.append(i)mobile_list_length len(mobile_list)print(列表已生成总计数量, mobile_list_length)worker_count environment.runner.worker_countchunk_size int(mobile_list_length / worker_count)print(f平均每个worker分得的手机号数量{chunk_size})for i, worker in enumerate(environment.runner.clients):start_index i * chunk_sizeif i 1 worker_count:end_index start_index chunk_sizeelse:end_index len(mobile_list)data mobile_list[start_index:end_index]environment.runner.send_message(mobile_list, data, worker)def setup_mobile_list(environment, msg, **kwargs):len_msg_data len(msg.data)print(fworker收到的master传来的数据号段{msg.data[0]} ~ {msg.data[len_msg_data-1]})global worker_mobile_dequeworker_mobile_deque deque(msg.data)events.init.add_listener
def on_locust_init(environment, **_kwargs):if not isinstance(environment.runner, MasterRunner):environment.runner.register_message(mobile_list, setup_mobile_list)class VcodeLoginUser(TaskSet):# wait_time between(5, 5)taskdef vcode_login(self):test_mobile worker_mobile_deque.popleft()print(当前获取的手机号, test_mobile)# print(当前队列大小, len(worker_mobile_deque))global last_mobilelast_mobile test_mobilewith self.client.post(/g/sendMobileVcode,headersMOBILE_HEADER,json{busiType: login, mobile: str(test_mobile)}) as send_response:try:send_response_json send_response.json()if send_response_json[message] success:params {mobile: str(test_mobile), vcode: 111111}# print(test_mobile, 登录请求参数, params)with self.client.post(/g/vcodeLogin,jsonparams,headersMOBILE_HEADER,catch_responseTrue) as login_response:# print(login_response.json)login_response_json login_response.json()if login_response_json[message] ! success:login_response.failure(message not equal success)elif login_response_json[code] ! 0:login_response.failure(code not equal 0)elif login_response_json[data][rId] :login_response.failure(rid is null)elif login_response_json[data][mobile] ! str(test_mobile):login_response.failure(mobile is error,入参手机号{},返回的手机号{}.format(test_mobile, login_response.json()[data][mobile]))# print(test_mobile, 请求结果, login_response.json())else:send_response.failure({} send code fail.format(test_mobile))except Exception as e:send_response.failure(send code fail {}.format(e))events.test_stop.add_listenerdef on_test_stop(environment, **kwargs):print(脚本结束)print(当前队列大小, len(worker_mobile_deque))print(最后的手机号, last_mobile)class LocustLogin(HttpUser):tasks [VcodeLoginUser]host https://qa.test.comif __name__ __main__:run_single_user(LocustLogin)
2. 代码拆解-要加必要的断言
首先是基于locust开发的http请求的脚本大结构是不变的依旧是两大块HttpUser、TaskSet这里不再对其讲解了大伙看下官方文档就明白了。
接下来就是类VcodeLoginUser可以看到在这里面是定义了单个用户的详细动作。注意这里要加上必要的断言。否则仅靠框架的非200外的错误断言还是不够的。
比如我这里关注登录成功后的几个必要字段code、rId、mobile这些一定是要符合断言的才可以。
果不其然压测过程中就发现了并发情况下会出现的问题入参手机号是a接口返回的手机号是b。并发量越大错误越多。如果我只断言code0那么这个问题就不容易发现了虽然接口返回的code都是成功的但是业务上已经存在错误了。
...with self.client.post(/g/sendMobileVcode,headersMOBILE_HEADER,json{busiType: login, mobile: str(test_mobile)}) as send_response:try:send_response_json send_response.json()if send_response_json[message] success:params {mobile: str(test_mobile), vcode: 111111}# print(test_mobile, 登录请求参数, params)with self.client.post(/g/vcodeLogin,jsonparams,headersMOBILE_HEADER,catch_responseTrue) as login_response:# print(login_response.json)login_response_json login_response.json()if login_response_json[message] ! success:login_response.failure(message not equal success)elif login_response_json[code] ! 0:login_response.failure(code not equal 0)elif login_response_json[data][rId] :login_response.failure(rid is null)elif login_response_json[data][mobile] ! str(test_mobile):login_response.failure(mobile is error,入参手机号{},返回的手机号{}.format(test_mobile, login_response.json()[data][mobile]))# print(test_mobile, 请求结果, login_response.json())else:send_response.failure({} send code fail.format(test_mobile))except Exception as e:send_response.failure(send code fail {}.format(e))
...3. 代码拆解-单机多核处理
接下来就是重点了如何在单台机器上用到多cpu。最开始的时候我忽略了这点后来发现负载上不去一打开资源监视器才发现只有1个cpu在满负载运行。
这里示意图仅供参考我的win笔记本是12c的。 因为Locust是单进程的不能充分利用多核CPU于是需要我们压力机上开启一个master进程然后再开启多个slave进程组成一个单机分布式系统即可。
开启的方式也很简单
# 开启 master
locust -f locustfile.py --master# 开启 slave
locust -f locustfile.py --slave这里我们开启 slave 节点的时候可以开启对应多个命令行窗口当时没截图借用网上的图片示意一下: 开启后你的web界面就可以实时看到当前启动的节点数了。 4. 代码拆解-处理主从节点数据通信
开启主从节点倒是很容易测试数据就需要针对性进行处理了。
因为我的测试登录用的手机号不可以重复所以要保证不同 slave 节点上同时运行的代码产生的手机号都不可以重复。
继续扒了下官方文档发现可以通过增加事件监听器来实现我的需求。
这里我加了三个监听器分别来处理不同的事情
events.init.add_listener在locust运行初始化的时候执行events.test_start.add_listener: 在测试代码开始运行的时候执行events.test_stop.add_listener 在测试代码结束运行的时候执行
events.test_start.add_listener 首先在events.test_start.add_listener里我主要处理全量数据的生成以及把这些手机号平均分配给生成的 slave 节点。
events.test_start.add_listener
def on_test_start(environment, **_kwargs):if not isinstance(environment.runner, WorkerRunner):mobile_list []for i in range(13300120000, 13300160000):mobile_list.append(i)mobile_list_length len(mobile_list)print(列表已生成总计数量, mobile_list_length)worker_count environment.runner.worker_countchunk_size int(mobile_list_length / worker_count)print(f平均每个worker分得的手机号数量{chunk_size})for i, worker in enumerate(environment.runner.clients):start_index i * chunk_sizeif i 1 worker_count:end_index start_index chunk_sizeelse:end_index len(mobile_list)data mobile_list[start_index:end_index]environment.runner.send_message(mobile_list, data, worker)注意这里最后一行中定义的mobile_list需要定义一个对应函数来接收这个数据。
def setup_mobile_list(environment, msg, **kwargs):len_msg_data len(msg.data)print(fworker收到的master传来的数据号段{msg.data[0]} ~ {msg.data[len_msg_data-1]})global worker_mobile_dequeworker_mobile_deque deque(msg.data)这样不同的 slave 节点脚步分配到的手机号段就是不同的了解决测试数据重复的问题。
另外我定义另一个全局变量worker_mobile_deque这样不同的 slave 节点接收的数据就可以放到队列里运行的时候从队列里面取用一个少一个直到队列里的数据用完。
events.init.add_listener 接着就是在events.init.add_listener里要注册上面定义的数据字段和处理函数。
events.init.add_listener
def on_locust_init(environment, **_kwargs):if not isinstance(environment.runner, MasterRunner):environment.runner.register_message(mobile_list, setup_mobile_list)events.test_stop.add_listener 最后在events.test_stop.add_listener这里可以做一些后置处理我是简单起见只是记录输出了本次测试用到了哪个号码段这样我下次运行脚本的时候可以从后面的数据开始最大化测试数据的使用不浪费。 events.test_stop.add_listenerdef on_test_stop(environment, **kwargs):print(脚本结束)print(当前队列大小, len(worker_mobile_deque))print(最后的手机号, last_mobile)三、小结
脚本调试完后可以稳定运行接下来就是测试的过程了进行了服务器单节点、多节点负载能力的测试水平拓展能力的测试以及服务动态扩容、长时间高负载测试。测试的角度观察测试报告服务各项指标的情况。只不过涉及到开发端调优分析的工作并未能参与很多。不过大概还是那些常见问题后续有机会可以再单独分享了。
从使用角度来看locust深得我爱比起 jemter真的太轻便了代码灵活度也非常高单机负载能力也是响当当的这点比jemeter强太多了。我这个项目不需要非常高的量所以单机只用了8c就够了。如果有小伙伴需要非常高的并发locust 也支持多机器分布式进一步扩大并发能力。
END今天的分享就到此结束了点赞关注不迷路~