- 程序:例如pycharm、 QQ都是程序
- 进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元
- 不仅可以通过线程完成多任务,进程也是可以的。
我们打开任务管理器,就能看到我们这台电脑上打开了哪些进程。
进程是操作系统分配资源的基本单位,每运行一个程序,操作系统这个时候就会启动一个进程,把你这个程序运行所要的所有资源都放在这个进程里面。那么这个进程在执行的时候,他也有这个并发和并行的状态,和线程是一样的。
如果你的电脑是个八核的CPU,那么这个时候你启动8个程序,每个程序都会有一个对应的CPU来提供进程相应的支持,那这个时候就是处于一个并行的状态,那如果说你的cpu1个,打开了多个程序,那这个时候进程也是处于并发执行的状态,这是由于操作系统的任务调度算法他会去调度先执行哪个进程,执行完了再去执行哪一个,这个是操作系统去搞得,跟进程没关系。
进程在执行的时候分为三个状态:
- 工作中,任务数往往大于CPU的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态
- 就绪状态:运行的条件都已经满足了,正在等cpu执行
- 执行状态:cpu正在执行其功能
- 等待状态:等待某些条件满足,例如一个程序sleep了,此时就处于等待状态
功能:
- 进程,能够完成多任务,比如 在一台电脑上能够同时运行多个软件
- 线程,能够完成多任务,比如 一个QQ中的多个聊天窗口
定义的不同
- 进程是系统进行资源分配单位
- 线程是进程的一个实体,是CPU调度和任务分派的基本单位,它是比进程更小的能独立运行的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。
我们试着通过设置线程的方式去设置进程试试看:
from multiprocessing import Process a = 0 def work1(): global a for i in range(100000): a += 1 print("work1------a", a) def work2(): global a for i in range(100000): a += 1 print("work2------a", a) p = Process(target=work1) p.start() p2 = Process(target=work2) p2.start() p.join() p2.join() print('主进程a', a)
我们执行以下发现报错了,这是为什么?
原因:因为进程之间的资源是独立的。
进程每次执行之前,都会把他要执行的代码和数据都会备一份,两个进程都会各备份一份,备份代码的话就会把我们写的代码都加载一遍(类似于我们平常导入模块一样),然后代码里我们又写了创建进程的代码,这就会出现循环加载的情况。
这种情况我们怎么处理?
答:放在 if __name__ == '__main__'下面,这样子就能执行成功了。(windows绝对会有问题,mac有可能会有问题,还是因为操作系统执行多进程不同的原因。)
(2)守护进程 daemon守护进程是什么意思叻?答:被设置为守护进程的p进程,他会去守护那个主进程。就是如果说主进程执行结束之后,那们这个守护进程会自动关闭。
我们来个demo,首先把p.join,p2.join 这两行代码注释掉,然后在p进程里参数新增一个deemon=True,
from multiprocessing import Process a = 0 def work1(): global a for i in range(100000): a += 1 print("work1------a", a) def work2(): global a for i in range(100000): a += 1 print("work2------a", a) if __name__ == '__main__': p = Process(target=work1, daemon=True) p.start() p2 = Process(target=work2) p2.start() # p.join() # p2.join() print('主进程a', a) # 打印结果 主进程a 0 work2------a 100000
其他方法就跟线程一个样子,就不做过多赘述啦
(3)获取进程 pidos.getpid() 获取子进程pid
os.getppid() 获取父进程id
from multiprocessing import Process import os a = 0 def work1(): global a for i in range(100000): a += 1 print("子进程号{}--work2------a:", os.getpid(), a) print('父进程id:',os.getppid()) def work2(): global a for i in range(100000): a += 1 print("子进程号{}--work2------a:", os.getpid(),a) print('父进程id:', os.getppid()) def mian(): # 参数,daemon=True,设置进程是否作为守护进程,如果是守护进程,那么该子进程会同主进程一起被关闭 # p = Process(target=work1,daemon=True) p = Process(target=work1) p.start() p2 = Process(target=work2) p2.start() # p.join() # p2.join() if __name__ == '__main__': # 注意点:windows下 程序中如果有使用多进程,程序的入口一定要写在 if __name__ == '__main__'里面: print('主进程a:',os.getpid(), a) mian()(4) 多进程数据通信的问题
我们看下下面的代码,发现执行结果,每一个地址都请求了4次,为什么线程不会这么去执行而进程会?因为进程他自己都有独一份属于自己的url_list数据(上面讲到过)。
from multiprocessing import Process import time url_list = [f"https://www.baidu.com-{i}" for i in range(100)] def work(): while url_list: url = url_list.pop() print(url) time.sleep(0.5) def main(): start_time = time.time() ts = [] for i in range(4): t1 = Process(target=work) t1.start() ts.append(t1) for t in ts: t.join() end_time = time.time() print('执行时间为:', end_time - start_time) if __name__ == '__main__': main() # 打印结果 https://www.baidu.com-99 https://www.baidu.com-99 https://www.baidu.com-99 https://www.baidu.com-99 https://www.baidu.com-98 https://www.baidu.com-98 https://www.baidu.com-98 https://www.baidu.com-98 https://www.baidu.com-97 https://www.baidu.com-97 https://www.baidu.com-97 https://www.baidu.com-97 https://www.baidu.com-96 https://www.baidu.com-96 https://www.baidu.com-96 https://www.baidu.com-96 ... ... ... ...
那么我们的需求就来了:我们怎么让4个线程去跑这个list,并且不会重复跑l列表里的值(也就是说,怎么让这4个进程共享这一份数据)。
有的小伙伴说可以用队列,我们刚刚学了队列 Queue,但是我们有提到过,这个Queue只能在线程里面用,通常是用来在线程和线程之间保证这个数据的安全性的。
那么进程和进程之间这个队列是用不了的,那么用啥叻?用进程模块它里面自己封装的一个队列。
from multiprocessing import Process, Queue
我们看下进程里面封装的Queue这个类,实际上也是导入的原先那个Queue,在原先的Queue这个类上进行修改,达到了可以在进程和进程之间通讯这个功能。
下面我们上代码:
""" 进程之间的数据共享(进程通信) 1、在主进程中通过multiprocessing.Queue 创建一个队列 2、在创建子进程的时候,将队列当成参数传入各个子进程 3、在子进程中使用队列中的数据 queue.Queue和multiprocessing.Queue 有什么区别 ? queue.Queue: 只能在同一个进程中的多个线程之间使用 multiprocessing.Queue:可以再多个进程之间跨进程输出数据(通信) """ from multiprocessing import Process, Queue import time def work(urls): while not urls.empty(): url = urls.get() print(url) time.sleep(0.5) def main(): urls = Queue() for i in range(100): urls.put(f"https://www.baidu.com-{i}") # 创建4个线程 start_time = time.time() ts = [] for i in range(4): t1 = Process(target=work, args=(urls,)) t1.start() ts.append(t1) for t in ts: t.join() end_time = time.time() print('执行时间为:', end_time - start_time) if __name__ == '__main__': main()(5)进程池(扩展)
我们来个demo,给大家示范一下。
import os import random import time from multiprocessing import Pool def worker(msg): t_start = time.time() print("%s开始执行,进程号为%d" % (msg, os.getpid())) # random.random()随机生成0~1之间的浮点数 time.sleep(random.random() * 2) t_stop = time.time() print(msg, "执行完毕,耗时%0.2f" % (t_stop - t_start)) if __name__ == '__main__': po = Pool(3) # 定义一个进程池,最大进程数3 for i in range(0, 10): # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(worker, (i,)) # print("----start----") po.close() # 关闭进程池,关闭后po不再接收新的请求 po.join() # 等待po中所有子进程执行完成,必须放在close语句之后 print("-----end-----")(6)线程池、进程池的使用
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor def work(name): # for i in range(10): time.sleep(0.5) print("------{}-----{}".format(name, i)) # with ThreadPoolExecutor(max_workers=4) as tp: # # submit 往线程中提交任务 # for i in range(100): # tp.submit(work, 'http://www.baidu.com-{}'.format(i)) # # shutdown:等待线程池中所有的任务执行完毕 # tp.shutdown(wait=True) # print("----end----") if __name__ == '__main__': with ProcessPoolExecutor(max_workers=4) as tp: # submit 往线程中提交任务 for i in range(100): tp.submit(work, 'http://www.baidu.com-{}'.format(i)) # shutdown:等待线程池中所有的任务执行完毕 tp.shutdown(wait=True) print("----end----")(7)进程池参数传递和获取执行结果 ①第一个方法
我们看打印结果:
ts.result() 是获取这个进程池执行完毕过后返回的结果,所以说他会一直等待这个ts执行完毕之后才会去执行ts2,所以这个时候我们得把ts ts2这2个提交任务的代码放在前面。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time def work(name): for i in range(4): time.sleep(1) print('------{}-----{}'.format(name, i)) return '{}进程返回的结果'.format(name) def main(): with ProcessPoolExecutor(max_workers=3) as pool: ts = pool.submit(work, 'musen') print(ts.result()) ts2 = pool.submit(work, 'musen2') print(ts2.result()) if __name__ == '__main__': main() # 打印结果 ------musen-----0 ------musen-----1 ------musen-----2 ------musen-----3 musen进程返回的结果 ------musen-----0 ------musen-----1 ------musen-----2 ------musen-----3 musen2进程返回的结果
修改后的代码:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time def work(name): for i in range(4): time.sleep(1) print('------{}-----{}'.format(name, i)) return '{}进程返回的结果'.format(name) def main(): with ProcessPoolExecutor(max_workers=3) as pool: ts = pool.submit(work, 'musen') ts2 = pool.submit(work, 'musen2') print(ts.result()) print(ts2.result()) if __name__ == '__main__': main() # 打印结果 ------musen-----0 ------musen2-----0 ------musen-----1 ------musen2-----1 ------musen-----2 ------musen2-----2 ------musen-----3 musen进程返回的结果 ------musen2-----3 musen2进程返回的结果②第二个方法(这个方法可以不用记)
或者这样子写,官方文档提供一个方法:
import concurrent from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time def work(name): for i in range(4): time.sleep(1) print('------{}-----{}'.format(name, i)) return '{}进程返回的结果'.format(name) def main(): with ProcessPoolExecutor(max_workers=3) as pool: ts = pool.submit(work, 'musen') ts2 = pool.submit(work, 'musen2') for item in concurrent.futures.as_completed([ts, ts2]): print(item.result()) if __name__ == '__main__': main() # 打印结果 ------musen-----0 ------musen2-----0 ------musen-----1 ------musen2-----1 ------musen-----2 ------musen2-----2 ------musen-----3 musen进程返回的结果 ------musen2-----3 musen2进程返回的结果③第三个方法map
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time def work(name): for i in range(4): time.sleep(1) print('------{}-----{}'.format(name, i)) return '{}进程返回的结果'.format(name) def main(): with ProcessPoolExecutor(max_workers=3) as pool: # 通过map去提交任务 res = pool.map(work, ['musen001', 'yuze001', 'xiaojian001']) # 获取多个任务执行的结果 print(list(res)) if __name__ == '__main__': main() # 打印结果 ------musen001-----0 ------yuze001-----0 ------xiaojian001-----0 ------musen001-----1 ------yuze001-----1 ------xiaojian001-----1 ------musen001-----2 ------yuze001-----2 ------xiaojian001-----2 ------musen001-----3 ------xiaojian001-----3 ------yuze001-----3 ['musen001进程返回的结果', 'yuze001进程返回的结果', 'xiaojian001进程返回的结果'](8)进程池并发执行测试用例
我们来个demo,testpy文件里展示:
import time import unittest from unittestreport import ddt, list_data @ddt class TestDome1(unittest.TestCase): data = [ {'title': "TestDome1用例1", "data": 11}, {'title': "TestDome1用例2", "data": 22}, {'title': "TestDome1用例3", "data": 33}] @list_data(data) def test_demo1_01(self, item): time.sleep(0.5) print("执行用例:{}".format(item)) @ddt class TestDome2(unittest.TestCase): data = [ {'title': "TestDome2用例1", "data": 11}, {'title': "TestDome2用例2", "data": 22}, {'title': "TestDome2用例3", "data": 33}] @list_data(data) def test_demo2_01(self, item): time.sleep(0.5) print("执行用例:{}".format(item))
我们执行测试用例,设置的thread_count=1,线程数为1:
import unittest from unittestreport import TestRunner suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase') TestRunner(suite).run(thread_count=1) # 打印结果 执行用例:{'title': 'TestDome1用例1', 'data': 11} test_demo1_01_01 (test_demo.TestDome1)执行——>【通过】 执行用例:{'title': 'TestDome1用例2', 'data': 22} test_demo1_01_02 (test_demo.TestDome1)执行——>【通过】 执行用例:{'title': 'TestDome1用例3', 'data': 33} test_demo1_01_03 (test_demo.TestDome1)执行——>【通过】 执行用例:{'title': 'TestDome2用例1', 'data': 11} test_demo2_01_01 (test_demo.TestDome2)执行——>【通过】 执行用例:{'title': 'TestDome2用例2', 'data': 22} test_demo2_01_02 (test_demo.TestDome2)执行——>【通过】 执行用例:{'title': 'TestDome2用例3', 'data': 33} test_demo2_01_03 (test_demo.TestDome2)执行——>【通过】 执行用例:{'title': 'Testmusen1用例1', 'data': 11} test_musen1_01_01 (test_musen.TestMusen1)执行——>【通过】 执行用例:{'title': 'Testmusen1用例2', 'data': 22} test_musen1_01_02 (test_musen.TestMusen1)执行——>【通过】 执行用例:{'title': 'Testmusen1用例3', 'data': 33} test_musen1_01_03 (test_musen.TestMusen1)执行——>【通过】 执行用例:{'title': 'Testmusen2用例1', 'data': 11} test_musen2_01_01 (test_musen.TestMusen2)执行——>【通过】 执行用例:{'title': 'Testmusen2用例2', 'data': 22} test_musen2_01_02 (test_musen.TestMusen2)执行——>【通过】 执行用例:{'title': 'Testmusen2用例3', 'data': 33} test_musen2_01_03 (test_musen.TestMusen2)执行——>【通过】
我们把线程数设置为3试试看,很明显运行速度变快了:
import unittest from unittestreport import TestRunner suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase') TestRunner(suite).run(thread_count=3) # 打印结果 执行用例:{'title': 'TestDome1用例1', 'data': 11}执行用例:{'title': 'Testmusen1用例1', 'data': 11}执行用例:{'title': 'TestDome2用例1', 'data': 11} test_demo2_01_01 (test_demo.TestDome2)执行——>【通过】 test_musen1_01_01 (test_musen.TestMusen1)执行——>【通过】 test_demo1_01_01 (test_demo.TestDome1)执行——>【通过】 执行用例:{'title': 'TestDome1用例2', 'data': 22} test_demo1_01_02 (test_demo.TestDome1)执行——>【通过】 执行用例:{'title': 'Testmusen1用例2', 'data': 22} test_musen1_01_02 (test_musen.TestMusen1)执行——>【通过】 执行用例:{'title': 'TestDome2用例2', 'data': 22} test_demo2_01_02 (test_demo.TestDome2)执行——>【通过】 执行用例:{'title': 'TestDome1用例3', 'data': 33}执行用例:{'title': 'TestDome2用例3', 'data': 33}执行用例:{'title': 'Testmusen1用例3', 'data': 33} test_demo1_01_03 (test_demo.TestDome1)执行——>【通过】 test_demo2_01_03 (test_demo.TestDome2)执行——>【通过】 test_musen1_01_03 (test_musen.TestMusen1)执行——>【通过】 执行用例:{'title': 'Testmusen2用例1', 'data': 11} test_musen2_01_01 (test_musen.TestMusen2)执行——>【通过】 执行用例:{'title': 'Testmusen2用例2', 'data': 22} test_musen2_01_02 (test_musen.TestMusen2)执行——>【通过】 执行用例:{'title': 'Testmusen2用例3', 'data': 33} test_musen2_01_03 (test_musen.TestMusen2)执行——>【通过】 所有用例执行完毕,正在生成测试报告中...... 测试报告已经生成,报告路径为:./report.html
下面我们就自己手写实现一个多线程并发执行测试用例:
import unittest suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase') unittest.TextTestRunner().run(suite)
我们想要实现多线程去运行测试用例,那我们肯定要多个套件去跑,那现在只有一个咋办?我们得去拆分套件,然后再去执行。
我们先遍历测试套件看看打印出来的是啥:
很明显,第一次遍历测试套件 是以模块级别遍历。
import unittest suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase') for item in suite: print(item) # 打印结果, , ]>, , , ]>]> , , ]>, , , ]>]>
第二次遍历,分到类级别:
我们将测试类都添加到一个空列表里面,那里面就是以测试类为单元的测试套件。
import unittest suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase') # 1、拆分收集到的测试套件 suite_list = [] # 遍历测试套件,遍历出来的测试模块级别的套件 for item in suite: # 遍历出来的测试模块级别的套件,遍历出来的是测试类级别的套件 for sui_cls in item: # 将所有的测试类级别的套件,加入到suite_list列表中 suite_list.append(sui_cls)
完整代码:
import unittest from concurrent.futures.thread import ThreadPoolExecutor suite = unittest.defaultTestLoader.discover(r'C:projectpy09Classpy09_12daytestcase') # 1、拆分收集到的测试套件 suite_list = [] # 遍历测试套件,遍历出来的测试模块级别的套件 for item in suite: # 遍历出来的测试模块级别的套件,遍历出来的是测试类级别的套件 for sui_cls in item: # 将所有的测试类级别的套件,加入到suite_list列表中 suite_list.append(sui_cls) def execute(suite: unittest.TestSuite): # 创建一个保存测试结果的对象 result = unittest.TestResult() # 执行测试套件 res = suite.run(result) # 获取套件执行的结果 res = dict( fail=len(res.failures), error=len(res.errors), skip=len(res.skipped), all_test=res.testsRun ) return res # 2、创建线程池去执行测试用例 with ThreadPoolExecutor(max_workers=4) as pool: # 提交测试执行任务到进程池 result = pool.map(execute, suite_list) res = {'fail': 0, 'error': 0, 'skip': 0, 'all_test': 0} # 汇总各个任务执行的结果 for item in list(result): res['fail'] += item['fail'] res['error'] += item['error'] res['skip'] += item['skip'] res['all_test'] += item['all_test'] print(res)
我们封装一下:
import unittest from concurrent.futures.process import ProcessPoolExecutor class TestRunner: def __init__(self): self.suites = [] def classification_suite(self, suite): """将测试套件以测试类进行拆分,加入列表中""" for item in suite: if isinstance(item, unittest.TestCase): self.suites.append(suite) break else: self.classification_suite(item) def execute(self, suite): """执行用例的方法""" result = unittest.TestResult() result = suite.run(result) # 从TestResult提取执行结果,并返回 res = dict( fail=len(result.failures), error=len(result.errors), skip=len(result.skipped), all_test=result.testsRun, ) return res def run(self, suite, thread_count=1): # 分割套件 self.classification_suite(suite) # 创建进程池 with ProcessPoolExecutor(thread_count) as ts: # 提交测试任务到进程池 res = ts.map(self.execute, self.suites) test_result = {'fail': 0, 'error': 0, 'skip': 0, 'all_test': 0} # 遍历各个任务中的结果,进程整合 for item in res: test_result['fail'] += item['fail'] test_result['error'] += item['error'] test_result['skip'] += item['skip'] test_result['all_test'] += item['all_test'] # 返回测试结果 return test_result if __name__ == '__main__': suite = unittest.defaultTestLoader.discover(r'C:projectpy09Classpy09_12daytestcase') res = TestRunner().run(suite,thread_count=4) print(res)