Python
程序实体
进程
- Windows
- multiprocessing
- Linux
- fork
- 应用场景
- 运算率大的时候(不共享数据,计算密集型)
进程方法与全局变量的访问
代码示例:
from multiprocessing import Process # 引入标准库函数
from time import sleep
def foo(args, kwargs):
while True:
sleep(args) # 模拟阻塞状态
print(kwargs)
def bar(args, kwargs):
while True:
sleep(args)
print(kwargs)
if __name__ == '__main__': # 因为存在模块间互相调用
'''
1. Process
· 创建子进程对象
1. target
· 传入可调用的参数(任务)
2. name
· 子进程名字
3. args
· 给可调用的参数传入的可迭代的值
2. start()
· 启动子进程并执行任务,而run()只是执行了任务但没有启动子进程
3. terminate()
· 终止子进程
'''
P01 = Process(target=foo, name='任务01', args=(1, 'foo'))
P02 = Process(target=bar, name='任务02', args=(2, 'bar'))
P01.start()
print(P01.name)
P02.start()
print(P02.name)
'''
任务01
任务02
foo
foo
bar
foo
'''
进程全局变量的访问特点
在每个子进程里面都复制一份全局变量,保证每个子进程间互不干扰(全局变量不会变,跟可变与不可变类型的数据无关)
代码示例:
from multiprocessing import Process
from time import sleep
h = 1
def foo(args):
global h
while h < 10:
sleep(args)
h += 1
print('foo ==>',h)
def bar(args):
global h
while h < 10:
sleep(args)
h += 1
print('bar ==>',h)
if __name__ == '__main__':
# join():等这个子进程结束再执行主进程的任务(阻塞)
P01 = Process(target=foo, name='任务01', args=(1,))
P02 = Process(target=bar, name='任务02', args=(2,))
P01.start()
P02.start()
P02.join()
print('global ==>', h)
'''
foo ==> 2
foo ==> 3
bar ==> 2
foo ==> 4
foo ==> 5
bar ==> 3
global ==> 1
'''
自定义进程
代码示例:
from multiprocessing import Process
from time import sleep
class Foo(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
a = 1
while True:
print('{} ==> {}\n'.format(a, self.name))
a += 1
sleep(1)
if __name__ == '__main__':
# 启动子进程时会从父类Process调用run函数
P01 = Foo('foo')
P02 = Foo('bar')
P01.start()
P02.start()
'''
1 ==> foo
1 ==> bar
2 ==> foo
2 ==> bar
3 ==> foo
3 ==> bar
'''
非阻塞式进程池
代码示例:
import os
from time import time, sleep
from multiprocessing import Pool # 进程池类
from random import random
def foo(args):
print('执行任务:{} 进程ID:{}'.format(args, os.getpid())) # getpid():获取进程ID
start = time()
sleep(random() * 2)
end = time()
return '完成任务:{} 用时:{} 进程ID:{}'.format(args, end - start, os.getpid()) # 返给回调函数
def bar(args): # 接收return的数据
myList.append(args) # 回调函数本身不能return
if __name__ == '__main__':
'''
1. apply_async
· 非阻塞函数,callback(回调函数)
2. close()
· 关闭进程池(不再接收新的任务)
'''
myList = []
P01 = Pool(3) # 创建拥有三个进程的进程池
list01 = ['任务01', '任务02', '任务03', '任务04', '任务05']
for i in list01:
P01.apply_async(foo, args=(i,), callback=bar)
P01.close()
P01.join()
print('结束任务')
for i in myList:
print(i)
'''
执行任务:任务01 进程ID:2864
执行任务:任务02 进程ID:9220
执行任务:任务03 进程ID:4972
执行任务:任务04 进程ID:2864
执行任务:任务05 进程ID:4972
结束任务
完成任务:任务01 用时:0.8482961654663086 进程ID:2864
完成任务:任务03 用时:0.929196834564209 进程ID:4972
完成任务:任务02 用时:1.0513510704040527 进程ID:9220
完成任务:任务04 用时:0.3508336544036865 进程ID:2864
完成任务:任务05 用时:0.909203052520752 进程ID:4972
'''
进程池之阻塞式
代码示例:
import os
from time import time, sleep
from multiprocessing import Pool
from random import random
def foo(args):
print('执行任务:{} 进程ID:{}'.format(args, os.getpid()))
start = time()
sleep(random() * 2)
end = time()
print('完成任务:{} 用时:{} 进程ID:{}'.format(args, end - start, os.getpid()))
if __name__ == '__main__':
# apply:阻塞式函数(无回调函数)
list01 = ['任务01', '任务02', '任务03', '任务04', '任务05']
P01 = Pool(3)
for i in list01:
P01.apply(foo, args=(i,))
P01.close()
P01.join()
'''
执行任务:任务01 进程ID:3436
完成任务:任务01 用时:1.5749576091766357 进程ID:3436
执行任务:任务02 进程ID:10308
完成任务:任务02 用时:0.6210627555847168 进程ID:10308
执行任务:任务03 进程ID:4824
完成任务:任务03 用时:0.8212382793426514 进程ID:4824
执行任务:任务04 进程ID:3436
完成任务:任务04 用时:0.5435049533843994 进程ID:3436
执行任务:任务05 进程ID:10308
完成任务:任务05 用时:1.8371591567993164 进程ID:10308
'''
队列
代码示例:
from multiprocessing import Queue # 导入队列的类
'''
1. put
· 添加参数进队列(put函数有阻塞作用,如果队列满了则只能等待空闲出位置才可以往里面添加参数,否则就会一直是阻塞状态,主进程也会阻塞)
2. qsize
· 获取当前队列中有多少个参数
3. put_nowait
· 添加参数进队列且取消阻塞作用(但是当队列中的参数已满时使用会报错)
'''
Foo = Queue(3) # 创建容纳3个参数的队列
Foo.put('foo')
Foo.put('bar')
print(Foo.qsize())
'''
2
'''
'''
1. full()
· 判断队列中的参数是否已满
2. not_full()
· 判断队列中的参数是否未满
3. empty()
· 判断队列的参数是否为空
4. timeout
· 超时的时间(单位:秒,当队列的参数已满或空时,超时会报错)
'''
Foo = Queue(2)
Foo.put('A')
Foo.put('B')
if Foo.full():
print('队列已满')
else:
Foo.put('C', timeout=3)
'''
队列已满
'''
'''
1. get()
· 往队列里取值,当队列的参数为空时则会有阻塞的作用(也有timeout形参)
2. get_nowait
· 往队列里取值且取消阻塞作用(但是当队列中的参数为空时使用会报错)
'''
Foo = Queue(2)
Foo.put('A')
Foo.put('B')
print(Foo.get())
print(Foo.get())
'''
A
B
'''
阻塞与非阻塞式进程间通信
代码示例:
from multiprocessing import Pool, Manager
from time import sleep
from random import random
list01 = ['文件', '图片', '音乐', '视频']
def foo(args):
for i in list01:
print('正在下载:{}'.format(i))
sleep(random() * 2)
args.put(i)
def bar(args):
while True:
if not args.empty():
a = args.get()
print('保存文件:{}成功!'.format(a))
if a == list01[-1]:
break
if __name__ == '__main__':
# Manager:实现多进程(阻塞/非阻塞)之间数据共享
P01 = Pool(3)
P02 = Manager().Queue(3)
P01.apply_async(foo, args=(P02,))
P01.apply_async(bar, args=(P02,))
P01.close()
P01.join()
'''
正在下载:文件
正在下载:图片
保存文件:文件成功!
正在下载:音乐
保存文件:图片成功!
正在下载:视频
保存文件:音乐成功!
保存文件:视频成功!
'''
进程间通信
代码示例:
from multiprocessing import Queue, Process
from time import sleep
from random import random
list01 = ['文件', '图片', '音乐', '视频']
def foo(args):
for i in list01:
print('正在下载:{}'.format(i))
sleep(random() * 2)
args.put(i)
def bar(args):
while True:
if not args.empty():
a = args.get()
print('保存文件:{}成功!'.format(a))
if a == list01[-1]:
break
if __name__ == '__main__':
P01 = Queue(3)
P02 = Process(target=foo, args=(P01,))
P03 = Process(target=bar, args=(P01,))
P02.start()
P03.start()