第13章并发的翻译
摘自《 Expert Python Programming》一书,
第二版
MichałJaworski和TarekZiadé,2016年
异步编程
近年来,异步编程获得了极大的欢迎。 Python 3.5终于有了一些语法功能,这些功能加强了异步解决方案的概念。 但这并不意味着仅从Python 3.5开始就可以进行异步编程。 许多库和框架是在更早的时候提供的,并且大多数起源于旧版本的Python2。甚至还有一种称为Stackless的Python替代实现(请参见第1章,“ Python的当前状态”),该实现着眼于这种单一编程方法。 对于某些解决方案(例如
Twisted,Tornado或
Eventlet) ,活跃的社区仍然存在,并且确实值得了解。 无论如何,从Python 3.5开始,异步编程变得比以往任何时候都容易。 因此,可以预期它的内置异步功能将替代大多数旧工具,或者外部项目将逐渐变成一种基于内置Python的高级框架。
当试图解释什么是异步编程时,最容易将此方法视为类似于线程的方法,但没有系统调度程序。 这意味着异步程序可以同时处理任务,但是其上下文是在内部而不是由系统调度程序切换的。
但是,当然,我们不使用线程对异步程序中的任务进行并行处理。 大多数解决方案使用不同的概念,并且根据实现的不同,它们的名称也不同。 用于描述此类并行程序对象的一些名称示例包括:
- 绿色线程 -绿色线程(greenlet,gevent或eventlet项目)
- 协程-协程(Python 3.5中的纯异步编程)
- Tasklets(无堆栈Python)这些基本上是相同的概念,但是通常以略有不同的方式实现。
出于明显的原因,在本节中,我们将仅关注Python从3.5版开始最初支持的协程。
协作多任务和异步I / O
协作式多任务处理是异步编程的核心。 从这个意义上讲,不需要操作系统中的多任务来启动上下文切换(切换到另一个进程或线程),而是每个进程在处于待机模式时都自动释放控制权,以确保同时执行多个程序。 这就是为什么它被称为协作。 所有进程必须一起工作以确保多任务成功。
多任务模型有时在操作系统中使用,但是现在几乎找不到它作为系统级解决方案。 这是因为存在一项服务设计不当会轻易破坏整个系统稳定性的风险。 当前,使用直接由操作系统控制的上下文切换来调度线程和进程是在系统级别进行并发的主要方法。 但是协作多任务处理在应用程序级别仍然是一个很好的并发工具。
说到应用程序级别的联合多任务,我们不处理需要释放控制权的线程或进程,因为所有执行都包含在一个进程和线程中。 取而代之的是,我们有几个任务(协程,tasklet和绿色线程)将控制权转移到控制任务协调的单个函数中。 此函数通常是一种事件循环。
为了避免混淆(由于Python术语),我们现在将此类并行任务称为协程。 协作多任务处理中最重要的问题是何时转移控制权。 在大多数异步应用程序中,控制在I / O操作期间传递给调度程序或事件循环。 无论程序是从文件系统中读取数据还是通过套接字进行通信,当进程变为非活动状态时,此类I / O操作始终与一定的等待时间相关联。 等待时间取决于外部资源,因此这是释放控制权的好机会,以便其他协程可以执行其工作,直到它们还必须等待这种行为与在Python中实现多线程的行为有点相似为止。 我们知道GIL可以序列化Python线程,但是每个I / O操作都可以释放它。 主要区别在于Python中的线程是作为系统级线程实现的,因此操作系统可以随时卸载当前正在运行的线程并将控制权转移给另一个线程。
在异步编程中,任务永远不会被主事件循环中断。 这就是为什么这种多任务处理样式也称为非优先级多任务处理的原因。
当然,每个Python应用程序都在一个操作系统上运行,在该操作系统上还有其他进程争用资源。 这意味着操作系统始终有权卸载整个过程并将控制权转移给另一个。 但是,当异步应用程序重新启动时,它将从系统调度程序介入时暂停的位置继续。 这就是为什么协程在这种情况下不拥挤的原因。
Python异步和等待关键字
关键字
async和
await是异步Python编程中的主要构建块。
在
def语句之前使用的
async关键字定义了新的协程。 协程函数可以在严格定义的情况下暂停和恢复。 它的语法和行为与生成器非常相似(请参见类级别下的第2章,“语法建议”)。 实际上,应在旧版本的Python中使用生成器来实现协程。 这是使用
async关键字的函数的示例声明:
async def async_hello(): print("hello, world!")
使用
async关键字定义的功能很特殊。 调用它们时,它们不在内部执行代码,而是返回一个协程对象:
>>>> async def async_hello(): ... print("hello, world!") ... >>> async_hello() <coroutine object async_hello at 0x1014129e8>
协程对象不会执行任何操作,直到在事件循环中安排了它的执行为止。 asyncio模块可用于提供事件循环的基本实现,以及许多其他异步实用程序:
>>> import asyncio >>> async def async_hello(): ... print("hello, world!") ... >>> loop = asyncio.get_event_loop() >>> loop.run_until_complete(async_hello()) hello, world! >>> loop.close()
自然,仅创建一个简单的协程,在我们的程序中我们不实现并行性。 为了看到真正平行的东西,我们需要创建更多由事件循环执行的任务。
可以通过调用
loop.create_task()方法或通过提供另一个对象来等待使用
asyncio.wait()函数来将新任务添加到循环中。 我们将使用后一种方法,并尝试异步打印使用
range()函数生成的数字序列:
import asyncio async def print_number(number): print(number) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete( asyncio.wait([ print_number(number) for number in range(10) ]) ) loop.close()
asyncio.wait()函数接受协程对象列表并立即返回。 结果是生成器,该生成器生成表示将来结果(未来)的对象。 顾名思义,它用于等待所有提供的协程完成。 它返回生成器而不是协程对象的原因是因为它与Python的早期版本向后兼容,这将在后面解释。 执行此脚本的结果可能如下:
$ python asyncprint.py 0 7 8 3 9 4 1 5 2 6
如我们所见,数字不是按照创建协程的顺序打印的。 但这正是我们想要实现的目标。
Python 3.5中添加的第二个重要关键字是
await 。 它用于等待协程或将来事件的结果(稍后说明),并在事件循环中释放对执行的控制。 为了更好地理解它是如何工作的,我们需要考虑一个更复杂的代码示例。
假设我们要创建两个协程,它们将在循环中执行一些简单的任务:
- 等待随机的秒数
- 打印一些作为参数提供的文本,以及等待所花费的时间。 让我们从一个简单的实现开始,该实现存在一些并发性问题,稍后我们将尝试通过额外使用await来改善这些并发性问题:
import time import random import asyncio async def waiter(name): for _ in range(4): time_to_sleep = random.randint(1, 3) / 4 time.sleep(time_to_sleep) print( "{} waited {} seconds" "".format(name, time_to_sleep) ) async def main(): await asyncio.wait([waiter("foo"), waiter("bar")]) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
在终端中执行时(使用time命令测量时间),您可以看到:
$ time python corowait.py bar waited 0.25 seconds bar waited 0.25 seconds bar waited 0.5 seconds bar waited 0.5 seconds foo waited 0.75 seconds foo waited 0.75 seconds foo waited 0.25 seconds foo waited 0.25 seconds real 0m3.734s user 0m0.153s sys 0m0.028s
如我们所见,这两个协程都完成了执行,但不是异步完成的。 原因是它们都使用
time.sleep()函数,该函数锁定但不释放事件循环中的控件。 这在多线程安装中会更好,但是我们现在不希望使用线程。 那么我们该如何解决呢?
答案是使用
asyncio.sleep() ,它是
time.sleep()的异步版本,并使用
await关键字期望结果。 我们已经在
main()的第一个版本中使用了此语句,但这只是为了提高代码的清晰度。 显然,这并没有使我们的实现更加并行。 让我们看一下使用await asyncio.sleep()的
侍者()协程的改进版本:
async def waiter(name): for _ in range(4): time_to_sleep = random.randint(1, 3) / 4 await asyncio.sleep(time_to_sleep) print( "{} waited {} seconds" "".format(name, time_to_sleep) )
运行更新的脚本,我们将看到两个函数的输出如何相互交替:
$ time python corowait_improved.py bar waited 0.25 seconds foo waited 0.25 seconds bar waited 0.25 seconds foo waited 0.5 seconds foo waited 0.25 seconds bar waited 0.75 seconds foo waited 0.25 seconds bar waited 0.5 seconds real 0m1.953s user 0m0.149s sys 0m0.026s
这种简单改进的另一个好处是代码运行速度更快。 总执行时间少于所有睡眠时间的总和,因为协程可以一一控制。
先前版本的Python中的Asyncio
asyncio模块出现在Python 3.4中。 因此,这是唯一一个在Python 3.5之前就严重支持异步编程的Python版本。 不幸的是,这两个后续版本似乎足以引起兼容性问题。
无论如何,Python中的异步编程核心是在支持此模板的语法元素之前引入的。 迟来总比没有好,但是这造成了使用协同程序使用两种语法的情况。
从Python 3.5开始,您可以使用
async和
await :
async def main (): await asyncio.sleep(0)
但是,在Python 3.4中,您将必须另外应用asyncio.coroutine装饰器并在coroutine文本中产生:
@asyncio.couroutine def main(): yield from asyncio.sleep(0)
另一个有用的事实是,Python 3.3中引入了
yield from语句 ,并且PyPI具有异步反向端口。 这意味着您还可以在Python 3.3中使用这种协作式多任务处理的实现。
异步编程的实际示例
如本章多次提到的那样,异步编程是处理I / O的绝佳工具。 现在是时候创建一些比打印序列或异步等待更实用的东西了。
为了确保一致性,我们将尝试解决在多线程和多处理帮助下解决的相同问题。 因此,我们将尝试通过网络连接从外部资源异步提取一些数据。 如果我们可以使用与前面几节相同的
python-gmaps包,那将是很好的。 不幸的是,我们不能。
python-gmaps的创建者有点懒,只取了名字。 为了简化开发,他选择了请求包作为他的HTTP客户端库。 不幸的是,请求不支持带有
async和
await的异步I / O。 还有其他一些项目旨在为查询项目提供一些并行性,但它们要么依赖于
Gevent (
grequests ,请参见
https://github.com/ kennethreitz / grequests ),要么运行线程/进程池(query-futures)请参阅
github.com/ross/requests-futures )。 它们都不能解决我们的问题。
在责备自己责骂无辜的开源开发人员之前,请冷静下来。 python-gmaps软件包背后的人是我。 依赖项选择不当是该项目的问题之一。 我只想不时公开批评自己。 这对我来说是一个痛苦的教训,因为最新版本的python-gmaps (在撰写本书时为0.3.1)无法轻松地与Python的异步I / O集成。 无论如何,这种情况将来可能会改变,因此不会丢失任何内容。
了解了库的局限性(在前面的示例中是如此易于使用),我们需要创建一些东西来填补这一空白。 Google MapsAPI确实非常易于使用,因此我们将介绍一个异步实用程序,仅供说明。 Python 3.5的标准库仍然缺少可以像调用
urllib.urlopen()一样轻松地执行异步HTTP请求的库。 我们绝对不想从头开始创建完整的协议支持,因此我们将使用PyPI中提供的
aiohttp包中的一些帮助。 这是一个非常有前途的库,它为异步HTTP添加了客户端和服务器实现。 这是在
aiohttp之上构建的一个小模块,它创建一个
geocode()帮助器函数,该函数执行对Google Maps API服务的地理编码请求:
import aiohttp session = aiohttp.ClientSession() async def geocode(place): params = { 'sensor': 'false', 'address': place } async with session.get( 'https://maps.googleapis.com/maps/api/geocode/json', params=params ) as response: result = await response.json() return result['results']
假设此代码存储在名为
asyncgmaps的模块中,我们将在以后使用。 现在,我们准备重写在多线程和多处理讨论中使用的示例。 以前,我们曾经将整个操作分为两个单独的阶段:
- 使用fetch_place()函数并行执行对外部服务的所有请求。
- 使用present_result()函数循环显示所有结果。
但是,由于协作式多任务处理与使用多个进程或线程完全不同,因此我们可以略微更改方法。 每个项目使用单个线程中提出的大多数问题不再是我们关注的问题。
协程不是抢先的,因此我们可以在收到HTTP响应后立即轻松显示结果。 这将简化我们的代码并使之更易于理解:
import asyncio
异步编程非常适合对构建可伸缩应用程序感兴趣的后端开发人员。 实际上,这是用于创建竞争激烈的服务器的最重要工具之一。
但是现实是可悲的。 许多处理I / O问题的流行软件包都不打算与异步代码一起使用。 造成这种情况的主要原因是:
- Python 3的实施仍很低,并具有一些高级功能
- 初学者对Python的各种并发概念了解不足
这意味着,由于架构限制,现有同步多线程应用程序和程序包的迁移常常是不可能的,或者迁移成本太高。 许多项目可以从异步多任务样式的实现中受益匪浅,但最终只有少数项目会这样做。 这意味着现在从一开始就尝试创建异步应用程序将遇到很多困难。 在大多数情况下,这将与“异步编程的实际示例”部分中提到的问题类似-接口不兼容和I / O操作的非同步阻塞。 当然,有时您会在遇到这种不兼容性时放弃等待,而只是同步获取必要的资源。 但这会在等待结果时互相阻止协程执行其代码。 从技术上讲,这行得通,但同时也破坏了异步编程的所有好处。 因此,最终,将异步I / O与同步I / O结合起来不是一个选择。 这是一个全有或全无的游戏。
另一个问题是冗长的处理器绑定操作。 当执行I / O操作时,从协程释放控制没有问题。 从文件系统或套接字进行写入/读取时,您最终将等待,因此使用await进行的调用是您最好的方法。 但是,如果您需要计算一些东西,并且知道需要一些时间,该怎么办? 当然,您可以将问题分为几部分,并在每次稍加推进时取消控制。 但是很快您会发现这不是一个很好的模型。 这样的事情会使代码混乱,也不能保证良好的结果。
时间绑定应由解释器或操作系统负责。
将异步代码与异步期货相结合
因此,如果您具有执行长时间同步I / O的代码而无法执行或不想重写该怎么办。 还是必须在主要为异步I / O设计的应用程序中执行一些繁重的处理器操作时怎么办? 好吧...您需要找到解决方法。 我所说的是多线程或多处理。
这听起来可能不太好,但是有时最好的解决方案可能是我们试图摆脱的解决方案。 由于采用了多重处理,因此在Python中对资源密集型任务的并行处理始终可以更好地执行。 多线程可以异步地处理I / O操作(迅速且没有大量资源),就像异步处理一样,并且如果配置正确且谨慎处理,则可以等待。
因此,有时候,当某些东西不适合您的异步应用程序时,您不知道该怎么做时,请使用一段代码将其放在单独的线程或进程中。 您可以假装它是协程,释放事件循环的控制权,并在准备好结果时最终对其进行处理。
对我们来说幸运的是,Python标准库提供了
current.futures模块,该模块也与
asyncio模块集成在一起。 这两个模块一起使您可以计划在线程或其他进程中运行的阻塞函数,就好像它们是异步非阻塞协程。
执行人和期货
在看到如何在异步事件循环中嵌入线程或进程之前,我们将仔细研究
并发模块。将来,该模块将成为我们所谓的解决方法的主要组成部分。
parallel.futures模块中最重要的类是
Executor和
Future 。
执行程序是可以并行处理工作项的资源池。 它的目的似乎与多处理器模块中的类
Pool和
dummy.Pool十分相似,但是它具有完全不同的接口和语义。 这是不打算实现的基类,具有两个特定的实现:
- ThreadPoolExecutor :代表线程池
- ProcessPoolExecutor :代表进程池
每个
执行者提出三种方法:
- Submit(fn,* args,** kwargs) :调度fn函数在资源池中执行,并返回一个Future对象,该对象代表被调用对象的执行
- map(func,* iterables,timeout = None,chunksize = 1) : func函数在迭代上执行,类似于多处理。 Pool.map()方法
- shutdown(wait = True) :这将关闭执行器并释放其所有资源。
最有趣的方法是
Submit(),因为它返回了Future对象。 它表示被调用的异步执行,并且仅间接表示其结果。 若要获取调度的被调用对象的实际返回值,必须调用
Future.result()方法。 如果被调用的对象已经完成,则
result()方法将不会阻塞该对象,而只会返回该函数的输出。 如果不是,他将阻止它,直到结果准备好为止。 将其视为对结果的承诺(实际上与JavaScript中的承诺是同一概念)。 您不需要在收到它后立即解压缩它(使用
result()方法),但是如果您尝试这样做,则可以保证最终返回一些信息:
>>> def loudy_return(): ... print("processing") ... return 42 ... >>> from concurrent.futures import ThreadPoolExecutor >>> with ThreadPoolExecutor(1) as executor: ... future = executor.submit(loudy_return) ... processing >>> future <Future at 0x33cbf98 state=finished returned int> >>> future.result() 42
如果要使用
Executor.map()方法,则其
用法与多处理器模块的
Pool类的
Pool.map()方法没有什么不同:
def main(): with ThreadPoolExecutor(POOL_SIZE) as pool: results = pool.map(fetch_place, PLACES) for result in results: present_result(result)
在事件循环中使用执行器
从概念上讲,
Executor.submit()方法返回的Future类的实例与异步编程中使用的协程非常接近。 这就是为什么我们可以使用美工来创建协作多任务与多处理或多线程之间的混合体的原因。
此解决方法的核心是
事件循环类的
BaseEventLoop.run_in_executor(执行程序,func,* args)方法。 这使您可以计划在executor参数表示的进程或线程池中执行func函数。 此方法最重要的是,它返回新的期望对象(可以使用await运算符期望的对象)。 因此,由于这个原因,您可以执行与协程完全不同的协程而不是协程,并且无论完成多长时间,它都不会阻塞。 它将仅停止期望从此类调用获得结果的函数,但整个事件周期将继续。
一个有用的事实是,您甚至不需要创建自己的执行者实例。 如果将
None作为参数传递给
executor ,则
ThreadPoolExecutor类将与默认线程数一起使用(对于Python 3.5,这是处理器数乘以5)。
因此,让我们假设我们不想重写python-gmaps包中引起我们头痛的问题部分。
我们可以通过调用loop.run_in_executor()轻松地将阻塞调用推迟到单独的线程,同时将fetch_place()函数保留为预期的协程: async def fetch_place(place): coro = loop.run_in_executor(None, api.geocode, place) result = await coro return result[0]
这样的解决方案比拥有一个完全异步的库来完成这项工作更糟糕,但是您知道至少有些事情总比没有好。在解释了真正的并发性之后,我们采取了行动并使用多线程分析了典型的并行问题之一。在确定了代码的主要缺陷并纠正了这些缺陷之后,我们转向多处理,以了解在我们的案例中它如何工作。之后,我们发现使用多处理器模块,使用多个进程比使用多线程的基本线程容易得多。但是只有在那之后,我们才意识到可以通过multiprocessing.dummy将相同的API与线程一起使用。因此,现在在多处理和多线程之间进行选择仅取决于最适合该问题的解决方案,而不取决于哪个接口具有最佳的解决方案。说到解决问题,我们最终尝试了异步编程,该编程应该是I / O相关应用程序的最佳解决方案,只是要了解我们不能完全忘记线程和进程。因此,我们绕了一圈,回到了起点!这使我们得出本章的最终结论。没有适合所有人的解决方案。您可能会喜欢或喜欢几种方法。有一些方法更适合解决此类问题,但是您需要了解所有方法才能成功。在现实的情况下,您可以在一个应用程序中使用全部工具和并行样式,这并不罕见。上一个结论是对下一章第14章“有用的设计模式”的主题的出色介绍。由于没有单一的模板可以解决您的所有问题。您应该尽可能多地了解,因为最终您将每天使用它们。