栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Python

进程的两种创建方式,join方法,进程间的数据隔离,队列,进程间的通信IPC机制,生产者消费者模型,守护进程,僵尸进程,孤儿进程,互斥锁

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

进程的两种创建方式,join方法,进程间的数据隔离,队列,进程间的通信IPC机制,生产者消费者模型,守护进程,僵尸进程,孤儿进程,互斥锁

一:创建进程的两种方式: 方式1
from multiprocessing import Process
import time


def task(name):
    print(f'{name}开始执行')
    time.sleep(1)
    print(f'{name}执行结束')


if __name__ == '__main__':
    p = Process(target=task, args=('ming',))
    p.start()  # 告诉操作系统帮你创建一个进程,异步,相当于创建了一个副本来执行程序
    print('主进程')
方式2
from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f'{self.name}开始执行')
        time.sleep(1)
        print(f'{self.name}执行结束')


if __name__ == '__main__':
    p = MyProcess('ming')
    p.start()
    print('主')
二:join方法

功能:主进程等子进程结束之后才能执行

1代码实现join方法
from multiprocessing import Process
import time


def task(name):
    print(f'{name}开始执行')
    time.sleep(1)
    print(f'{name}执行结束')


if __name__ == '__main__':
    p = Process(target=task, args=('ming',))
    p.start()
    p.join()
    print('主进程')
    
"""
ming开始执行
ming执行结束
主进程
"""
from multiprocessing import Process
import time


def dask(n):
    time.sleep(n)

if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=dask, args=(1,))
    p2 = Process(target=dask, args=(2,))
    p3 = Process(target=dask, args=(3,))

    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    end_time = time.time() - start_time
    print(end_time)  # 3.2495625019073486
    print('主')
from multiprocessing import Process
import time


def dask(n):
    time.sleep(n)

if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=dask, args=(1,))
    p2 = Process(target=dask, args=(2,))
    p3 = Process(target=dask, args=(3,))

    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()
    end_time = time.time() - start_time
    print(end_time)  # 6.780239820480347
    print('主')

三:进程间数据是隔离的 1.代码验证
from multiprocessing import Process
import time

hobby = 'sing'
def change():
    global hobby
    hobby = 'read'

if __name__ == '__main__':
    p = Process(target=change)
    p.start()
    print(hobby)  # sing
四:队列 1.特点:先进先出 2.队列方法
  • 1.put() 向队列中传入数据
  • 2.get() 从队列中取出数据
  • 3.full() 判断队列是否已满
  • 4.empty() 判断队列是否为空
  • 5.get_nowait() 从队列中获取数据,不等待,没有就直接报错
3.注意
  • 上述full(),empty(),get_nowait()方法在多进程中会失效
from multiprocessing import Queue

q = Queue(3)
print(q.empty())  # True  判断队列是否为空
q.put(1)
q.put(2)
q.put(3)
print(q.full())  # True  判断队列是否为满

print(q.get())  # 1
print(q.get())  # 2
print(q.get())  # 3
q.get_nowait()  # 获取队列中的数据,不等待,没有直接报错
五:进程间相互通信(IPC机制) 1.什么是IPC机制?

主进程与子进程通信 子进程与子进程通信

from multiprocessing import Process,Queue


def run1(q):
    q.put('子进程run1的数据')


def run2(q):
    data = q.get()
    print(f'子进程run2从队列中得到数据是:{data}')


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=run1, args=(q,))
    p2 = Process(target=run2, args=(q,))
    p1.start()
    p2.start()
    
# 执行结果: 子进程run2从队列中得到数据是:子进程run1的数据

总结:通过队列可以实现不同队列间的通信

六:生产者消费者模型
  • 生产者:产生数据
  • 消费者:处理数据
1.代码实现生产者消费者功能
"""
生产者:生产/制造东西的
消费者:消费/处理东西的

该模型除了上面两个之外还需要一个媒介
生产者和消费者之间不是直接做交互的 而是借助于媒介做交互的
    生产者(做包子的) + 消息队列(蒸笼)+ 消费者(吃包子的)
"""

from multiprocessing import Process, Queue, JoinableQueue
import time
import random


def producer(name, food, q):
    for i in range(1, 6):
        data = f'{name}生产的{food}{i}'
        time.sleep(random.randint(1, 2))
        print(data)
        q.put(data)


