栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Python

(十五)python之并发编程(进程)

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

(十五)python之并发编程(进程)

一、进程
  • 程序:例如pycharm、 QQ都是程序
  • 进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元
  • 不仅可以通过线程完成多任务,进程也是可以的。

我们打开任务管理器,就能看到我们这台电脑上打开了哪些进程。 

 

进程是操作系统分配资源的基本单位,每运行一个程序,操作系统这个时候就会启动一个进程,把你这个程序运行所要的所有资源都放在这个进程里面。那么这个进程在执行的时候,他也有这个并发和并行的状态,和线程是一样的。

如果你的电脑是个八核的CPU,那么这个时候你启动8个程序,每个程序都会有一个对应的CPU来提供进程相应的支持,那这个时候就是处于一个并行的状态,那如果说你的cpu1个,打开了多个程序,那这个时候进程也是处于并发执行的状态,这是由于操作系统的任务调度算法他会去调度先执行哪个进程,执行完了再去执行哪一个,这个是操作系统去搞得,跟进程没关系。

进程在执行的时候分为三个状态:

  •  工作中,任务数往往大于CPU的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态
  • 就绪状态:运行的条件都已经满足了,正在等cpu执行
  • 执行状态:cpu正在执行其功能
  • 等待状态:等待某些条件满足,例如一个程序sleep了,此时就处于等待状态

二、 进程、线程对比

功能:

  • 进程,能够完成多任务,比如  在一台电脑上能够同时运行多个软件
  • 线程,能够完成多任务,比如 一个QQ中的多个聊天窗口

定义的不同 

  • 进程是系统进行资源分配单位
  • 线程是进程的一个实体,是CPU调度和任务分派的基本单位,它是比进程更小的能独立运行的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

三、进程的创建 (1)进程创建

我们试着通过设置线程的方式去设置进程试试看: 

from multiprocessing import Process

a = 0


def work1():
    global a
    for i in range(100000):
        a += 1
    print("work1------a", a)


def work2():
    global a
    for i in range(100000):
        a += 1
    print("work2------a", a)


p = Process(target=work1)
p.start()

p2 = Process(target=work2)
p2.start()

p.join()
p2.join()

print('主进程a', a)

 我们执行以下发现报错了,这是为什么?

 原因:因为进程之间的资源是独立的。

进程每次执行之前,都会把他要执行的代码和数据都会备一份,两个进程都会各备份一份,备份代码的话就会把我们写的代码都加载一遍(类似于我们平常导入模块一样),然后代码里我们又写了创建进程的代码,这就会出现循环加载的情况。

这种情况我们怎么处理?

答:放在 if __name__ == '__main__'下面,这样子就能执行成功了。(windows绝对会有问题,mac有可能会有问题,还是因为操作系统执行多进程不同的原因。)

(2)守护进程 daemon

守护进程是什么意思叻?答:被设置为守护进程的p进程,他会去守护那个主进程。就是如果说主进程执行结束之后,那们这个守护进程会自动关闭。

我们来个demo,首先把p.join,p2.join 这两行代码注释掉,然后在p进程里参数新增一个deemon=True,

from multiprocessing import Process

a = 0


def work1():
    global a
    for i in range(100000):
        a += 1
    print("work1------a", a)


def work2():
    global a
    for i in range(100000):
        a += 1
    print("work2------a", a)


if __name__ == '__main__':
    p = Process(target=work1, daemon=True)
    p.start()

    p2 = Process(target=work2)
    p2.start()

    # p.join()
    # p2.join()

    print('主进程a', a)


# 打印结果
主进程a 0
work2------a 100000

其他方法就跟线程一个样子,就不做过多赘述啦

(3)获取进程 pid

        os.getpid()          获取子进程pid

        os.getppid()         获取父进程id

from multiprocessing import Process
import os
a = 0


def work1():
    global a
    for i in range(100000):
        a += 1
    print("子进程号{}--work2------a:", os.getpid(), a)
    print('父进程id:',os.getppid())


def work2():
    global a
    for i in range(100000):
        a += 1
    print("子进程号{}--work2------a:", os.getpid(),a)
    print('父进程id:', os.getppid())


