python 封装python 多进程编程处理类需要注意哪些地方

博客访问: 4944735
博文数量: 724
博客积分: 2150
博客等级: 上尉
技术积分: 12529
注册时间:
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
分类: Python/Ruby
众所周知,python本身是单线程的,python中的线程处理是由python解释器分配时间片的;
但在python 2.6中吸收了开源模块,开始支持系统原生的进程处理——multiprocessing.
注意:这个模块的某些函数需要操作系统的支持,
例如,multiprocessing.synchronize模块在某些平台上引入时会激发一个ImportError
& 要创建一个Process是很简单的。
from multiprocessing import Process
def f(name):
& & print('hello', name)
if __name__ == '__main__':
& & &p = Process(target=f, args=('bob',))
& & &p.start()
& & &p.join()
要获得一个Process的进程ID也是很简单的。
from multiprocessing import Process
def info(title):
& & print title
& & print 'module name:', __name__
& & print 'parent process:', os.getppid()#这个测试不通过,3.0不支持
& & print 'process id:', os.getpid()
def f(name):
& & info('function f')
& & print 'hello', name
if __name__ == '__main__':
& & info('main line')
& & p = Process(target=f, args=('bob',))
& & p.start()
& & p.join()
创建进程:multiprocessing.Process([group[, target[, name[, args[, kargs]]]]])
& group: & &None,它的存在仅仅是为了与threading.Thread兼容
& target: & 一般是函数
& name: & & 进程名
& args: & & 函数的参数
& kargs: & &keywords参数
& run() & & & & & & & & &默认的run()函数调用target的函数,你也可以在子类中覆盖该函数
& start() & & & & & & & &启动该进程
& join([timeout]) & & & &父进程被停止,直到子进程被执行完毕。
& & & & & & & & & & & & &当timeout为None时没有超时,否则有超时。
& & & & & & & & & & & & &进程可以被join很多次,但不能join自己
& is_alive() & & & & & &&
& terminate() & & & & & &结束进程。
& & & & & & & & & & & & &在Unix上使用的是SIGTERM
& & & & & & & & & & & & &在Windows平台上使用TerminateProcess
& name & & & & & & & & & 进程名
& daemon & & & & & & & & 守护进程
& pid & & & & & & & & & &进程ID
& exitcode & & & & & & & 如果进程还没有结束,该值为None
& authkey & & & & & & & & & && & &&
& Queue类似于queue.Queue,一般用来进程间交互信息
from multiprocessing import Process, Queue
& & q.put([42, None, 'hello'])
&if __name__ == '__main__':
& & &q = Queue()
& & &p = Process(target=f, args=(q,))
& & &p.start()
& & &print(q.get()) & &# prints "[42, None, 'hello']"
& & &p.join()
注意:Queue是进程和线程安全的。
Queue实现了queue.Queue的大部分方法,但task_done()和join()没有实现。 & &
创建Queue:multiprocessing.Queue([maxsize])
& qsize() & & & & & & & & & & & & & & 返回Queue的大小
& empty() & & & & & & & & & & & & & & 返回一个boolean值表示Queue是否为空
& full() & & & & & & & & & & & & & & &返回一个boolean值表示Queue是否满
& put(item[, block[, timeout]]) & & &&
& put_nowait(item)
& get([block[, timeout]])
& get_nowait()
& get_no_wait()
& close() & & & & & & & & & & & & & & 表示该Queue不在加入新的元素
& join_thread() & & & & & & & & & & &&
& cancel_join_thread()
3)JoinableQueue
创建:multiprocessing.JoinableQueue([maxsize])
& task_done()
from multiprocessing import Process, Pipe
def f(conn):
& & conn.send([42, None, 'hello'])
& & conn.close()
if __name__ == '__main__':
& & parent_conn, child_conn = Pipe()
& & p = Process(target=f, args=(child_conn,))
& & p.start()
& & print(parent_conn.recv()) & # prints "[42, None, 'hello']"
& & p.join()
& multiprocessing.Pipe([duplex]) & & &返回一个Connection对象
5)异步化synchronization
from multiprocessing import Process, Lock
def f(l, i):
& & l.acquire()
& & print('hello world', i)
& & l.release()
if __name__ == '__main__':
& & lock = Lock()
& & for num in range(10):
& & & & Process(target=f, args=(lock, num)).start()
6)Shared Memory
from multiprocessing import Process, Value, Array
def f(n, a):
& & n.value = 3.1415927
& & for i in range(len(a)):
& & & & a[i] = -a[i]
if __name__ == '__main__':
& & num = Value('d', 0.0)
& & arr = Array('i', range(10))
& & p = Process(target=f, args=(num, arr))
& & p.start()
& & p.join()
& & print(num.value)
& & print(arr[:])
from multiprocessing import Process, Manager
def f(d, l):
& & d[1] = '1'
& & d['2'] = 2
& & d[0.25] = None
& & l.reverse()
if __name__ == '__main__':
& & manager = Manager()
& & d = manager.dict()
& & l = manager.list(range(10))
& & p = Process(target=f, args=(d, l))
& & p.start()
& & p.join()
& & print(d)
& & print(l)
from multiprocessing import Pool
& & return x*x
if __name__ == '__main__':
& & pool = Pool(processes=4) & & & & & & &# start 4 worker processes
& & result = pool.apply_async(f, [10]) & & # evaluate "f(10)" asynchronously
& & print result.get(timeout=1) & & & & & # prints "100" unless your computer is *very* slow
& & print pool.map(f, range(10)) & & & & &# prints "[0, 1, 4,..., 81]"
multiprocessing.Pool([processes[, initializer[, initargs]]])
& apply(func[, args[, kwds]])
& apply_async(func[, args[, kwds[, callback]]])
& map(func,iterable[, chunksize])
& map_async(func,iterable[, chunksize[, callback]])
& imap(func, iterable[, chunksize])
& imap_unordered(func, iterable[, chunksize])
& terminate()
from multiprocessing import Pool
& & return x*x
if __name__ == '__main__':
& & pool = Pool(processes=4) & & & & & & &# start 4 worker processes
& & result = pool.apply_async(f, (10,)) & # evaluate "f(10)" asynchronously
& & print(result.get(timeout=1)) & & & & &# prints "100" unless your computer is *very* slow
& & print(pool.map(f, range(10))) & & & & # prints "[0, 1, 4,..., 81]"
& & it = pool.imap(f, range(10))
& & print(next(it)) & & & & & & & & & & & # prints "0"
& & print(next(it)) & & & & & & & & & & & # prints "1"
& & print(it.next(timeout=1)) & & & & & & # prints "4" unless your computer is *very* slow
& & import time
& & result = pool.apply_async(time.sleep, (10,))
& & print(result.get(timeout=1)) & & & & &# raises TimeoutError&
multiprocessing.active_children() & & & & &返回所有活动子进程的列表
multiprocessing.cpu_count() & & & & & & & &返回CPU数目
multiprocessing.current_process() & & & & &返回当前进程对应的Process对象
multiprocessing.freeze_support()
multiprocessing.set_executable()
10)Connection对象
poll([timeout])
send_bytes(buffer[, offset[, size]])
recv_bytes([maxlength])
recv_bytes_info(buffer[, offset]) &
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
阅读(3389) | 评论(0) | 转发(0) |
相关热门文章
给主人留下些什么吧!~~
请登录后评论。9030人阅读
Python(59)
multiprocessing模块
multiprocessing包是Python中的多进程管理包。它与
threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
这个模块表示像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多。
看一下Process类的构造方法:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
参数说明:
group:进程所属组。基本不用
target:表示调用对象。
args:表示调用对象的位置参数元组。
name:别名
kwargs:表示调用对象的字典。
创建进程的简单实例:
import multiprocessing
def do(n) :
name = multiprocessing.current_process().name
print name,'starting'
print "worker ", n
if __name__ == '__main__' :
numList = []
for i in xrange(5) :
p = multiprocessing.Process(target=do, args=(i,))
numList.append(p)
print "Process end."
执行结果:
Process-1 starting
Process end.
Process-2 starting
Process end.
Process-3 starting
Process end.
Process-4 starting
Process end.
Process-5 starting
Process end.
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,并用其start()方法启动,这样创建进程比fork()还要简单。
join()方法表示等待子进程结束以后再继续往下运行,通常用于进程间的同步。
在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if __name__ == ‘__main__’ :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。
在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。
Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
下面介绍一下multiprocessing 模块下的Pool类下的几个方法
函数原型:
apply(func[, args=()[, kwds={}]])
该函数用于传递不定参数,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。
apply_async()
函数原型:
apply_async(func[, args=()[, kwds={}[, callback=None]]])
与apply用法一样,但它是非阻塞且支持结果返回进行回调。
函数原型:
map(func, iterable[, chunksize=None])
Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。
注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
关闭进程池(pool),使其不在接受新的任务。
terminate()
结束工作进程,不在处理未处理的任务。
主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。
multiprocessing.Pool类的实例:
import time
from multiprocessing import Pool
def run(fn):
time.sleep(1)
return fn*fn
if __name__ == "__main__":
testFL = [1,2,3,4,5,6]
print 'shunxu:'
s = time.time()
for fn in testFL:
e1 = time.time()
print "顺序执行时间:", int(e1 - s)
print 'concurrent:'
pool = Pool(5)
rl =pool.map(run, testFL)
pool.close()
pool.join()
e2 = time.time()
print "并行执行时间:", int(e2-e1)
执行结果:
顺序执行时间: 6
concurrent:
并行执行时间: 2
[1, 4, 9, 16, 25, 36]
上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。
程序中的r1表示全部进程执行结束后全局的返回结果集,run函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中,也就是一个结果堆中,实际上是用了队列的原理,等待所有进程都执行完毕,就返回这个列表(列表的顺序不定)。
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),让其不再接受新的Process了。
再看一个实例:
import time
from multiprocessing import Pool
def run(fn) :
time.sleep(2)
if __name__ == "__main__" :
startTime = time.time()
testFL = [1,2,3,4,5]
pool = Pool(10)
pool.map(run,testFL)
pool.close()
pool.join()
endTime = time.time()
print "time :", endTime - startTime
执行结果:
再次执行结果如下:
结果中为什么还有空行和没有折行的数据呢?其实这跟进程调度有关,当有多个进程并行执行时,每个进程得到的时间片时间不一样,哪个进程接受哪个请求以及执行完成时间都是不定的,所以会出现输出乱序的情况。那为什么又会有没这行和空行的情况呢?因为有可能在执行第一个进程时,刚要打印换行符时,切换到另一个进程,这样就极有可能两个数字打印到同一行,并且再次切换回第一个进程时会打印一个换行符,所以就会出现空行的情况。
进程实战实例
并行处理某个目录下文件中的字符个数和行数,存入res.txt文件中,
每个文件一行,格式为:filename:lineNumber,charNumber
import time
from multiprocessing import Pool
def getFile(path) :
fileList = []
for root, dirs, files in list(os.walk(path)) :
for i in files :
if i.endswith('.txt') or i.endswith('.10w') :
fileList.append(root + "\\" + i)
return fileList
def operFile(filePath) :
filePath = filePath
fp = open(filePath)
content = fp.readlines()
fp.close()
lines = len(content)
alphaNum = 0
for i in content :
alphaNum += len(i.strip('\n'))
return lines,alphaNum,filePath
def out(list1, writeFilePath) :
fileLines = 0
charNum = 0
fp = open(writeFilePath,'a')
for i in list1 :
fp.write(i[2] + " 行数:"+ str(i[0]) + " 字符数:"+str(i[1]) + "\n")
fileLines += i[0]
charNum += i[1]
fp.close()
print fileLines, charNum
if __name__ == "__main__":
startTime = time.time()
filePath = "C:\\wcx\\a"
fileList = getFile(filePath)
pool = Pool(5)
resultList =pool.map(operFile, fileList)
pool.close()
pool.join()
writeFilePath = "c:\\wcx\\res.txt"
print resultList
out(resultList, writeFilePath)
endTime = time.time()
print "used time is ", endTime - startTime
执行结果:
耗时不到1秒,可见多进程并发执行速度是很快的。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:89139次
积分:1877
积分:1877
排名:第18428名
原创:103篇
(4)(12)(25)(27)(30)(7)(3)python多进程编程 深入理解python多进程编程-Python教程
当前位置:&>&&>& &
深入理解python多进程编程
python多进程编程 深入理解python多进程编程
| 来源:网络 | 关键字:
1、python多进程编程背景python中的多进程最大的好处就是充分利用多核cpu的资源,不像python中的多线程,受制于GIL的限制,从而只能进行cpu分配,在python的多进程中,适合于所有
1、python多进程编程背景python中的多进程最大的好处就是充分利用多核cpu的资源,不像python中的多线程,受制于GIL的限制,从而只能进行cpu分配,在python的多进程中,适合于所有的场合,基本上能用多线程的,那么基本上就能用多进程。在进行多进程编程的时候,其实和多线程差不多,在多线程的包threading中,存在一个线程类Thread,在其中有三种方法来创建一个线程,启动线程,其实在多进程编程中,存在一个进程类Process,也可以使用那集中方法来使用;在多线程中,内存中的数据是可以直接共享的,例如list等,但是在多进程中,内存数据是不能共享的,从而需要用单独的数据结构来处理共享的数据;在多线程中,数据共享,要保证数据的正确性,从而必须要有所,但是在多进程中,锁的考虑应该很少,因为进程是不共享内存信息的,进程之间的交互数据必须要通过特殊的数据结构,在多进程中,主要的内容如下图:2、多进程的类Process多进程的类Process和多线程的类Thread差不多的方法,两者的接口基本相同,具体看以下的代码:#!/usr/bin/env pythonfrom multiprocessing import Processimport osimport timedef func(name):
print 'start a process'
time.sleep(3)
print 'the process parent id :',os.getppid()
print 'the process id is :',os.getpid()if __name__ =='__main__':
processes = []
for i in range(2):
p = Process(target=func,args=(i,))
processes.append(p)
for i in processes:
print 'start all process'
for i in processes:
print 'all sub process is done!'在上面例子中可以看到,多进程和多线程的API接口是一样一样的,显示创建进程,然后进行start开始运行,然后join等待进程结束。在需要执行的函数中,打印出了进程的id和pid,从而可以看到父进程和子进程的id号,在linu中,进程主要是使用fork出来的,在创建进程的时候可以查询到父进程和子进程的id号,而在多线程中是无法找到线程的id,执行效果如下:start all processstart a processstart a processthe process parent id : 8036the process parent id : 8036the process id is : 8037the process id is : 8038all sub process is done!在操作系统中查询的id的时候,最好用pstree,清晰:├─sshd(1508)─┬─sshd(2259)───bash(2261)───python(7520)─┬─python(7521)
├─python(7522)
├─python(7523)
├─python(7524)
├─python(7525)
├─python(7526)
├─python(7527)
├─python(7528)
├─python(7529)
├─python(7530)
├─python(7531)
└─python(7532)在进行运行的时候,可以看到,如果没有join语句,那么主进程是不会等待子进程结束的,是一直会执行下去,然后再等待子进程的执行。在多进程的时候,说,我怎么得到多进程的返回值呢?然后写了下面的代码:#!/usr/bin/env pythonimport multiprocessingclass [.Cn]MyProcess(multiprocessing.Process):
def __init__(self,name,func,args):
super(MyProcess,self).__init__()
self.name = name
self.func = func
self.args = args
self.res = ''
def run(self):
self.res = self.func(*self.args)
print self.name
print self.res
return (self.res,'kel')def func(name):
print 'start process...'
return name.upper()if __name__ == '__main__':
processes = []
result = []
for i in range(3):
p = MyProcess('process',func,('kel',))
processes.append(p)
for i in processes:
for i in processes:
for i in processes:
result.append(i.res)
for i in result:
print i尝试从结果中返回值,从而在主进程中得到子进程的返回值,然而,,,并没有结果,后来一想,在进程中,进程之间是不共享内存的 ,那么使用list来存放数据显然是不可行的,进程之间的交互必须依赖于特殊的数据结构,从而以上的代码仅仅是执行进程,不能得到进程的返回值,但是以上代码修改为线程,那么是可以得到返回值的。3、进程间的交互Queue进程间交互的时候,首先就可以使用在多线程里面一样的Queue结构,但是在多进程中,必须使用multiprocessing里的Queue,代码如下:#!/usr/bin/env pythonimport multiprocessingclass MyProcess(multiprocessing.Process):
def __init__(self,name,func,args):
super(MyProcess,self).__init__()
self.name = name
self.func = func
self.args = args
self.res = ''
def run(self):
self.res = self.func(*self.args)def func(name,q):
print 'start process...'
q.put(name.upper())if __name__ == '__main__':
processes = []
q = multiprocessing.Queue()
for i in range(3):
p = MyProcess('process',func,('kel',q))
processes.append(p)
for i in processes:
for i in processes:
while q.qsize() & 0:
print q.get()其实这个是上面例子的改进,在其中,并没有使用什么其他的代码,主要就是使用Queue来保存数据,从而可以达到进程间交换数据的目的。在进行使用Queue的时候,其实用的是socket,感觉,因为在其中使用的还是发送send,然后是接收recv。在进行数据交互的时候,其实是父进程和所有的子进程进行数据交互,所有的子进程之间基本是没有交互的,除非,但是,也是可以的,例如,每个进程去Queue中取数据,但是这个时候应该是要考虑锁,不然可能会造成数据混乱。4、 进程之间交互Pipe在进程之间交互数据的时候还可以使用Pipe,代码如下:#!/usr/bin/env pythonimport multiprocessingclass MyProcess(multiprocessing.Process):
def __init__(self,name,func,args):
super(MyProcess,self).__init__()
self.name = name
self.func = func
self.args = args
self.res = ''
def run(self):
self.res = self.func(*self.args)def func(name,q):
print 'start process...'
child_conn.send(name.upper())if __name__ == '__main__':
processes = []
parent_conn,child_conn = multiprocessing.Pipe()
for i in range(3):
p = MyProcess('process',func,('kel',child_conn))
processes.append(p)
for i in processes:
for i in processes:
for i in processes:
print parent_conn.recv()在以上代码中,主要是使用Pipe中返回的两个socket来进行传输和接收数据,在父进程中,使用的是parent_conn,在子进程中使用的是child_conn,从而子进程发送数据的方法send,而在父进程中进行接收方法recv最好的地方在于,明确的知道收发的次数,但是如果某个出现异常,那么估计pipe不能使用了。5、进程池pool其实在使用多进程的时候,感觉使用pool是最方便的,在多线程中是不存在pool的。在使用pool的时候,可以限制每次的进程数,也就是剩余的进程是在排队,而只有在设定的数量的进程在运行,在默认的情况下,进程是cpu的个数,也就是根据multiprocessing.cpu_count()得出的结果。在poo中,有两个方法,一个是map一个是imap,其实这两方法超级方便,在执行结束之后,可以得到每个进程的返回结果,但是缺点就是每次的时候,只能有一个参数,也就是在执行的函数中,最多是只有一个参数的,否则,需要使用组合参数的方法,代码如下所示:#!/usr/bin/env pythonimport multiprocessingdef func(name):
print 'start process'
return name.upper()if __name__ == '__main__':
p = multiprocessing.Pool(5)
print p.map(func,['kel','smile'])
for i in p.imap(func,['kel','smile']):
print i在使用map的时候,直接返回的一个是一个list,从而这个list也就是函数执行的结果,而在imap中,返回的是一个由结果组成的迭代器,如果需要使用多个参数的话,那么估计需要*args,从而使用参数args。在使用apply.async的时候,可以直接使用多个参数,如下所示:#!/usr/bin/env pythonimport multiprocessingimport timedef func(name):
print 'start process'
time.sleep(2)
return name.upper()if __name__ == '__main__':
results = []
p = multiprocessing.Pool(5)
for i in range(7):
res = p.apply_async(func,args=('kel',))
results.append(res)
for i in results:
print i.get(2.1)在进行得到各个结果的时候,注意使用了一个list来进行append,要不然在得到结果get的时候会阻塞进程,从而将多进程编程了单进程,从而使用了一个list来存放相关的结果,在进行得到get数据的时候,可以设置超时时间,也就是get(timeout=5),这种设置。总结:在进行多进程编程的时候,注意进程之间的交互,在执行函数之后,如何得到执行函数的结果,可以使用特殊的数据结构,例如Queue或者Pipe或者其他,在使用pool的时候,可以直接得到结果,map和imap都是直接得到一个list和可迭代对象,而apply_async得到的结果需要用一个list装起来,然后得到每个结果。以上这篇深入理解python多进程编程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。
网友评论仅供其表达个人看法,并不表明网易立场。

我要回帖

更多关于 python 多进程 返回值 的文章

 

随机推荐