AI 科技评论按,作为目前最流行的编程语言之一,python 在人工智能相关的领域备受青睐。在编码时,代码的运行时间是我们需要考虑的重要因素之一。如何加快程序运行的速度?这是很多开发者经常会思考的问题。
工程师 Jim Anderson 分享了他的经验,他写了一篇关于「通过并发性加快 python 程序的速度」的文章。Jim 有多年的编程经验,并且使用过各种编程语言。他曾做过嵌入式系统相关的工作,开发过分布式系统,并且参加过许多会议。
如果你听过很多关于 asyncio 被添加到 python 的讨论,但是好奇它与其他并发方法相比怎么样,或者你很好奇什么是并发,以及它如何加速你的程序,那么你需要看下这篇文章。
在本文中,你将了解以下内容:
什么是并发?
什么是并行?
一些 python 并发方法的比较,包括线程、异步和多进程
在程序中何时使用并发性以及使用哪个模块
本文假设读者对 python 有一个基本的了解,并且使用 python3.6 及以上版来运行示例。你可以从 Real python GitHub repo 下载示例。https://github.com/realpython/materials/tree/master/concurrency-overview
什么是并发?
并发这个词在字典里面定义是「同时发生」。在 python 中,同时发生的事情由线程、任务、进程调用,但在高层,它们都是指按顺序运行的一系列指令。
我喜欢把它们看作是不同的思维方式。它们都可以在特定的点上停止,此时,正在处理它们的 CPU 或大脑可以切换到其它的点上。每件事的状态都会被保存,这样它就可以在中断的地方重新启动。
你可能想知道为什么 python 对相同的概念使用不同的词。事实证明,只有从宏观意义上看线程、任务和进程时,它们才是相同的。一旦你开始深入了解细节,它们都代表着一些稍微不同的东西。随着示例的不断深入,你将看到更多不同之处。
你必须小心谨慎,因为当你深入到细节的时候,实际上只有多进程在同一时间运行着多个任务。线程和异步都在单个处理器上运行,因此一次只能处理一个任务。他们只是聪明地找到方法轮流加速整个过程。即使它们不同时运行不同的程序,我们仍然称之为并发。
线程或任务轮流执行的方式是线程和异步之间的巨大区别。在线程中,操作系统实际上知道每个线程,并且可以随时中断它以开始运行不同的线程。这被称为先占式多工法(pre-emptive multitasking),因为操作系统可以对线程进行切换。
先占式多工法(pre-emptive multitasking)很方便,因为线程中的代码不需要做任何事情来进行切换。但它也是困难的,因为「在任何时候」都可能需要进行任务切换。这种转换可以发生在单个 python 语句的中间,甚至是像 x=x+1 这样的简单语句。
另一方面,asyncio 使用协同多任务处理。这些任务必须通过宣布它们何时准备好被关闭来协同合作。这意味着要实现这一点,任务中的代码必须稍微更改才能实现这一点。
提前做这些额外的工作的好处是,你总是知道你的任务将在哪里被切换。除非该语句被标记,否则任务不会在 python 语句的中间被切换。接下来你将看到如何简化设计的各个部分。
什么是并行?
到目前为止,你已经研究了在单个处理器上发生的并发。那么对于你的新笔记本电脑上的那么多 CPU 核会怎么样呢?你如何利用它们?答案就是多进程。
通过多进程,python 创建了新的进程。这里的一个进程可以被看作是一个完全不同的程序,尽管从技术上讲,它们通常被定义为一个资源的集合,其中的资源包括内存、文件句柄和类似的东西。每个进程都在自己的 python 解释器中运行。
因为它们是不同的进程,所以在多进程中的每一个进程都可以在不同的核上运行。在不同的核心上运行意味着它们实际上可以同时运行,这太棒了。这样做会产生一些复杂的情况,但是在大多数情况下,python 都能很好地平衡它们。
并发何时有用?
并发性可以对两种类型的问题产生很大的影响。这通常称为 CPU 绑定和 I/O 绑定。
I/O 绑定问题会导致程序运行速度减慢,因为它常常需要等待来自某些外部资源的输入/输出(I/O)。当你的程序处理比你的 CPU 慢得多的东西时,这种情况经常发生。
比你的 CPU 慢的事情很多,但谢天谢地,它们中间的大多数都不会与你的程序有关联。你的程序最常与之交互的缓慢的事情是文件系统和网络连接。
让我们看看它们是什么样子的:
在上面的示意图中,蓝色框显示程序工作的时间,红色框显示等待 I/O 操作完成的时间。这个图并不是按比例绘制的,因为 Internet 上的请求时间可能比 CPU 指令长几个数量级,所以你的程序最终可能会花费大部分时间等待操作完成。这是你的浏览器在大多数时间里所做的事情。
另一方面,有一些程序可以在不与网络通信或不访问文件的情况下进行重要的计算。这些是 CPU 绑定的程序,因为限制程序速度的资源是 CPU,而不是网络或文件系统。
以下是 CPU 绑定程序的示意图:
当你完成下面部分中的示例时,你将看到不同形式的并发在 CPU 绑定的程序和 I/O 绑定的程序中工作得更好或更差。向程序添加并发性会增加额外的代码,增大复杂性,因此你需要确定潜在的加速是否值得付出这些代价。看完本文,你应该有足够的信息来开始做这个决定。
接下来,我们将对一些 python 并发方法进行比较,包括线程、异步和多进程,在程序中何时使用并发性以及使用哪个模块。
当然,本文假设读者对 python 有一个基本的了解,并且使用 python3.6 及以上版来运行示例。
如何加速 I/O 绑定程序
让我们从关注 I/O 绑定程序和一个常见问题开始:通过网络下载内容。在我们的例子中,你将从一些站点下载网页,但这个过程可能会产生任何故障。它只是更容易可视化。
同步版本
我们将从这个任务的非并发版本开始。注意,这个程序需要请求模块。在运行这个程序之前,你需要运行 pip 安装请求,这可能需要使用 virtualenv 命令。此版本根本不使用并发:
import requests
import time
def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")
def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
如你所见,这是一个相当短的程序。download_site()可以从 URL 下载内容并打印它的大小。要指出的一个小问题是,我们正在使用来自 Session 的会话对象。
直接从 requests 中使用 get(),但创建一个 Session 对象允许 requests 执行一些花哨的网络技巧从而真正加快速度是可能的。
download_all_sites()创建 Session,然后浏览站点列表,依次下载每个站点。最后,它打印出这个过程花费了多长时间,这样你就可以满意地看到在下面的示例中并发性对我们有多大帮助。
这个程序的处理图看起来很像上一节中的 I/O 绑定图。
注意:网络流量取决于许多因素,这些因素可能在每秒都在变化。我已经看到由于网络问题,这些测试案例从一次运行跳转到另一次的时间加倍了。
为什么同步版本很重要
这个版本的代码最棒的特点是,它很简单,编写和调试相对容易。代码的思路更加直接,所以你可以预测它将如何运作。
同步版本的问题
和我们提供的其他解决方案相比,同步版本最大的问题是,它的速度相对较慢。以下是我的机器上的最终输出示例:
注意:你得到的结果可能会和上面有很大差异。运行这个脚本时,需要的时间从 14.2 秒到 21.9 秒不等。在本文中,时间取三次运行中最快的一次所花的时间,在这种情况下,两种方法之间的差异仍然很明显。
然而,运行速度变慢并不总是一个大问题。如果你正在运行的程序使用同步版本运行只需要 2 秒,并且很少运行,那么可能不需要添加并发性。
如果你的程序经常运行怎么办?如果运行程序需要几个小时怎么办?让我们继续使用线程重写这个程序以实现并发性。
线程版本
正如你可能猜测的那样,编写线程程序需要付出更多的努力。然而,对于简单的案例,你可能会惊讶于它所花费的额外努力是如此之少。下面是同一个程序的线程版本:
import concurrent.futures
import requests
import threading
import time
thread_local = threading.local()
def get_session():
if not getattr(thread_local, "session", None):
thread_local.session = requests.Session()
return thread_local.session
def download_site(url):
session = get_session()
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")
def download_all_sites(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_site, sites)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
当你添加线程时,整体结构是相同的,因此你只需要做一些更改。download_all_sites()从在每个站点调用一次函数改为更复杂的结构。
在这个版本中,你正在创建一个 ThreadPoolExecutor,这看起来很复杂。我们可以把它分解为:ThreadPoolExecutor=thread+pool+executor。
这个对象将创建一个线程池,每个线程都可以并发运行。最后,执行器会控制池中每个线程的运行方式和运行时间。请求将在池中执行。
标准库将 ThreadPoolExecutor 实现为上下文管理器,这样你就可以使用 with 语法来管理线程池的创建和释放。
一旦有了 ThreadPoolExecutor,就可以很方便地使用它的.map()方法。此方法在列表中的每个站点上运行传入函数。最重要的是,它使用所管理的线程池自动并发地运行它们。
那些学习其他语言,甚至是 python 2 的用户可能想知道,在处理线程时,通常用来管理细节的对象和函数在哪里,比如 thread.start()、thread.join()和 queue。
这些仍然存在,你可以使用它们来实现对线程运行方式的细粒度控制。但是,从 python3.2 开始,标准库添加了一个执行器,如果不需要细粒度的控制,它可以为你管理许多细节。
我们的示例中另一个有趣的变化是,每个线程都需要创建自己的 requests.session()对象。当你查看请求文档时,不一定很容易分辨出来,但是读到这个问题(https://github.com/requests/requests/issues/2766)时,你似乎很清楚每个线程需要单独的 Session。
这是线程处理的一个有趣又困难的问题之一。因为操作系统可以控制一个任务何时被中断及另一个任务何时开始,所以在线程之间共享的任何数据都需要受到保护,保证线程安全。很遗憾,requests.session()不是线程安全的。
根据数据是什么以及如何使用它,有几种策略可以使数据访问线程安全。其中之一是使用线程安全的数据结构,如 python 队列模块中的 queue。
另一种策略是线程本地存储。Threading.local() 创建一个看起来像全局的对象,但它对于每个线程来说是不一样的。在你的示例中,这是通过 threadLocal 和 get_session()完成的:
threadLocal = threading.local()def get_session():
if getattr(threadLocal, "session", None) is None:
threadLocal.session = requests.Session()
return threadLocal.session
ThreadLocal 是在线程模块中专门解决这个问题的。看起来有点奇怪,但你只想创建这些对象中的一个,而不是为每个线程创建一个对象。对象本身负责分离不同线程对不同数据的访问过程。
当调用 get_session()时,它查找的 session 和它运行的特定线程是对应的。因此,每个线程在第一次调用 get_session()时将创建一个会话,然后后续在其整个生命周期内简单地调用该会话。
最后,一个关于选择线程数的简短说明。你可以看到示例代码使用了 5 个线程。你可以随意调整这个数字的大小,看看总的时间是如何变化的。你可能认为每次下载只有一个线程是最快的,但实际上不是这样,至少在我的系统中不是这样。我发现,线程数目在 5 到 10 个之间时,速度是最快的。如果超过这个值,那么创建和销毁线程所产生的额外开销将抵消任何节省时间所带来的好处。
这里的难点在于,正确的线程数不是从一个任务到另一个任务中的常量。需要进行一些实验才能得到结果。
为什么线程版本很重要
它很快!这里是我测试中最快的一次。记住,非并发版本需要 14 秒以上的时间:
它的执行时序图如下所示:
它使用多个线程同时向网站发出多个打开的请求,允许你的程序重叠等待时间并更快地获得最终结果!
线程版本的问题
正如你从示例中看到的,要实现这一点需要更多的代码,而且你真的需要考虑在线程之间需要共享哪些数据。
线程可以以巧妙且难以检测的方式进行交互。这些交互可能导致随机的、间歇性的错误,且这些错误很难找到。
异步(asyncio)版本
在你开始检查异步版本示例代码之前,让我们详细讨论一下异步的工作原理。
异步基础
这将是 asycio 的简化版本。这里有许多细节被掩盖了,但它仍然说明了它是如何工作的。
asyncio 的一般概念是,一个被称为事件循环的 python 对象控制每个任务的运行方式和时间。这个对象清楚地知道每个任务处于什么状态。实际上,任务可以处于许多状态,但现在让我们设想一个简化的事件循环,它只有两个状态。
就绪状态指的是任务有工作要做并且准备运行,而等待状态意味着任务正在等待一些外部事情完成,例如网络操作。简化的事件循环维护两个任务列表,分别对应这两个状态。它选择一个已经就绪的任务,然后重新开始运行。该任务处于完全控制状态,直到它将控件送回事件循环。
当正在运行的任务将控制权交还给事件循环时,事件循环将该任务放入就绪或等待列表,然后遍历等待列表中的每个任务,以查看完成 I/O 操作后该任务是否已就绪。它知道就绪列表中的任务仍然是就绪状态,因为它们尚未运行。
一旦所有的任务都被重新排序到正确的列表中,事件循环就会选择下一个要运行的任务。简化的事件循环选择等待时间最长的任务并运行该任务。此过程重复,直到事件循环完成。
asyncio 的一个重要点是,如果不是有意为之,任务永远不会放弃控制。任务在执行的过程中从不会被打断。这使得我们在异步中比在线程中更容易进行资源共享。你不需要担心线程安全问题。
async 和 await
现在让我们来谈谈添加到 python 中的两个新关键字:async 和 await。根据上面的讨论,你可以将 await 视为允许任务将控制权交回事件循环的一种魔力。当你的代码等待函数调用时,await 是一个信号,表明调用可能需要花费一段时间,并且任务应该放弃控制。
最简单的方法是将 async 看作是 python 的标志,告诉它将使用 await 定义函数。在有些情况下,这不是完全正确的,比如异步生成器,但它适用于许多情况,并在开始时为你提供一个简单的模型。
你将在下一个代码中看到的一个例外是 async with 语句,它通常从你的等待的对象创建一个上下文管理器。虽然语义有点不同,但其思想是相同的:将这个上下文管理器标记为可以替换的东西。
我确信你可以想象到,在管理事件循环和任务之间的交互时有一些复杂性。对于以 asyncio 开始的开发人员来说,这些细节并不重要,但是你需要记住,任何调用 await 的函数都需要标记为 async。否则将出现语法错误。
回到代码
既然你已经基本了解了什么是 asyncio,那么让我们浏览一下示例代码的 asyncio 版本,并了解它是如何工作的。请注意,此版本添加了 aiohtp。在运行它之前,应该先运行 pip install aiohtp:
import asyncio
import time
import aiohttp
async def download_site(session, url):
async with session.get(url) as response:
print("Read {0} from {1}".format(response.content_length, url))
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = []
for url in sites:
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
duration = time.time() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
这个版本比前两个版本要复杂一些。它有一个类似的结构,但是启动任务的工作量比创建线程池执行器的工作量要多一些。让我们从示例的顶部开始。
download_site()
顶部的 download_site()与线程版本几乎相同,但函数定义行上的 async 关键字和实际调用 session.get()时的 async with 关键字除外。稍后你将看到为什么可以在这里传递 session,而不是使用线程本地存储。
download_all_sites()
download_all_sites() 中可以看到线程示例中最大的变化。
你可以在所有任务之间共享会话,因此该会话在此处创建为上下文管理器。任务可以共享会话,因为它们都在同一线程上运行。会话处于错误状态时,一个任务无法中断另一个任务。
在该上下文管理器中,它使用 asyncio.secure_future()创建一个任务列表,该列表还负责启动它们。创建所有任务后,此函数使用 asyncio.gather()完成会话内容的变动,直到所有任务完成。
线程代码的作用与此类似,但在 ThreadPoolExecutor 中可以方便地处理细节。当前没有 asyncioPoolExecutor 类。
然而,这里的细节中隐藏着一个小而重要的变化。还记得之前我们讨论过要创建的线程数吗?在线程示例中,线程的最佳数量并不明显。
asyncio 的一个很酷的优点是它的规模远远优于线程。与线程相比,每项任务创建所需的资源和时间要少得多,因此创建和运行更多的资源和时间能很好地工作。这个例子只是为每个要下载的站点创建一个单独的任务,这个任务运行得很好。
__main__
最后,异步的本质意味着你必须启动事件循环,并告诉它要运行哪些任务。文件底部的__main__部分包含 get_event_loop() 的代码,然后运行 run_until_complete()。如果没有别的,他们在命名这些函数方面做得很好。
如果你已经更新到 python 3.7,那么 python 核心开发人员会为你简化这种语法。不需要分辨那种情况下使用 asyncio.get_event_loop(),那种情况下使用 run_until_complete(),你只需使用 asyncio.run()。
为什么 asyncio 版本很重要
它真的很快!在我的机器上进行的所有测试中,这是代码运行最快的版本:
执行时序图与线程示例中所发生的情况非常相似。只是 I/O 请求都是由同一线程完成的:
缺少线程池执行器,使得这段代码比线程示例要复杂一些。在这种情况下,你需要做一些额外的工作来获得更好的性能。
还有一个常见的论点是,在合适的位置添加 async 和 await 是一个复杂的问题。在某种程度上,这是事实。这个论点的另一个方面是,它迫使你思考何时交换给定的任务,这可以帮助你设计出一份更好、更快的代码。
规模问题在这里也很突出。为每个站点运行上面的线程示例明显比用少量线程运行它慢。运行带有数百个任务的 asyncio 示例并没有减慢速度。
asyncio 版本的问题
现在 asyncio 有几个问题。为了充分利用 asyncio,你需要特殊的 asyncio 版本的库。如果你只是使用下载站点的请求,那么速度会慢得多,因为请求不是用来通知事件循环它被阻塞了。随着时间的推移,这个问题越来越少,因为越来越多的库采用 asyncio。
另一个更微妙的问题是,如果其中一个任务不合作,那么协作多任务的所有优势都会消失。代码中的一个小错误会导致一个任务运行,并长时间占用处理器,从而使其他需要运行的任务处于等待状态。如果任务没有将控制权交还给事件循环,则无法中断事件循环。考虑到这一点,让我们来看看一种完全不同的并发、多处理方法。
多处理器版本
与前面的方法不同,多处理器版本的代码充分利用了新计算机的多个 CPU。让我们从代码开始:
import requests
import multiprocessing
import time
session = None
def set_global_session():
global session
if not session:
session = requests.Session()
def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print(f"{name}:Read {len(response.content)} from {url}")
def download_all_sites(sites):
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
这比 asyncio 示例短得多,实际上,它看起来与线程示例非常相似,但是在我们深入研究代码之前,让我们快速了解一下多处理器对你会有什么帮助。
简述多处理器
到目前为止,本文中的所有并发示例都只在计算机的单个 CPU 或核上运行。其原因与当前的 cpython 的设计以及所谓的全局解释器锁(globalinterpretorlock,简称 gil)有关。
标准库中的多处理器设计正是为了改变这种状态而设计的,它使你能在多个 CPU 上运行代码。在高层,它是通过创建一个新的 python 解释器实例在每个 CPU 上运行,然后释放出程序的一部分来实现的。
在当前的 python 解释器中启动一个新线程的速度不如单独启动一个 python 解释器的速度快。这是一个重要的操作,存在一些限制和困难,但对某些问题来说,它可以产生巨大的差异。
多处理器代码
代码与我们的同步版本相比有一些小的变化。第一个区别位于 download_all_sites()中。它不是简单地重复调用 download_site(),而是创建一个 multiprocessing.pool 对象,并让它将 download_site 映射到不可访问的站点。和线程示例相比,这点比较相似。
这里所发生的是,池(pool)创建了许多单独的 python 解释器进程,并让每个进程在某些项上运行指定的函数,在我们的例子中是在站点列表上运行指定的函数。主进程和其他进程之间的通信由多处理模块为你处理。
创造池的那条线值得你注意。首先,它不指定要在池中创建多少进程,尽管这是一个可选参数。默认情况下,multiprocessing.pool()将确定计算机中的 CPU 数量并与之匹配。这通常是最好的答案,在我们的例子中也是如此。
对于这个问题,增加进程的数量并不能提高速度。相反,它实际上会降低速度,因为启动和删除所有这些进程的成本大于并行执行 I/O 请求的好处。
接下来,我们得到该调用的 initializer=set_global_session 部分。请记住,池中的每个进程都有自己的内存空间,这意味着它们不能共享会话对象之类的东西。你不会希望每次调用函数时都创建新会话,而是希望为每个进程创建一个会话。
初始化功能参数就是为这种情况而生成的。无法将返回值从初始值设定项传递回由进程 download_site()调用的函数,但可以初始化全局会话变量以保存每个进程的单个会话。因为每个进程都有自己的内存空间,所以每个进程的全局空间都不同。
这就是所有要说的啦,其余的代码与你以前看到的非常相似。
为什么多处理器版本很重要
这个例子的多处理版本非常好,因为它相对容易启动,并且只需要很少的额外代码。它还充分利用了计算机中的 CPU 资源。此代码的执行时序图如下所示:
多处理器版本的问题
这个版本的示例确实需要一些额外的设置,而且全局会话对象很奇怪。你必须花费一些时间来考虑在每个流程中访问哪些变量。
最后,它明显比本例中的异步和线程版本慢:
这并不奇怪,因为 I/O 绑定问题并不是多处理存在的真正原因。在进入下一节并查看 CPU 绑定示例时,你将看到更多内容。
下一篇《如何利用并发性加速你的python程序:CPU 绑定程序加速》敬请期待