def mian():
    # 参数,daemon=True,设置进程是否作为守护进程,如果是守护进程,那么该子进程会同主进程一起被关闭
    # p = Process(target=work1,daemon=True)
    p = Process(target=work1)
    p.start()
    p2 = Process(target=work2)
    p2.start()

    # p.join()
    # p2.join()


if __name__ == '__main__':
    # 注意点:windows下  程序中如果有使用多进程,程序的入口一定要写在 if __name__ == '__main__'里面:
    print('主进程a:',os.getpid(), a)
    mian()
(4) 多进程数据通信的问题

我们看下下面的代码,发现执行结果,每一个地址都请求了4次,为什么线程不会这么去执行而进程会?因为进程他自己都有独一份属于自己的url_list数据(上面讲到过)。

from multiprocessing import Process
import time

url_list = [f"https://www.baidu.com-{i}" for i in range(100)]


def work():
    while url_list:
        url = url_list.pop()
        print(url)
        time.sleep(0.5)


def main():
    start_time = time.time()
    ts = []
    for i in range(4):
        t1 = Process(target=work)
        t1.start()
        ts.append(t1)

    for t in ts:
        t.join()

    end_time = time.time()

    print('执行时间为:', end_time - start_time)


if __name__ == '__main__':
    main()



# 打印结果
https://www.baidu.com-99
https://www.baidu.com-99
https://www.baidu.com-99
https://www.baidu.com-99
https://www.baidu.com-98
https://www.baidu.com-98
https://www.baidu.com-98
https://www.baidu.com-98
https://www.baidu.com-97
https://www.baidu.com-97
https://www.baidu.com-97
https://www.baidu.com-97
https://www.baidu.com-96
https://www.baidu.com-96
https://www.baidu.com-96
https://www.baidu.com-96
...
...
...
...

那么我们的需求就来了:我们怎么让4个线程去跑这个list,并且不会重复跑l列表里的值(也就是说,怎么让这4个进程共享这一份数据)。

有的小伙伴说可以用队列,我们刚刚学了队列 Queue,但是我们有提到过,这个Queue只能在线程里面用,通常是用来在线程和线程之间保证这个数据的安全性的。

那么进程和进程之间这个队列是用不了的,那么用啥叻?用进程模块它里面自己封装的一个队列。

from multiprocessing import Process, Queue

我们看下进程里面封装的Queue这个类,实际上也是导入的原先那个Queue,在原先的Queue这个类上进行修改,达到了可以在进程和进程之间通讯这个功能。

下面我们上代码:

"""
进程之间的数据共享(进程通信)
1、在主进程中通过multiprocessing.Queue 创建一个队列
2、在创建子进程的时候,将队列当成参数传入各个子进程
3、在子进程中使用队列中的数据

queue.Queue和multiprocessing.Queue 有什么区别 ?
queue.Queue: 只能在同一个进程中的多个线程之间使用
multiprocessing.Queue:可以再多个进程之间跨进程输出数据(通信)


"""
from multiprocessing import Process, Queue
import time


def work(urls):
    while not urls.empty():
        url = urls.get()
        print(url)
        time.sleep(0.5)


def main():
    urls = Queue()
    for i in range(100):
        urls.put(f"https://www.baidu.com-{i}")
    # 创建4个线程
    start_time = time.time()
    ts = []
    for i in range(4):
        t1 = Process(target=work, args=(urls,))
        t1.start()
        ts.append(t1)

    for t in ts:
        t.join()

    end_time = time.time()
    print('执行时间为:', end_time - start_time)


if __name__ == '__main__':
    main()

(5)进程池(扩展)

我们来个demo,给大家示范一下。

import os
import random
import time
from multiprocessing import Pool


