gevent gevent.queue gevent读取文件

gevent 一种异步的方式,基于事件循环..   跟 asyncio 里的东西运作的差不多

官方手册说的太不清楚 . 

自己写了个入门教程.

一个最简单的例子:

spawn 将把你的函数封装成一个个协程对象

# 注意. gevent.sleep 不是 time.sleep . 下一个例子说明
def fuck1(arg):
    print('我在这: ',fuck1.__code__.co_firstlineno)
    gevent.sleep(1)                                       
    return arg


g1 = gevent.spawn(fuck1, 123)  #产生一个GreenLet 协程 .
print(g1 , type(g1))            #看看是杀
g1.join()       #等待咯



# 与上面一种完全一样的方式 .  gevent.spawn 相当于创建一个GreenLet ,然后start()
g2 = gevent.Greenlet(fuck1,456)
g2.start()                    #启动协成
print(g2, type(g2))
g2.join()

下面的例子中. 我不再使用gevent.Greenlet 来自己创建了,比较麻烦.直接spawn了 。

下面的例子里,我也不再使用继承的Greenlet啦.
如果对于GreenLet需要,也可以自己继承Greenlet . 重写 _run (有个下划线) 函数 即可:

class fuckme( gevent.Greenlet):
    def __init__(self ,*args):
        gevent.Greenlet.__init__(self)

        #自己可以弄点属性啥的。我就不弄了

    def _run(self): #主要是这个  run前面有个线
        i = 0
        while i <3 :
            print(' 我是弱智')
            i+=1
            gevent.sleep(0.3)

g = fuckme()
g.start()
g.join()

用gevent.joinall 等待多个协成对象:

def fuck1(arg):
    print('我在这: ',fuck1.__code__.co_firstlineno)
    gevent.sleep(1)                                       #你会发现2个函数几乎同时睡眠. 不再像time.sleep
    return arg



# 将返回一个 list . 里面存放一个个GreenLet , 使用 value 获取返回值
res  = gevent.joinall([
                    gevent.spawn(fuck1 , 123) ,  #产生一个GreenLet 协成
                    gevent.spawn(fuck1 , 456) ,
                ])

print(res ,  type(res))

for v in res:
    print(v.value)
    
#修改一下,更明显

def fuck1(arg):
    print('参数 < %s > 我在这: '%arg,fuck1.__code__.co_firstlineno)
    gevent.sleep(1)
    print('参数 < %s > 我醒来了 我在这: '%arg, fuck1.__code__.co_firstlineno)
    return arg


cor_list = [gevent.spawn(fuck1  ,  arg )  for arg in range(5)]
res  = gevent.joinall(cor_list)

再来一些例子:

交互的运行着.

def fuck1(arg):
    print('参数 < %s > 我在这: '%arg,fuck1.__code__.co_firstlineno)
    gevent.sleep(1)
    print('参数 < %s > 我醒来了 我在这: '%arg, fuck1.__code__.co_firstlineno)
    return arg

def fuck2( arg ):
    print(fuck2.__code__.co_name , fuck2.__code__.co_firstlineno)
    gevent.sleep(1)
    print(fuck2.__code__.co_name + " done")
    return arg


cor_list = [gevent.spawn(fuck1  ,  arg )  for arg in range(3)]
cor_list1 = [gevent.spawn(fuck2 , arg) for arg in range(3)]
cor_list.extend(cor_list1)
gevent.joinall(cor_list)

看一下同步 和异步的比较:

#用与测试的函数
def job(arg):
    import random
    print(' job start :' ,arg)
    gevent.sleep(random.randint(0,5) * 0.5)        #这个时间可以自己修改看看
    print(' im done')


def sync():
    for i in range(3):
        job(i)


def async():
    alist = [ gevent.spawn(job , arg) for arg in range(3)]
    gevent.joinall(alist)



print("先来同步:")
sync()

print('再来异步:')
async()

还有一些类死于线程的同步对象 . event啦, semaphore啦 ,queue啦. 这些都用于协程之间交互的, 毕竟单线程

event:

# 我看了下, 在windows中是个 CreateEvent 的手动事件 ,即一旦 set , 所有wait的将全部继续运行.
# 附注: windows中有2个事件,一个自动一个手动. 自动的在 WaitForSingleObject后将原子的ResetEvent, 手动的不会.
# 相当于 py中的 Event.wait, Event.clear
# 那个啥, 这行别看了.py中没那么麻烦

from gevent.event import Event
ev = Event()

def request():
    print(' fetching pages' * 10)
    gevent.sleep(2)
    print(' fetching done' * 10)
    ev.set()                    #所有wait的将被全部激活

def response():
    print('response 已启动')
    ev.wait()               #等待 ev.set 后将运行
    print('response 完成')

res_list = [gevent.spawn(response) for i in range(5)] #先创建了5个,他们运行到 ev.wait的时候将全部等待

res_list.append(gevent.spawn_later(2, request))     #这里用了 spawn_later .可以预定几秒后 开始运行

gevent.joinall(res_list)

queue: 多生产多消费
我一开始使用queue 的时候常常会碰到一个异常. LockUp.Exit (forever 之类的) 好像是这个.
主要原因是要么在生产者要么在消费者中一定有一个地方,没让协程退出. 所以在joinall 的时候会产生异常
queue.put / get 都是阻塞操作

from gevent.queue import Queue
q = Queue(3)  # 最多存放3个
def producer():
    for i in range(20):
        print('->>>>>>>> producer put %d'%(i))
        q.put(i)

    print('->' * 20 + ' producer done')



def consumer(arg):
    while True:
        try:
            item = q.get(timeout=0.5)    #设置了timeout ,用于过了0.5秒一旦queue为空则抛异常.结束此循环
            print('consumer %d get %d , queue:%d ' %(arg,item,q.qsize()) )
        except Exception as e:
            break

    print('consumer %d done' % arg)

pro_list = [gevent.spawn(producer) for i in range(5)]  #多个生产者
con_list = [gevent.spawn(consumer,i) for i in range(3)] #多个消费者
con_list.extend(pro_list)
gevent.joinall(con_list)
print(q.empty())

一个失败的例子: 用协程读取文件 . 测试下来速度很慢:

import os
from functools import partial

EACH_SIZE = 1024    #每次读1024

#eachpart : 每块大小, pos : 从哪里开始读取
def pro_readfile(filepath,eachPart,pos):
    with open(filepath,'rt') as fd:
        fd.seek(pos)
        iterbale = iter(partial(fd.read,EACH_SIZE),'')
        for text in iterbale:
            print(text)


path = 'D:/360极速浏览器下载/msdn.txt'
co_size = 5                #协程数量
filesize = os.path.getsize(path)    #文件大小
eachPart = int(filesize/5) +1       #每个协程读多少

be = time.clock()        #开始时间
pro_list = [gevent.spawn(pro_readfile,path,eachPart, i*eachPart) for i in range(co_size)]    
gevent.joinall(pro_list)
end = time.clock()    #结束
print(end-be)

JoinableQueue:

q = JoinableQueue(50)
def doing(arg):
    print('im doing %d' %arg)
    gevent.sleep(1)
    print('im done %d'%arg)
    q.task_done()

def to_do():
    while True:
        func , args= q.get()
        gevent.spawn(func,args)


for i in range(5):
    gevent.spawn(to_do)

for i in range(10):
    q.put((doing,i))


q.join()

最后介绍一下Pool , 会用Pool ,也就会用Group了 . 附:class Pool(Group)

import gevent.monkey; gevent.monkey.patch_all()  #注意导入这个. 如果你的程序涉及了socket


pool = Pool(10) #限制在10个协程
def read_from(url):
    r = requests.get(url)
    r.encoding = r.apparent_encoding
    print(r.url, r.headers)

    return r.status_code
urls = ["https://www.qq.com","https://www.baidu.com","http://www.sina.com.cn"]

pool.spawn(read_from,urls[0])  #产生一个协程
pool.spawn(read_from,urls[1])

#也可以这样

res = pool.imap(read_from, urls)        #跟map 函数类似, 返回一个可迭代
for r in res:
    print(r)

#或者
print('- ' * 50)
for r in pool.imap_unordered(read_from,urls):   #更好的选择.哪个先完成就返回
    print(r)

pool.join()

相关推荐