以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程(下面就统称核好了),觉得去折腾并行没啥意义(除非在做IO密集型任务)。然后自从用上了32核128GB内存,看到 htop 里面一堆空载的核,很自然地就会想这个并行必须去折腾一下。后面发现,其实 Python 的并行真的非常简单。
multiprocessing vs threading
Python 自带的库又全又好用,这是我特别喜欢 Python 的原因之一。Python 里面有 multiprocessing和 threading 这两个用来实现并行的库。用线程应该是很自然的想法,毕竟(直觉上)开销小,还有共享内存的福利,而且在其他语言里面线程用的确实是非常频繁。然而,我可以很负责任的说,如果你用的是 CPython 实现,那么用了 threading 就等同于和并行计算说再见了(实际上,甚至会比单线程更慢),除非这是个IO密集型的任务。
GIL
CPython 指的是 python.org 提供的 Python 实现。是的,Python 是一门语言,它有各种不同的实现,比如 PyPy, Jython, IronPython 等等……我们用的最多的就是 CPython,它几乎就和 Python 画上了等号。
CPython 的实现中,使用了 GIL 即全局锁,来简化解释器的实现,使得解释器每次只执行一个线程中的字节码。也就是说,除非是在等待IO操作,否则 CPython 的多线程就是彻底的谎言!
有关 GIL 下面两个资料写的挺好的:
- http://cenalulu.github.io/python/gil-in-python/
- http://www.dabeaz.com/python/UnderstandingGIL.pdf
multiprocessing.Pool
因为 GIL 的缘故 threading 不能用,那么我们就好好研究研究 multiprocessing。(当然,如果你说你不用 CPython,没有 GIL 的问题,那也是极佳的。)
首先介绍一个简单粗暴,非常实用的工具,就是 multiprocessing.Pool。如果你的任务能用 ys = map(f, xs) 来解决,大家可能都知道,这样的形式天生就是最容易并行的,那么在 Python 里面并行计算这个任务真是再简单不过了。举个例子,把每个数都平方:
import multiprocessing def f(x): return x * x cores = multiprocessing.cpu_count() pool = multiprocessing.Pool(processes=cores) xs = range(5) # method 1: map print pool.map(f, xs) # prints [0, 1, 4, 9, 16] # method 2: imap for y in pool.imap(f, xs): print y # 0, 1, 4, 9, 16, respectively # method 3: imap_unordered for y in pool.imap_unordered(f, xs): print(y) # may be in any order
map 直接返回列表,而 i 开头的两个函数返回的是迭代器;imap_unordered 返回的是无序的。
当计算时间比较长的时候,我们可能想要加上一个进度条,这个时候 i 系列的好处就体现出来了。另外,有一个小技巧,就是输出 \r 可以使得光标回到行首而不换行,这样就可以制作简易的进度条了。
cnt = 0 for _ in pool.imap_unordered(f, xs): sys.stdout.write('done %d/%d\r' % (cnt, len(xs))) cnt += 1
更复杂的操作
要进行更复杂的操作,可以直接使用 multiprocessing.Process 对象。要在进程间通信可以使用:
- multiprocessing.Pipe
- multiprocessing.Queue
- 同步原语
- 共享变量
其中我强烈推荐的就是 Queue,因为其实很多场景就是生产者消费者模型,这个时候用 Queue 就解决问题了。用的方法也很简单,现在父进程创建 Queue,然后把它当做 args 或者 kwargs 传给 Process 就好了。
使用 Theano 或者 Tensorflow 等工具时的注意事项
需要注意的是,在 import theano 或者 import tensorflow 等调用了 Cuda 的工具的时候会产生一些副作用,这些副作用会原样拷贝到子进程中,然后就发生错误,如:
could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED
解决的方法是,保证父进程不引入这些工具,而是在子进程创建好了以后,让子进程各自引入。
如果使用 Process,那就在 target 函数里面 import。举个例子:
import multiprocessing def hello(taskq, resultq): import tensorflow as tf config = tf.ConfigProto() config.gpu_options.allow_growth=True sess = tf.Session(config=config) while True: name = taskq.get() res = sess.run(tf.constant('hello ' + name)) resultq.put(res) if __name__ == '__main__': taskq = multiprocessing.Queue() resultq = multiprocessing.Queue() p = multiprocessing.Process(target=hello, args=(taskq, resultq)) p.start() taskq.put('world') taskq.put('abcdabcd987') taskq.close() print(resultq.get()) print(resultq.get()) p.terminate() p.join()
如果使用 Pool,那么可以编写一个函数,在这个函数里面 import,并且把这个函数作为 initializer传入到 Pool 的构造函数里面。举个例子:
import multiprocessing def init(): global tf global sess import tensorflow as tf config = tf.ConfigProto() config.gpu_options.allow_growth=True sess = tf.Session(config=config) def hello(name): return sess.run(tf.constant('hello ' + name)) if __name__ == '__main__': pool = multiprocessing.Pool(processes=2, initializer=init) xs = ['world', 'abcdabcd987', 'Lequn Chen'] print pool.map(hello, xs)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
更新动态
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]