def worker(msg):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (msg, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(msg, "执行完毕,耗时%0.2f" % (t_stop - t_start))


if __name__ == '__main__':
    po = Pool(3)  # 定义一个进程池,最大进程数3
    for i in range(0, 10):
        # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        po.apply_async(worker, (i,))
    #
    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print("-----end-----")

(6)线程池、进程池的使用
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def work(name):
    # for i in range(10):
    time.sleep(0.5)
    print("------{}-----{}".format(name, i))


# with ThreadPoolExecutor(max_workers=4) as tp:
#     # submit 往线程中提交任务
#     for i in range(100):
#         tp.submit(work, 'http://www.baidu.com-{}'.format(i))
#     # shutdown:等待线程池中所有的任务执行完毕
#     tp.shutdown(wait=True)
#     print("----end----")

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as tp:
        # submit 往线程中提交任务
        for i in range(100):
            tp.submit(work, 'http://www.baidu.com-{}'.format(i))
        # shutdown:等待线程池中所有的任务执行完毕
        tp.shutdown(wait=True)
        print("----end----")

(7)进程池参数传递和获取执行结果 ①第一个方法

我们看打印结果:

ts.result() 是获取这个进程池执行完毕过后返回的结果,所以说他会一直等待这个ts执行完毕之后才会去执行ts2,所以这个时候我们得把ts ts2这2个提交任务的代码放在前面。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time


def work(name):
    for i in range(4):
        time.sleep(1)
        print('------{}-----{}'.format(name, i))
    return '{}进程返回的结果'.format(name)


def main():
    with ProcessPoolExecutor(max_workers=3) as pool:
        ts = pool.submit(work, 'musen')
        print(ts.result())
        ts2 = pool.submit(work, 'musen2')
        print(ts2.result())


if __name__ == '__main__':
    main()




# 打印结果
------musen-----0
------musen-----1
------musen-----2
------musen-----3
musen进程返回的结果
------musen-----0
------musen-----1
------musen-----2
------musen-----3
musen2进程返回的结果

修改后的代码:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time


def work(name):
    for i in range(4):
        time.sleep(1)
        print('------{}-----{}'.format(name, i))
    return '{}进程返回的结果'.format(name)


def main():
    with ProcessPoolExecutor(max_workers=3) as pool:
        ts = pool.submit(work, 'musen')
        ts2 = pool.submit(work, 'musen2')
        print(ts.result())
        print(ts2.result())


if __name__ == '__main__':
    main()



# 打印结果
------musen-----0
------musen2-----0
------musen-----1
------musen2-----1
------musen-----2
------musen2-----2
------musen-----3
musen进程返回的结果
------musen2-----3
musen2进程返回的结果
②第二个方法(这个方法可以不用记)

或者这样子写,官方文档提供一个方法:

import concurrent
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time


def work(name):
    for i in range(4):
        time.sleep(1)
        print('------{}-----{}'.format(name, i))
    return '{}进程返回的结果'.format(name)


def main():
    with ProcessPoolExecutor(max_workers=3) as pool:
        ts = pool.submit(work, 'musen')
        ts2 = pool.submit(work, 'musen2')
        for item in concurrent.futures.as_completed([ts, ts2]):
            print(item.result())


if __name__ == '__main__':
    main()



# 打印结果
------musen-----0
------musen2-----0
------musen-----1
------musen2-----1
------musen-----2
------musen2-----2
------musen-----3
musen进程返回的结果
------musen2-----3
musen2进程返回的结果

③第三个方法map
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time


def work(name):
    for i in range(4):
        time.sleep(1)
        print('------{}-----{}'.format(name, i))
    return '{}进程返回的结果'.format(name)


def main():
    with ProcessPoolExecutor(max_workers=3) as pool:
        # 通过map去提交任务
        res = pool.map(work, ['musen001', 'yuze001', 'xiaojian001'])
        # 获取多个任务执行的结果
        print(list(res))


if __name__ == '__main__':
    main()



# 打印结果
------musen001-----0
------yuze001-----0
------xiaojian001-----0
------musen001-----1
------yuze001-----1
------xiaojian001-----1
------musen001-----2
------yuze001-----2
------xiaojian001-----2
------musen001-----3
------xiaojian001-----3
------yuze001-----3
['musen001进程返回的结果', 'yuze001进程返回的结果', 'xiaojian001进程返回的结果']

(8)进程池并发执行测试用例

我们来个demo,testpy文件里展示:

import time
import unittest
from unittestreport import ddt, list_data


@ddt
class TestDome1(unittest.TestCase):
    data = [
        {'title': "TestDome1用例1", "data": 11},
        {'title': "TestDome1用例2", "data": 22},
        {'title': "TestDome1用例3", "data": 33}]

    @list_data(data)
    def test_demo1_01(self, item):
        time.sleep(0.5)
        print("执行用例:{}".format(item))


@ddt
class TestDome2(unittest.TestCase):
    data = [
        {'title': "TestDome2用例1", "data": 11},
        {'title': "TestDome2用例2", "data": 22},
        {'title': "TestDome2用例3", "data": 33}]

    @list_data(data)
    def test_demo2_01(self, item):
        time.sleep(0.5)
        print("执行用例:{}".format(item))

我们执行测试用例,设置的thread_count=1,线程数为1:

import unittest
from unittestreport import TestRunner

suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase')
TestRunner(suite).run(thread_count=1)



# 打印结果
执行用例:{'title': 'TestDome1用例1', 'data': 11}
test_demo1_01_01 (test_demo.TestDome1)执行——>【通过】
执行用例:{'title': 'TestDome1用例2', 'data': 22}
test_demo1_01_02 (test_demo.TestDome1)执行——>【通过】
执行用例:{'title': 'TestDome1用例3', 'data': 33}
test_demo1_01_03 (test_demo.TestDome1)执行——>【通过】
执行用例:{'title': 'TestDome2用例1', 'data': 11}
test_demo2_01_01 (test_demo.TestDome2)执行——>【通过】
执行用例:{'title': 'TestDome2用例2', 'data': 22}
test_demo2_01_02 (test_demo.TestDome2)执行——>【通过】
执行用例:{'title': 'TestDome2用例3', 'data': 33}
test_demo2_01_03 (test_demo.TestDome2)执行——>【通过】
执行用例:{'title': 'Testmusen1用例1', 'data': 11}
test_musen1_01_01 (test_musen.TestMusen1)执行——>【通过】
执行用例:{'title': 'Testmusen1用例2', 'data': 22}
test_musen1_01_02 (test_musen.TestMusen1)执行——>【通过】
执行用例:{'title': 'Testmusen1用例3', 'data': 33}
test_musen1_01_03 (test_musen.TestMusen1)执行——>【通过】
执行用例:{'title': 'Testmusen2用例1', 'data': 11}
test_musen2_01_01 (test_musen.TestMusen2)执行——>【通过】
执行用例:{'title': 'Testmusen2用例2', 'data': 22}
test_musen2_01_02 (test_musen.TestMusen2)执行——>【通过】
执行用例:{'title': 'Testmusen2用例3', 'data': 33}
test_musen2_01_03 (test_musen.TestMusen2)执行——>【通过】

我们把线程数设置为3试试看,很明显运行速度变快了:

import unittest
from unittestreport import TestRunner

suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase')
TestRunner(suite).run(thread_count=3)




# 打印结果
执行用例:{'title': 'TestDome1用例1', 'data': 11}执行用例:{'title': 'Testmusen1用例1', 'data': 11}执行用例:{'title': 'TestDome2用例1', 'data': 11}
test_demo2_01_01 (test_demo.TestDome2)执行——>【通过】

test_musen1_01_01 (test_musen.TestMusen1)执行——>【通过】

test_demo1_01_01 (test_demo.TestDome1)执行——>【通过】
执行用例:{'title': 'TestDome1用例2', 'data': 22}
test_demo1_01_02 (test_demo.TestDome1)执行——>【通过】
执行用例:{'title': 'Testmusen1用例2', 'data': 22}
test_musen1_01_02 (test_musen.TestMusen1)执行——>【通过】
执行用例:{'title': 'TestDome2用例2', 'data': 22}
test_demo2_01_02 (test_demo.TestDome2)执行——>【通过】
执行用例:{'title': 'TestDome1用例3', 'data': 33}执行用例:{'title': 'TestDome2用例3', 'data': 33}执行用例:{'title': 'Testmusen1用例3', 'data': 33}
test_demo1_01_03 (test_demo.TestDome1)执行——>【通过】

test_demo2_01_03 (test_demo.TestDome2)执行——>【通过】

test_musen1_01_03 (test_musen.TestMusen1)执行——>【通过】
执行用例:{'title': 'Testmusen2用例1', 'data': 11}
test_musen2_01_01 (test_musen.TestMusen2)执行——>【通过】
执行用例:{'title': 'Testmusen2用例2', 'data': 22}
test_musen2_01_02 (test_musen.TestMusen2)执行——>【通过】
执行用例:{'title': 'Testmusen2用例3', 'data': 33}
test_musen2_01_03 (test_musen.TestMusen2)执行——>【通过】
所有用例执行完毕,正在生成测试报告中......
测试报告已经生成,报告路径为:./report.html

下面我们就自己手写实现一个多线程并发执行测试用例:

import unittest

suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase')
unittest.TextTestRunner().run(suite)

我们想要实现多线程去运行测试用例,那我们肯定要多个套件去跑,那现在只有一个咋办?我们得去拆分套件,然后再去执行。

我们先遍历测试套件看看打印出来的是啥:

很明显,第一次遍历测试套件 是以模块级别遍历。

import unittest

suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase')
for item in suite:
    print(item)




# 打印结果
, , ]>, , , ]>]>



