说明

Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。

从Python3.2开始,标准库为我们提供了concurrent.futures模块,concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和

ProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调

用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。

Python 3.4 以后标准库中asyncio 包,这个包使用事件循环驱动的协程实现并发。这是 Python 中最大也

是最具雄心壮志的库之一。asyncio 大量使用 yield from 表达式,因此与

Python 旧版不兼容。

submit和map方法

submit方法作用是向线程池提交可回调的task,并返回一个回调实例。

example:

import time
from concurrent.futures import ThreadPoolExecutor

# 可回调的task
def pub_task(msg):
  time.sleep(3)
  return msg

# 创建一个线程池
pool = ThreadPoolExecutor(max_workers=3)

# 往线程池加入2个task
task1 = pool.submit(pub_task, 'a')
task2 = pool.submit(pub_task, 'b')

print(task1.done())    # False
time.sleep(4)
print(task2.done())    # True

print(task1.result())
print(task2.result())

map方法是创建一个迭代器,回调的结果有序放在迭代器中。

问题:

Executor.map 函数易于使用,不过有个特性可能有用,也可能没用,具体情况取决于需求:这个函数返回结果的顺序与调用开始的顺序一致。

如果第一个调用生成结果用时 10秒,而其他调用只用 1 秒,代码会阻塞 10 秒,获取 map 方法返回的生成器产出的第一个结果。

在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。

如果必须等到获取所有结果后再处理,这种行为没问题;不过,通常更可取的方式是,不管提交的顺序,只要有结果就获取。

为此,要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用。

from concurrent.futures import ThreadPoolExecutor
import requests

URLS = ['http://www.csdn.com', 'http://qq.com', 'http://www.leasonlove.cn']

def task(url, timeout=10):
  return requests.get(url, timeout=timeout)

pool = ThreadPoolExecutor(max_workers=3)
results = pool.map(task, URLS)

for ret in results:
  print('%s, %s' % (ret.url, ret))

future异步编程

Future可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import requests

URLS = ['http://www.csdn.cn', 'http://qq.com', 'http://www.leasonlove.cn']

def task(url, timeout=1):
  return requests.get(url, timeout=timeout)

with ThreadPoolExecutor(max_workers=3) as executor:
  future_tasks = [executor.submit(task, url) for url in URLS]

  for f in future_tasks:
    if f.running():
      print('%s is running' % str(f))

  for f in as_completed(future_tasks):
    try:
      ret = f.done()
      if ret:
        f_ret = f.result()
        print('%s, done, result: %s, %s' % (str(f), f_ret.url, f_ret.content))
    except Exception as e:
      # 第一个url无响应
      f.cancel()
      print(str(e))

asyncio库协程实现并发

对于gevent 和 asyncio 建议大家放弃Gevent,拥抱asyncio,asyncio是Python3.4以后标准库。

而且由于Gevent直接修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。

但是我们无法保证你在复杂的生产环境中有哪些地方使用这些标准库会由于打了补丁而出现奇怪的问题。

import asyncio
import time
start = time.time()

async def do(x):
  print('Waiting: ', x)
  await asyncio.sleep(x)
  return 'Finish after {}s'.format(x)

task1 = do(1)
task2 = do(2)
task3 = do(4)

tasks = [
  asyncio.ensure_future(task1),
  asyncio.ensure_future(task2),
  asyncio.ensure_future(task3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
  print('Task result: ', task.result())

end = time.time()
print('TIME: ', end - start)

协程与线程

如果使用线程做过重要的编程,你就知道写出程序有多么困难,因为调度程序任何时候都能中断线程。

必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态。

而协程默认会做好全方位保护,以防止中断。我们必须显式产出才能让程序的余下部分运行。

对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。

想交出控制权时,可以使用 yield 或 yield from 把控制权交还调度程序。

这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield处取消,因此可以处理 CancelledError 异常,执行清理操作。

补充知识:Python-什么时候使用yield?

简介

很多时候在python代码中见到了yield,没有系统学习过,自己也没有用过。

yield语句延迟了语句的执行,然后发送了一个值给调用者,但保留了一定的状态去保证函数离开之后可以继续。当继续的时候,函数继续执行上一个的运行状态。这使得它的代码可以随着时间产生一系列的值,而不是立即执行,然后像一个list一样发送他们回来。

例子

例子1:

# A Simple Python program to demonstrate working 
# of yield 
 
# A generator function that yields 1 for first time, 
# 2 second time and 3 third time 
def simpleGeneratorFun(): 
  yield 1
  yield 2
  yield 3
 
# Driver code to check above generator function 
for value in simpleGeneratorFun(): 
  print(value) 

返回语句发送一个特殊的值给它的调用者,而yield产生了一系列的值,当我们想要遍历一个序列的时候,我们应该使用yield,但不想要把整个序列存储在内存中。

yield用于python的生成器(generator)。一个genertator 被定义得看起来像一个普通函数一样,但它需要产生一个数字得时候,它使用yield,而不是使用return。如果一个函数里面定义了yield,那么它自动称为了一个generator函数。、

例子2:

# A Python program to generate squares from 1 
# to 100 using yield and therefore generator 
 
# An infinite generator function that prints 
# next square number. It starts with 1 
def nextSquare(): 
  i = 1; 
 
  # An Infinite loop to generate squares 
  while True: 
    yield i*i         
    i += 1 # Next execution resumes 
        # from this point   
 
# Driver code to test above generator 
# function 
for num in nextSquare(): 
  if num > 100: 
     break  
  print(num) 

输出1,4,9…100

以上这篇Python并发concurrent.futures和asyncio实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。

标签:
Python,concurrent,futures,asyncio

免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
评论“Python并发concurrent.futures和asyncio实例”
暂无“Python并发concurrent.futures和asyncio实例”评论...

《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线

暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。

艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。

《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。