Python

程序实体

进程

  1. Windows
    • multiprocessing
  2. Linux
    • fork
  3. 应用场景
    • 运算率大的时候(不共享数据,计算密集型)

进程方法与全局变量的访问

代码示例:

from multiprocessing import Process  # 引入标准库函数
from time import sleep


def foo(args, kwargs):
    while True:
        sleep(args)  # 模拟阻塞状态
        print(kwargs)


def bar(args, kwargs):
    while True:
        sleep(args)
        print(kwargs)


if __name__ == '__main__':  # 因为存在模块间互相调用
    '''
    1. Process
        · 创建子进程对象
        1. target
            · 传入可调用的参数(任务)
        2. name
            · 子进程名字
        3. args
            · 给可调用的参数传入的可迭代的值
    2. start()
        · 启动子进程并执行任务,而run()只是执行了任务但没有启动子进程
    3. terminate()
        · 终止子进程
    '''
    P01 = Process(target=foo, name='任务01', args=(1, 'foo'))
    P02 = Process(target=bar, name='任务02', args=(2, 'bar'))
    P01.start()
    print(P01.name)
    P02.start()
    print(P02.name)
    '''
    任务01
    任务02
    foo
    foo
    bar
    foo
    '''

进程全局变量的访问特点

在每个子进程里面都复制一份全局变量,保证每个子进程间互不干扰(全局变量不会变,跟可变与不可变类型的数据无关)

代码示例:

from multiprocessing import Process
from time import sleep

h = 1
def foo(args):
    global h
    while h < 10:
        sleep(args)
        h += 1
        print('foo ==>',h)


def bar(args):
    global h
    while h < 10:
        sleep(args)
        h += 1
        print('bar ==>',h)


if __name__ == '__main__':
    # join():等这个子进程结束再执行主进程的任务(阻塞)
    P01 = Process(target=foo, name='任务01', args=(1,))
    P02 = Process(target=bar, name='任务02', args=(2,))
    P01.start()
    P02.start()
    P02.join()
    print('global ==>', h)
    '''
    foo ==> 2
    foo ==> 3
    bar ==> 2
    foo ==> 4
    foo ==> 5
    bar ==> 3
    global ==> 1
    '''

自定义进程

代码示例:

from multiprocessing import Process
from time import sleep