, , ]>, , , ]>]>

第二次遍历,分到类级别:

 我们将测试类都添加到一个空列表里面,那里面就是以测试类为单元的测试套件。

import unittest

suite = unittest.defaultTestLoader.discover(r'/Users/wangqiang/cekai/py09_12day/testcase')

#  1、拆分收集到的测试套件
suite_list = []
# 遍历测试套件,遍历出来的测试模块级别的套件
for item in suite:
    # 遍历出来的测试模块级别的套件,遍历出来的是测试类级别的套件
    for sui_cls in item:
        # 将所有的测试类级别的套件,加入到suite_list列表中
        suite_list.append(sui_cls)

完整代码:

import unittest
from concurrent.futures.thread import ThreadPoolExecutor

suite = unittest.defaultTestLoader.discover(r'C:projectpy09Classpy09_12daytestcase')
#  1、拆分收集到的测试套件
suite_list = []
# 遍历测试套件,遍历出来的测试模块级别的套件
for item in suite:
    # 遍历出来的测试模块级别的套件,遍历出来的是测试类级别的套件
    for sui_cls in item:
        # 将所有的测试类级别的套件,加入到suite_list列表中
        suite_list.append(sui_cls)


def execute(suite: unittest.TestSuite):
    # 创建一个保存测试结果的对象
    result = unittest.TestResult()
    # 执行测试套件
    res = suite.run(result)
    # 获取套件执行的结果
    res = dict(
        fail=len(res.failures),
        error=len(res.errors),
        skip=len(res.skipped),
        all_test=res.testsRun
    )
    return res


