Python之并发请求

在服务端的测试中,除了考虑服务端的业务功能和API的各个兼容性外,还需要考虑的就是服务端的稳定性以及高并发请求下服务端的承载能力。关于并发多少的数量以及具体的响应时间要求,其实每个产品的形态都是不一样的,很难使用标准的说法来进行统一。这具体看被测试的组件它所面对的业务形态,如果业务形态是是很少使用的产品,其实对性能也就没什么要求了。所以关于这点还是得根据被测组件的架构设计,承载的量以及业务目标。本文章主要分享使用Python语言编写一个简单的并发请求的测试代码。 在Python的并发编程模式中,主要涉及的点是线程以及进程,还有对应的协程。而locust主要是基于协程来进行设计,协程我们可以把它理解为微线程。在IO密集性和CPU密集性中,如果是IO密集性的,建议使用多线程的方式更加高效,如果是CPU密集性,建议使用多进程的方式更加高效。本文章主要分享基于IO密集性,也就是多线程的方式。开启一个线程的方式是非常简单,我们可以通过函数式的编程方式,也可以使用面向对象的编程方式,见如下的具体案例代码,函数式的方式:

from threading import  Thread
import  time as t
import  random


def job(name):
   print('我是{0},我要开始工作啦'.format(name))

if __name__ == '__main__':
   t=Thread(target=job,args=('李四',))
   t.start()
   print('主线程执行结束')
面向对象的方式:
from threading import  Thread
import  time as t
import  random

class Job(Thread):
   def __init__(self,name):
      super().__init__()
      self.name=name

   def run(self) -> None:
      print('我是{0},我要开始工作啦'.format(self.name))

if __name__ == '__main__':
   t=Job('李四')
   t.start()
   print('主线程程序执行结束')
其实在Thread的类中,并没有返回被测函数的返回值,也就是说我们在测试API接口的时候,需要拿到被测接口的状态码,请求响应时间,和响应数据,那么我们就需要重新Thread类继承后对run()的方法进行重写,拿到被测函数中我们所期望的数据,具体案例代码如下:
from threading import  Thread


class ThreadTest(Thread):
   def __init__(self,func,args=()):
      '''
      :param func: 被测试的函数
      :param args: 被测试的函数的返回值
      '''
      super(ThreadTest,self).__init__()
      self.func=func
      self.args=args

   def run(self) -> None:
      self.result=self.func(*self.args)

   def getResult(self):
      try:
         return self.result
      except BaseException as e:
         return e.args[0]
这里我们以测试百度首页作为案例,来并发请求后,拿到并发请求后响应时间,状态码,然后依据响应时间拿到中位数以及其他的数据,具体完整案例代码如下:
from threading import  Thread
import  requests
import  matplotlib.pyplot as plt
import  datetime
import  time
import  numpy as np
import  json


class ThreadTest(Thread):
   def __init__(self,func,args=()):
      '''
      :param func: 被测试的函数
      :param args: 被测试的函数的返回值
      '''
      super(ThreadTest,self).__init__()
      self.func=func
      self.args=args

   def run(self) -> None:
      self.result=self.func(*self.args)

   def getResult(self):
      try:
         return self.result
      except BaseException as e:
         return e.args[0]


def baiDu(code,seconds):
   '''
   :param code: 状态码
   :param seconds: 请求响应时间
   :return:
   '''
   r=requests.get(url='http://www.baidu.com/')
   code=r.status_code
   seconds=r.elapsed.total_seconds()
   return code,seconds

def calculationTime(startTime,endTime):
   '''计算两个时间之差,单位是秒'''
   return (endTime-startTime).seconds


def getResult(seconds):
   '''获取服务端的响应时间信息'''
   data={
      'Max':sorted(seconds)[-1],
      'Min':sorted(seconds)[0],
      'Median':np.median(seconds),
      '99%Line':np.percentile(seconds,99),
      '95%Line':np.percentile(seconds,95),
      '90%Line':np.percentile(seconds,90)
   }
   return data


def highConcurrent(count):
   '''
   对服务端发送高并发的请求
   :param cout: 并发数
   :return:
   '''
   startTime=datetime.datetime.now()
   sum=0
   list_count=list()
   tasks=list()
   results = list()
   #失败的信息
   fails=[]
   #成功任务数
   success=[]
   codes = list()
   seconds = list()

   for i in range(1,count):
      t=ThreadTest(baiDu,args=(i,i))
      tasks.append(t)
      t.start()

   for t in tasks:
      t.join()
      if t.getResult()[0]!=200:
         fails.append(t.getResult())
      results.append(t.getResult())

   endTime=datetime.datetime.now()
   for item in results:
      codes.append(item[0])
      seconds.append(item[1])
   for i in range(len(codes)):
      list_count.append(i)

   #生成可视化的趋势图
   fig,ax=plt.subplots()
   ax.plot(list_count,seconds)
   ax.set(xlabel='number of times', ylabel='Request time-consuming',
          title='olap continuous request response time (seconds)')
   ax.grid()
   fig.savefig('olap.png')
   plt.show()

   for i in seconds:
      sum+=i
   rate=sum/len(list_count)
   # print('\n总共持续时间:\n',endTime-startTime)
   totalTime=calculationTime(startTime=startTime,endTime=endTime)
   if totalTime<1:
      totalTime=1
   #吞吐量的计算
   try:
      throughput=int(len(list_count)/totalTime)
   except Exception as e:
      print(e.args[0])
   getResult(seconds=seconds)
   errorRate=0
   if len(fails)==0:
      errorRate=0.00
   else:
      errorRate=len(fails)/len(tasks)*100
   throughput=str(throughput)+'/S'
   timeData=getResult(seconds=seconds)
   dict1={
      '吞吐量':throughput,
      '平均响应时间':rate,
      '响应时间':timeData,
      '错误率':errorRate,
      '请求总数':len(list_count),
      '失败数':len(fails)
   }
   return  json.dumps(dict1,indent=True,ensure_ascii=False)

if __name__ == '__main__':
    print(highConcurrent(count=1000))
上面的代码执行后,就会形成可视化的请求响应时间以及其他的信息,执行后显示如下的信息:
{
   "吞吐量": "500/S",
   "平均响应时间": 0.08835436199999998,
   "响应时间": {
      "Max": 1.5547,
      "Min": 0.068293,
      "Median": 0.0806955,
      "99%Line": 0.12070111,
      "95%Line": 0.10141509999999998,
      "90%Line": 0.0940216
   },
   "错误率": 0.0,
   "请求总数": 1000,
   "失败数": 0
}


注: 上文内容不用于商业目的, 如涉及知识产权问题, 请权利人联系 vtest-top@foxmail.com , 将立即处理, 谢谢!