def customer(name, q):
    while True:
        food = q.get()  # 没有数据会卡住
        # 判断当前是否结束标识
        # if food is None: break
        time.sleep(random.randint(1, 2))
        print(f'{name}吃了{food}')
        q.task_done()  # 告诉队列你已经从里面取了一个数据并且处理完毕了


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer, args=('机器猫', '包子', q))
    p2 = Process(target=producer, args=('大雄', '饺子', q))
    c1 = Process(target=customer, args=('达夫', q))
    c2 = Process(target=customer, args=('胖虎', q))
    p1.start()
    p2.start()
    # 将消费者设置成守护进程
    c1.daemon = True
    c2.daemon = True

    c1.start()
    c2.start()

    p1.join()
    p2.join()

    # q.put(None)
    # q.put(None)
    q.join()  # 等待队列中所有的数据被取完再再往下执行代码
    """
    JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
    每当你调用task_done的时候 计数器-1
    q.join() 当计数器为0的时候 才往后运行
    """
    # 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了
七:进程相关方法
  • 1.current_process().pid 查看当前进程的进程号
  • 2.os.getpid() 查看当前进程的进程号
  • 3.os.getppid() 查看当前进程父进程的进程号
  • 4.terminate() 销毁进程
  • 5.is_alive() 判断当前进程是否存活

代码实现

from multiprocessing import Process, current_process
import os

def dask():
    print(f'子进程:{current_process().pid}')  # 19664
    print(f'子进程:{os.getpid()}')  # 19664
    print(f'子进程的父进程:{os.getppid()}')  # 4816


if __name__ == '__main__':
    p = Process(target=dask)
    p.start()
    p.terminate()  # 销毁进程
    p.is_alive()  # 判断进程是否存活
    print(f'主进程{os.getpid()}')  # 4816
八:守护进程 1.什么是守护进程?

守护对象执行结束自己立即也结束的进程

代码实现

from multiprocessing import Process
import time

def task():
    print('子进程task正在执行')
    time.sleep(3)
    print('子进程task执行结束')


if __name__ == '__main__':
    p = Process(target=task)
    p.daemon = True
    p.start()
    time.sleep(1)
    print('父进程执行结束')
九:僵尸进程和孤儿进程 1.僵尸进程

进程已经运行结束 但是相关的资源并没有完全清空 需要父进程参与回收

2.孤儿进程

父进程意外死亡 子进程正常运行 该子进程就称之为孤儿进程 孤儿进程也不是没有人管 操作系统会自动分配福利院接收

十:互斥锁
import random
from multiprocessing import Process, Lock
import json
import time


# 1.查票
def search(i):
    with open('data', 'r', encoding='utf8') as f:
        dic = json.load(f)
    print('用户%s 要查的余票是%s' % (i, dic.get('ticket_num')))
    # 字典取值尽量用get()

# 2.买票
def buy(i):
    # 先查票
    with open('data', 'r', encoding='utf-8') as f:
        dic = json.load(f)
    # 模拟延迟
    time.sleep(random.randint(1, 3))
    # 判断道歉是否有余票
    if dic.get('ticket_num') > 0:
        # 修改数据库买票
        dic["ticket_num"] -= 1
        # 写入数据库
        with open('data', 'w', encoding='utf-8') as f:
            json.dump(dic, f)
        print('用户%s 买票成功' % i)
    else:
        print('用户%s 买票失败' % i)


# 整合上面的两个函数
def run(i, mutex):
    search(i)
    # 给买票环节加锁
    # 抢锁
    mutex.acquire()
    buy(i)
    # 释放锁
    mutex.release()


if __name__ == '__main__':
    # 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
    mutex = Lock()
    for i in range(10):
        p = Process(target=run, args=(i, mutex))
        p.start()
结论:

1.多个进程操作同一份数据的时候,会出现数据错乱的问题 针对上述问题,解决方法就是加锁处理

2.互斥锁:将并发变成串行,牺牲效率但是保证了数据的安全性

3.注意:

  • 1.锁不要轻易使用,会出现死锁现象(我们写代码一般不会用到,都是内部封装好的)
  • 2.锁只在处理数据的部分加 来保证数据安全(只在争抢数据的环节加锁处理)
作业:

1.尝试将TCP服务端制作成并发效果
客户端服务端全部设置成自动发消息自动回消息
eg: 客户端发hello 服务端直接转大写回HELLO
服务端:

import socket
from multiprocessing import Process


server = socket.socket()
server.bind(('127.0.0.1', 8090))
server.listen(5)
sock, addr = server.accept()


# 将服务的代码单独封装成一个函数
def task(sock):
    while True:
        try:
            data = sock.recv(1024)
            if len(data) == 0: break
            print(data.decode('utf8'))
            sock.send(data.upper())
        except ConnectionError as e:
            print(e)
            break
    sock.close()


if __name__ == '__main__':
    p = Process(target=task, args=(sock,))
    p.start()

客户端

import socket

client = socket.socket()
client.connect(('127.0.0.1', 8090))
while True:
    client.send('hello '.encode('utf8'))
    data = client.recv(1024)
    print(data.decode('utf8'))
转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1037227.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号