# 2、创建线程池去执行测试用例
with ThreadPoolExecutor(max_workers=4) as pool:
    # 提交测试执行任务到进程池
    result = pool.map(execute, suite_list)
    res = {'fail': 0, 'error': 0, 'skip': 0, 'all_test': 0}
    # 汇总各个任务执行的结果
    for item in list(result):
        res['fail'] += item['fail']
        res['error'] += item['error']
        res['skip'] += item['skip']
        res['all_test'] += item['all_test']
    print(res)

我们封装一下:

import unittest
from concurrent.futures.process import ProcessPoolExecutor


class TestRunner:
    def __init__(self):
        self.suites = []

    def classification_suite(self, suite):
        """将测试套件以测试类进行拆分,加入列表中"""
        for item in suite:
            if isinstance(item, unittest.TestCase):
                self.suites.append(suite)
                break
            else:
                self.classification_suite(item)

    def execute(self, suite):
        """执行用例的方法"""
        result = unittest.TestResult()
        result = suite.run(result)
        # 从TestResult提取执行结果,并返回
        res = dict(
            fail=len(result.failures),
            error=len(result.errors),
            skip=len(result.skipped),
            all_test=result.testsRun,
        )
        return res

    def run(self, suite, thread_count=1):
        # 分割套件
        self.classification_suite(suite)
        # 创建进程池
        with ProcessPoolExecutor(thread_count) as ts:
            # 提交测试任务到进程池
            res = ts.map(self.execute, self.suites)
            test_result = {'fail': 0, 'error': 0, 'skip': 0, 'all_test': 0}
            # 遍历各个任务中的结果,进程整合
            for item in res:
                test_result['fail'] += item['fail']
                test_result['error'] += item['error']
                test_result['skip'] += item['skip']
                test_result['all_test'] += item['all_test']
            # 返回测试结果
            return test_result


if __name__ == '__main__':
    suite = unittest.defaultTestLoader.discover(r'C:projectpy09Classpy09_12daytestcase')
    res = TestRunner().run(suite,thread_count=4)
    print(res)

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1037310.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号