python 协程 gevent的进程池Pool和协程gevent冲突吗

突破python缺陷,实现几种自定义线程池
以及进程、线程、协程的介绍
来源:博客园
Python线程
Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def show(arg):
time.sleep(1)
print 'thread'+str(arg)

for i in range(10):
t = threading.Thread(target=show, args=(i,))
t.start()

print 'main thread stop'


  
上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
更多方法:
start
线程准备就绪,等待CPU调度
setName
为线程设置名称
getName
获取线程名称
setDaemon
设置为后台线程或前台线程(默认)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
join
逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
run
线程被cpu调度后自动执行线程对象的run方法








线程锁
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。所以,可能出现如下问题:
未使用锁

#!/usr/bin/env python
#coding:utf-8

import threading
import time

gl_num = 0

lock = threading.RLock()

def Func():
lock.acquire()
global gl_num
gl_num +=1
time.sleep(1)
print gl_num
lock.release()

for i in range(10):
t = threading.Thread(target=Func)
t.start()


  
event
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import threading
 
 
def do(event):
print 'start'
event.wait()
print 'execute'
 
 
event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start()
 
event_obj.clear()
inp = raw_input('input:')
if inp == 'true':
event_obj.set()

Python 进程

from multiprocessing import Process
import threading
import time

def foo(i):
print 'say hi',i

for i in range(10):
p = Process(target=foo,args=(i,))
p.start()


  
注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。
进程数据共享
进程各自持有一份数据,默认无法共享数据
进程间默认无法数据共享

#方法一,Array
from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44])
 
def Foo(i):
temp[i] = 100+i
for item in temp:
print i,'-----&',item
 
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
 
#方法二:manage.dict()共享数据
from multiprocessing import Process,Manager
 
manage = Manager()
dic = manage.dict()
 
def Foo(i):
dic[i] = 100+i
print dic.values()
 
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
p.join()

类型对应表
当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。
进程锁实例
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
apply
apply_async

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from
multiprocessing import Process,Pool
import time

def Foo(i):
time.sleep(2)
return i+100

def Bar(arg):
print arg

pool = Pool(5)
#print pool.apply(Foo,(1,))
#print pool.apply_async(func =Foo, args=(1,)).get()

for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)

print 'end'
pool.close()
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
greenlet

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
 
from greenlet import greenlet
 
 
def test1():
print 12
gr2.switch()
print 34
gr2.switch()
 
 
def test2():
print 56
gr1.switch()
print 78
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

gevent

import gevent
 
def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again')
 
def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')
 
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])


遇到IO操作自动切换:

from
monkey.patch_all()
import gevent
import urllib2

def f(url):
print('GET: %s' % url)
resp = urllib2.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, '/'),
gevent.spawn(f, '/'),
])

线程池:
 
方案简介:
 
方案一:简单版本的线程池,每次都要创建线程池;
方案二:支持传函数、传参、传回调函数、立即终止所有线程、最大优点:线程的循环利用,节省时间和资源
★★★★★
方案三:现有模块,直接调用即可,不支持回调函数
方案一:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
 
 
class ThreadPool(object):
 
def __init__(self, max_num=20):
self.queue = Queue.Queue(max_num)
for i in xrange(max_num):
self.queue.put(threading.Thread)
 
def get_thread(self):
return self.queue.get()
 
def add_thread(self):
self.queue.put(threading.Thread)
 
"""
pool = ThreadPool(10)
 
def func(arg, p):
print arg
import time
time.sleep(2)
p.add_thread()
 
 
for i in xrange(30):
thread = pool.get_thread()
t = thread(target=func, args=(i, pool))
t.start()
"""


 方案二:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

def __init__(self, max_num):
self.q = queue.Queue()
self.max_num = max_num

self.terminal = False
self.generate_list = []
self.free_list = []

def run(self, func, args, callback=None):
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""

if len(self.free_list) == 0 and len(self.generate_list) & self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)

def generate_thread(self):
创建一个线程
t = threading.Thread(target=self.call)
t.start()

def call(self):
循环去获取任务函数并执行任务函数
current_thread = threading.currentThread
self.generate_list.append(current_thread)

event = self.q.get()
while event != StopEvent:

