from multiprocessing import Process import time def task(name): print(f'{name}开始执行') time.sleep(1) print(f'{name}执行结束') if __name__ == '__main__': p = Process(target=task, args=('ming',)) p.start() # 告诉操作系统帮你创建一个进程,异步,相当于创建了一个副本来执行程序 print('主进程')方式2
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print(f'{self.name}开始执行') time.sleep(1) print(f'{self.name}执行结束') if __name__ == '__main__': p = MyProcess('ming') p.start() print('主')二:join方法
功能:主进程等子进程结束之后才能执行
1代码实现join方法from multiprocessing import Process import time def task(name): print(f'{name}开始执行') time.sleep(1) print(f'{name}执行结束') if __name__ == '__main__': p = Process(target=task, args=('ming',)) p.start() p.join() print('主进程') """ ming开始执行 ming执行结束 主进程 """
from multiprocessing import Process import time def dask(n): time.sleep(n) if __name__ == '__main__': start_time = time.time() p1 = Process(target=dask, args=(1,)) p2 = Process(target=dask, args=(2,)) p3 = Process(target=dask, args=(3,)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() end_time = time.time() - start_time print(end_time) # 3.2495625019073486 print('主')
from multiprocessing import Process import time def dask(n): time.sleep(n) if __name__ == '__main__': start_time = time.time() p1 = Process(target=dask, args=(1,)) p2 = Process(target=dask, args=(2,)) p3 = Process(target=dask, args=(3,)) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() end_time = time.time() - start_time print(end_time) # 6.780239820480347 print('主')三:进程间数据是隔离的 1.代码验证
from multiprocessing import Process import time hobby = 'sing' def change(): global hobby hobby = 'read' if __name__ == '__main__': p = Process(target=change) p.start() print(hobby) # sing四:队列 1.特点:先进先出 2.队列方法
- 1.put() 向队列中传入数据
- 2.get() 从队列中取出数据
- 3.full() 判断队列是否已满
- 4.empty() 判断队列是否为空
- 5.get_nowait() 从队列中获取数据,不等待,没有就直接报错
- 上述full(),empty(),get_nowait()方法在多进程中会失效
from multiprocessing import Queue q = Queue(3) print(q.empty()) # True 判断队列是否为空 q.put(1) q.put(2) q.put(3) print(q.full()) # True 判断队列是否为满 print(q.get()) # 1 print(q.get()) # 2 print(q.get()) # 3 q.get_nowait() # 获取队列中的数据,不等待,没有直接报错五:进程间相互通信(IPC机制) 1.什么是IPC机制?
主进程与子进程通信 子进程与子进程通信
from multiprocessing import Process,Queue def run1(q): q.put('子进程run1的数据') def run2(q): data = q.get() print(f'子进程run2从队列中得到数据是:{data}') if __name__ == '__main__': q = Queue() p1 = Process(target=run1, args=(q,)) p2 = Process(target=run2, args=(q,)) p1.start() p2.start() # 执行结果: 子进程run2从队列中得到数据是:子进程run1的数据
总结:通过队列可以实现不同队列间的通信
六:生产者消费者模型- 生产者:产生数据
- 消费者:处理数据
""" 生产者:生产/制造东西的 消费者:消费/处理东西的 该模型除了上面两个之外还需要一个媒介 生产者和消费者之间不是直接做交互的 而是借助于媒介做交互的 生产者(做包子的) + 消息队列(蒸笼)+ 消费者(吃包子的) """ from multiprocessing import Process, Queue, JoinableQueue import time import random def producer(name, food, q): for i in range(1, 6): data = f'{name}生产的{food}{i}' time.sleep(random.randint(1, 2)) print(data) q.put(data) def customer(name, q): while True: food = q.get() # 没有数据会卡住 # 判断当前是否结束标识 # if food is None: break time.sleep(random.randint(1, 2)) print(f'{name}吃了{food}') q.task_done() # 告诉队列你已经从里面取了一个数据并且处理完毕了 if __name__ == '__main__': # q = Queue() q = JoinableQueue() p1 = Process(target=producer, args=('机器猫', '包子', q)) p2 = Process(target=producer, args=('大雄', '饺子', q)) c1 = Process(target=customer, args=('达夫', q)) c2 = Process(target=customer, args=('胖虎', q)) p1.start() p2.start() # 将消费者设置成守护进程 c1.daemon = True c2.daemon = True c1.start() c2.start() p1.join() p2.join() # q.put(None) # q.put(None) q.join() # 等待队列中所有的数据被取完再再往下执行代码 """ JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1 每当你调用task_done的时候 计数器-1 q.join() 当计数器为0的时候 才往后运行 """ # 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了七:进程相关方法
- 1.current_process().pid 查看当前进程的进程号
- 2.os.getpid() 查看当前进程的进程号
- 3.os.getppid() 查看当前进程父进程的进程号
- 4.terminate() 销毁进程
- 5.is_alive() 判断当前进程是否存活
代码实现
from multiprocessing import Process, current_process import os def dask(): print(f'子进程:{current_process().pid}') # 19664 print(f'子进程:{os.getpid()}') # 19664 print(f'子进程的父进程:{os.getppid()}') # 4816 if __name__ == '__main__': p = Process(target=dask) p.start() p.terminate() # 销毁进程 p.is_alive() # 判断进程是否存活 print(f'主进程{os.getpid()}') # 4816八:守护进程 1.什么是守护进程?
守护对象执行结束自己立即也结束的进程
代码实现
from multiprocessing import Process import time def task(): print('子进程task正在执行') time.sleep(3) print('子进程task执行结束') if __name__ == '__main__': p = Process(target=task) p.daemon = True p.start() time.sleep(1) print('父进程执行结束')九:僵尸进程和孤儿进程 1.僵尸进程
进程已经运行结束 但是相关的资源并没有完全清空 需要父进程参与回收
2.孤儿进程父进程意外死亡 子进程正常运行 该子进程就称之为孤儿进程 孤儿进程也不是没有人管 操作系统会自动分配福利院接收
十:互斥锁import random from multiprocessing import Process, Lock import json import time # 1.查票 def search(i): with open('data', 'r', encoding='utf8') as f: dic = json.load(f) print('用户%s 要查的余票是%s' % (i, dic.get('ticket_num'))) # 字典取值尽量用get() # 2.买票 def buy(i): # 先查票 with open('data', 'r', encoding='utf-8') as f: dic = json.load(f) # 模拟延迟 time.sleep(random.randint(1, 3)) # 判断道歉是否有余票 if dic.get('ticket_num') > 0: # 修改数据库买票 dic["ticket_num"] -= 1 # 写入数据库 with open('data', 'w', encoding='utf-8') as f: json.dump(dic, f) print('用户%s 买票成功' % i) else: print('用户%s 买票失败' % i) # 整合上面的两个函数 def run(i, mutex): search(i) # 给买票环节加锁 # 抢锁 mutex.acquire() buy(i) # 释放锁 mutex.release() if __name__ == '__main__': # 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票 mutex = Lock() for i in range(10): p = Process(target=run, args=(i, mutex)) p.start()结论:
1.多个进程操作同一份数据的时候,会出现数据错乱的问题 针对上述问题,解决方法就是加锁处理
2.互斥锁:将并发变成串行,牺牲效率但是保证了数据的安全性
3.注意:
- 1.锁不要轻易使用,会出现死锁现象(我们写代码一般不会用到,都是内部封装好的)
- 2.锁只在处理数据的部分加 来保证数据安全(只在争抢数据的环节加锁处理)
1.尝试将TCP服务端制作成并发效果
客户端服务端全部设置成自动发消息自动回消息
eg: 客户端发hello 服务端直接转大写回HELLO
服务端:
import socket from multiprocessing import Process server = socket.socket() server.bind(('127.0.0.1', 8090)) server.listen(5) sock, addr = server.accept() # 将服务的代码单独封装成一个函数 def task(sock): while True: try: data = sock.recv(1024) if len(data) == 0: break print(data.decode('utf8')) sock.send(data.upper()) except ConnectionError as e: print(e) break sock.close() if __name__ == '__main__': p = Process(target=task, args=(sock,)) p.start()
客户端
import socket client = socket.socket() client.connect(('127.0.0.1', 8090)) while True: client.send('hello '.encode('utf8')) data = client.recv(1024) print(data.decode('utf8'))