class Foo(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        a = 1
        while True:
            print('{} ==> {}\n'.format(a, self.name))
            a += 1
            sleep(1)


if __name__ == '__main__':
    # 启动子进程时会从父类Process调用run函数
    P01 = Foo('foo')
    P02 = Foo('bar')
    P01.start()
    P02.start()
    '''
    1 ==> foo

    1 ==> bar

    2 ==> foo

    2 ==> bar

    3 ==> foo

    3 ==> bar
    '''

非阻塞式进程池

代码示例:

import os
from time import time, sleep
from multiprocessing import Pool  # 进程池类
from random import random


def foo(args):
    print('执行任务:{} 进程ID:{}'.format(args, os.getpid()))  # getpid():获取进程ID
    start = time()
    sleep(random() * 2)
    end = time()
    return '完成任务:{} 用时:{} 进程ID:{}'.format(args, end - start, os.getpid())  # 返给回调函数


def bar(args):  # 接收return的数据
    myList.append(args)  # 回调函数本身不能return


if __name__ == '__main__':
    '''
    1. apply_async
        · 非阻塞函数,callback(回调函数)
    2. close()
        · 关闭进程池(不再接收新的任务)
    '''
    myList = []
    P01 = Pool(3)  # 创建拥有三个进程的进程池
    list01 = ['任务01', '任务02', '任务03', '任务04', '任务05']
    for i in list01:
        P01.apply_async(foo, args=(i,), callback=bar)

    P01.close()
    P01.join()
    print('结束任务')
    for i in myList:
        print(i)

    '''
    执行任务:任务01 进程ID:2864
    执行任务:任务02 进程ID:9220
    执行任务:任务03 进程ID:4972
    执行任务:任务04 进程ID:2864
    执行任务:任务05 进程ID:4972
    结束任务
    完成任务:任务01 用时:0.8482961654663086 进程ID:2864
    完成任务:任务03 用时:0.929196834564209 进程ID:4972
    完成任务:任务02 用时:1.0513510704040527 进程ID:9220
    完成任务:任务04 用时:0.3508336544036865 进程ID:2864
    完成任务:任务05 用时:0.909203052520752 进程ID:4972
    '''

进程池之阻塞式

代码示例:

import os
from time import time, sleep
from multiprocessing import Pool
from random import random


def foo(args):
    print('执行任务:{} 进程ID:{}'.format(args, os.getpid()))
    start = time()
    sleep(random() * 2)
    end = time()
    print('完成任务:{} 用时:{} 进程ID:{}'.format(args, end - start, os.getpid()))


if __name__ == '__main__':
    # apply:阻塞式函数(无回调函数)
    list01 = ['任务01', '任务02', '任务03', '任务04', '任务05']
    P01 = Pool(3)
    for i in list01:
        P01.apply(foo, args=(i,))

    P01.close()
    P01.join()
    '''
    执行任务:任务01 进程ID:3436
    完成任务:任务01 用时:1.5749576091766357 进程ID:3436
    执行任务:任务02 进程ID:10308
    完成任务:任务02 用时:0.6210627555847168 进程ID:10308
    执行任务:任务03 进程ID:4824
    完成任务:任务03 用时:0.8212382793426514 进程ID:4824
    执行任务:任务04 进程ID:3436
    完成任务:任务04 用时:0.5435049533843994 进程ID:3436
    执行任务:任务05 进程ID:10308
    完成任务:任务05 用时:1.8371591567993164 进程ID:10308
    '''

队列

代码示例:

from multiprocessing import Queue  # 导入队列的类

'''
1. put
    · 添加参数进队列(put函数有阻塞作用,如果队列满了则只能等待空闲出位置才可以往里面添加参数,否则就会一直是阻塞状态,主进程也会阻塞)
2. qsize
    · 获取当前队列中有多少个参数
3. put_nowait
    · 添加参数进队列且取消阻塞作用(但是当队列中的参数已满时使用会报错)
'''
Foo = Queue(3)  # 创建容纳3个参数的队列
Foo.put('foo')
Foo.put('bar')
print(Foo.qsize())
'''
2
'''

'''
1. full()
    · 判断队列中的参数是否已满
2. not_full()
    · 判断队列中的参数是否未满
3. empty()
    · 判断队列的参数是否为空
4. timeout
    · 超时的时间(单位:秒,当队列的参数已满或空时,超时会报错)
'''
Foo = Queue(2)
Foo.put('A')
Foo.put('B')
if Foo.full():
    print('队列已满')
else:
    Foo.put('C', timeout=3)

'''
队列已满
'''

'''
1. get()
    · 往队列里取值,当队列的参数为空时则会有阻塞的作用(也有timeout形参)
2. get_nowait
    · 往队列里取值且取消阻塞作用(但是当队列中的参数为空时使用会报错)
'''
Foo = Queue(2)
Foo.put('A')
Foo.put('B')
print(Foo.get())
print(Foo.get())
'''
A
B
'''

阻塞与非阻塞式进程间通信

代码示例:

from multiprocessing import Pool, Manager
from time import sleep
from random import random

list01 = ['文件', '图片', '音乐', '视频']


def foo(args):
    for i in list01:
        print('正在下载:{}'.format(i))
        sleep(random() * 2)
        args.put(i)


def bar(args):
    while True:
        if not args.empty():
            a = args.get()
            print('保存文件:{}成功!'.format(a))
            if a == list01[-1]:
                break


if __name__ == '__main__':
    # Manager:实现多进程(阻塞/非阻塞)之间数据共享
    P01 = Pool(3)
    P02 = Manager().Queue(3)
    P01.apply_async(foo, args=(P02,))
    P01.apply_async(bar, args=(P02,))
    P01.close()
    P01.join()
    '''
    正在下载:文件
    正在下载:图片
    保存文件:文件成功!
    正在下载:音乐
    保存文件:图片成功!
    正在下载:视频
    保存文件:音乐成功!
    保存文件:视频成功!
    '''

进程间通信

代码示例:

from multiprocessing import Queue, Process
from time import sleep
from random import random

list01 = ['文件', '图片', '音乐', '视频']


def foo(args):
    for i in list01:
        print('正在下载:{}'.format(i))
        sleep(random() * 2)
        args.put(i)


def bar(args):
    while True:
        if not args.empty():
            a = args.get()
            print('保存文件:{}成功!'.format(a))
            if a == list01[-1]:
                break


if __name__ == '__main__':
    P01 = Queue(3)
    P02 = Process(target=foo, args=(P01,))
    P03 = Process(target=bar, args=(P01,))
    P02.start()
    P03.start()