func, arguments, callback = event
result = func(*arguments)
status = True
except Exception as e:
status = False
result = e

if callback is not None:
callback(status, result)
except Exception as e:
pass

if self.terminal: # False
event = StopEvent
else:
with self.worker_state(self.free_list,current_thread):
event = self.q.get()


else:
self.generate_list.remove(current_thread)

@contextlib.contextmanager
def worker_state(self,x,v):
x.append(v)
yield
finally:
x.remove(v)

def close(self):
num = len(self.generate_list)
while num:
self.q.put(StopEvent)
num -= 1

# 终止线程(清空队列)
def terminate(self):

self.terminal = True

while self.generate_list:
self.q.put(StopEvent)
self.q.empty()



import time

def work(i):
time.sleep(1)
print(i)

pool = ThreadPool(10)
for item in range(50):
pool.run(func=work, args=(item,))

# pool.terminate() #立即终止所有线程


方案三、

from concurrent.futures import ThreadPoolExecutor
import time

def f1(a):
time.sleep(2)
print(a)
return 1

pool=ThreadPoolExecutor(5)
for i in range(30):
a=pool.submit(f1,i)
# x=a.result()#获取返回值,如果有,会阻塞
免责声明:本站部分内容、图片、文字、视频等来自于互联网,仅供大家学习与交流。相关内容如涉嫌侵犯您的知识产权或其他合法权益,请向本站发送有效通知,我们会及时处理。反馈邮箱&&&&。
学生服务号
在线咨询,奖学金返现,名师点评,等你来互动下次自动登录
现在的位置:
& 综合 & 正文
python Gevent – 高性能的Python并发框架
话说gevent也没个logo啥的,于是就摆了这张图= =|||,首先这是一种叫做greenlet的鸟,而在python里,按照官方解释greenlet是轻量级的并行编程,而gevent呢,就是利用greenlet实现的基于协程的python的网络library,好了,关系理清了。。。
话说pycon没有白去阿,了解了很多以前不知道的东西,比如说协程,gevent,greenlet,eventlet。说说协程,进程和线程大家平时了解的都比较多,而协程算是一种轻量级进程,但又不能叫进程,因为操作系统并不知道它的存在。什么意思呢,就是说,协程像是一种在级别来模拟系统级别的进程,由于是单进程,并且少了上下文切换,于是相对来说系统消耗很少,而且网上的各种测试也表明,协程确实拥有惊人的速度。并且在实现过程中,协程可以用以前同步思路的写法,而运行起来确是异步的,也确实很有意思。话说有一种说法就是说进化历程是多进程-&多线程-&异步-&协程,暂且不论说的对不对,单从诸多赞誉来看,协程还是有必要理解一下的。
比较惭愧,greenlet没怎么看就直接看gevent,还是可以看看的,尤其是源码里的examples都相当不错,有助于理解gevent的使用。
gevent封装了很多很方便的接口,其中一个就是monkey
from gevent import monkey
monkey.patch_all()
这样两行,就可以使用python以前的socket之类的,因为gevent已经给你自动转化了,真是超级方便阿。
而且安装gevent也是很方便,首先安装依赖libevent和greenlet,再利用pypi安装即可
sudo apt-get install libevent-dev
sudo apt-get install python-dev
sudo easy-install gevent
然后,gevent中的event,有wait,set等api,方便你可以让某些协程在某些地方等待条件,然后用另一个去唤醒他们。
再就是gevent实现了wsgi可以很方便的当作python的web server服务器使。
最后放送一个我利用gevent实现的一个带有缓存的dns server
# -*- coding: UTF-8 -*-
import gevent
import dnslib
from gevent import socket
from gevent import event
rev=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
rev.bind(('',53))
def preload():
for i in open('ip'):
ip.append(i)
print "load "+str(len(ip))+"
def send_request(data):
global cur
ret=rev.sendto(data,(ip[cur],53))
cur=(cur+1)%len(ip)
class Cache:
def __init__(self):
def get(self,key):
return self.c.get(key,None)
def set(self,key,value):
self.c[key]=value
def remove(self,key):
self.c.pop(key,None)
cache=Cache()
def handle_request(s,data,addr):
req=dnslib.DNSRecord.parse(data)
qname=str(req.q.qname)
qid=req.header.id
ret=cache.get(qname)
ret=dnslib.DNSRecord.parse(ret)
ret.header.id=qid;
s.sendto(ret.pack(),addr)
e=event.Event()
cache.set(qname+"e",e)
send_request(data)
e.wait(60)
tmp=cache.get(qname)
tmp=dnslib.DNSRecord.parse(tmp)
tmp.header.id=qid;
s.sendto(tmp.pack(),addr)
def handle_response(data):
req=dnslib.DNSRecord.parse(data)
qname=str(req.q.qname)
print qname
cache.set(qname,data)
e=cache.get(qname+"e")
cache.remove(qname+"e")
def handler(s,data,addr):
req=dnslib.DNSRecord.parse(data)
if req.header.qr:
handle_response(data)
else:handle_request(s,data,addr)
def main():
while True:
data,addr=rev.recvfrom(8192)
gevent.spawn(handler,rev,data,addr)
if __name__ == '__main__':
这个是直接利用了dict来作为缓存查询了,在还有我将dict换成redis持久化实现的另一个版本(话说redis的python api也可以利用pypi安装,pypi()这真是个好东西阿),话说可以将这段放到国外的服务器上去运行,然后修改dns的地址去指向它,然后你懂的。。。
##################################
gevent相关,请去官网下载gevent模块
程序及注释如下:
# -*- coding: cp936 -*-
import gevent
import time
from gevent import event #调用gevent的event子模块
#三个进程需要定义三个事件event1,event2,event3,来进行12,23,31循环机制,即进程一,进程二,进程三顺序执行
def fun1(num,event1,event2):#固定格式
while i&10: #设置循环10次
time.sleep(1) #睡眠1秒
print'进程一:'
event2.set() #将event2值设为True
event1.clear()#将event1值设为False
event1.wait()#event1等待,其值为True时才执行
def fun2(num,event2,event3):
while i&10:
time.sleep(1)
print'进程二:'
event3.set()#将event3值设为True
event2.clear()#将event2值设为False
event2.wait()#event2等待,其值为True时才执行
def fun3(num,event3,event1):
while i&10:
time.sleep(1)
print'进程三:'
event1.set()
event3.clear()
event3.wait()
if __name__=="__main__": #执行调用格式
act1=gevent.event.Event() #调用event中的Event类,用act1表示
act2=gevent.event.Event()
act3=gevent.event.Event()
#三个进程,act1,act2,act3
Gevents=[] #建立一个数列,用来存和管理进程
g=gevent.Greenlet(fun1,1,act1,act2) #调用gevent中的Greenlet子模块,用Greenlet创建进程一
print'进程一启动:'
Gevents.append(g) #将进程一加入到Gevents数列
g=gevent.Greenlet(fun2,2,act2,act3)
print'进程二启动:'
Gevents.append(g)
g=gevent.Greenlet(fun3,3,act3,act1)
print'进程三启动:'
print'所有进程都已启动!'
Gevents.append(g)
gevent.joinall(Gevents) #调用Greenlet中的joinall函数,将Gevents的进程收集排列
##################################
您可以创建几个 Greenlet 对象为几个任务。
每个 greenlet 是。
from gevent import monkey
monkey.patch_all()
import gevent
from gevent import Greenlet
class Task(Greenlet):
def __init__(self, name):
Greenlet.__init__(self)
self.name = name
def _run(self):
print "Task %s: some task..." % self.name
t1 = Task("task1")
t2 = Task("task2")
t1.start()
t2.start()
# here we are waiting all tasks
gevent.joinall([t1,t2])
##################################
首先,gevent是一个网络库:libevent是一个事件分发引擎,greenlet提供了轻量级线程的支持。所以它不适合处理有长时间阻塞IO的情况。
gevent就是基于这两个东西的一个专门处理网络逻辑的并行库。
1. gevent.spawn启动的所有协程,都是运行在同一个线程之中,所以协程不能跨线程同步数据。
2. gevent.queue.Queue 是协程安全的。
3. gevent启动的并发协程,具体到task function,不能有长时间阻塞的IO操作。因为gevent的协程的特点是,当前协程阻塞了才会切换到别的协程。
如果当前协程长时间阻塞,则不能显示(gevent.sleep(0),或隐式,由gevent来做)切换到别的协程。导致程序出问题。
4. 如果有长时间阻塞的IO操作,还是用传统的线程模型比较好。
5. 因为gevent的特点总结是:事件驱动+协程+非阻塞IO,事件驱动值得是libvent对epool的封装,来基于事件的方式处理IO。
协程指的是greenlet,非阻塞IO指的是gevent已经patch过的各种库,例如socket和select等等。
6. 使用gevent的协程,最好要用gevent自身的非阻塞的库。如httplib, socket, select等等。
7. gevent适合处理大量无阻塞的任务,如果有实在不能把阻塞的部分变为非阻塞再交给gevent处理,就把阻塞的部分改为异步吧。
##################################
1. gevent.server.StreamServer 会针对每个客户端连接启动一个greenlet处理,要注意的是,如果不循环监听( 阻塞在read ),
每个greenlet会在完成后立即退出,从而导致客户端退出( 发送FIN_ACK给客户端 )。这个问题折腾了一晚上,终于弄明白了。坑爹啊。。。
2. 要非常仔细的检查,greenlet处理的代码,发现有可能阻塞IO的地方,尽量用gevent提供的库。
3. 一些第三方库隐藏了自己的实现( 通常是直接封装C库),要使得gevent兼容它们,可以用monkey_patch,但不保证全部管用。
4. 最后最后的一点,gevent的greenlet性能非常高,所以如果是用它作为并发的client端,那么一定要注意,你的server端处理速度一定要足够快!
否则你的客户端代码会因为服务端的慢速,而失去了greenlet的优势。。。
####################################
安装 libevent:apt-get install libevent-dev
安装python-dev:apt-get install python-dev
安装greenlet:easy_install greenlet
安装gevent:easy_install gevent
一个小测试,测试gevent 的任务池
from gevent import pool
g = pool.Pool()
for i in xrange(100):
g.spawn(b)
g.spawn(a)
以上内容转自互联网:
&&&&推荐文章:
【上篇】【下篇】2767人阅读
python(3)
学习杂记(9)
前面讲了为什么python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。
不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。
举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?
例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):
1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的
2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)
3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。
还有更好的方法吗?答案是肯定的,它就是:
4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)
协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。
目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。
不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。
因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。
python里面怎么使用协程?答案是使用gevent,使用方法:
使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。
所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)
多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
monkey.patch_all()
import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
s = requests.Session()
r = s.get(url,timeout=1)#在这里抓取页面
except Exception,e:
def process_start(url_list):
tasks = []
for url in url_list:
tasks.append(gevent.spawn(fetch,url))
gevent.joinall(tasks)#使用协程来执行
def task_start(filepath,flag = 100000):#每10W条url启动一个进程
with open(filepath,'r') as reader:#从给定的文件中读取url
url = reader.readline().strip()
url_list = []#这个list用于存放协程任务
i = 0 #计数器,记录添加了多少个url到协程队列
while url!='':
url_list.append(url)#每次读取出url,将url添加到队列
if i == flag:#一定数量的url就启动一个进程并执行
p = Process(target=process_start,args=(url_list,))
url_list = [] #重置url队列
i = 0 #重置计数器
url = reader.readline().strip()
if url_list not []:#若退出循环后任务队列里还有url剩余
p = Process(target=process_start,args=(url_list,))#把剩余的url全都放到最后这个进程来执行
if __name__ == '__main__':
task_start('./testData.txt')#读取指定文件
细心的同学会发现:上面的例子中隐藏了一个问题:进程的数量会随着url数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool来控制进程数量的原因是multiprocessing.Pool和gevent有冲突不能同时使用,但是有兴趣的同学可以研究一下gevent.pool这个协程池。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:59427次
排名:千里之外
原创:20篇
评论:14条
(1)(2)(1)(1)(2)(1)(2)(2)(1)(1)(1)(2)(3)(4)(2)(1)

我要回帖

更多关于 gevent 协程数量 的文章

 

随机推荐