Python中的协程,到底是怎么回事?

一.传统的Sync语法请求例子

还是一样, 在了解Async语法的实现之前, 先从一个Sync的语法例子开始, 现在假设有一个HTTP请求, 这个程序会通过这个请求获取对应的响应内容, 并打印出来, 代码如下:

成都创新互联自2013年起,先为陆丰等服务建站,陆丰等地企业,进行企业商务咨询服务。为陆丰企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

import socket
def request(host: str) -> None:
"""模拟请求并打印响应体"""
url: str = f"http://{host}"
sock: socket.SocketType = socket.socket()
sock.connect((host, 80))
sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
response_bytes: bytes = b""
chunk: bytes = sock.recv(4096)
while chunk:
response_bytes += chunk
chunk = sock.recv(4096)
print("\n".join([i for i in response_bytes.decode().split("\r\n")]))
if __name__ == "__main__":
request("so1n.me")

运行程序, 程序能够正常输出, 上部分打印了对应的HTTP响应Header, 下部分打印了HTTP响应体, , 可以看到服务端叫我们以https的形式重新请求, 输出结果如下:

HTTP/1.1 301 Moved Permanently
Server: GitHub.com
Content-Type: text/html
Location: https://so1n.me/
X-GitHub-Request-Id: A744:3871:4136AF:48BD9F:6188DB50
Content-Length: 162
Accept-Ranges: bytes
Date: Mon, 08 Nov 2021 08:11:37 GMT
Via: 1.1 varnish
Age: 104
Connection: close
X-Served-By: cache-qpg1272-QPG
X-Cache: HIT
X-Cache-Hits: 2
X-Timer: S1636359097.026094,VS0,VE0
Vary: Accept-Encoding
X-Fastly-Request-ID: 22fa337f777553d33503cee5282598c6a293fb5e

301 Moved Permanently

301 Moved Permanently



nginx


不过这里并不是想说HTTP请求是如何实现的, 具体我也不太了解, 在这个代码中, socket的默认调用是阻塞的, 当线程调用connect或者recv时(send是不用等待的, 但在高并发下需要先等待drain后才可以send, 小demo不需要用到drain方法), 程序将会暂停直到操作完成。当一次要下载很多网页的话, 这将会如上篇文章所说的一样, 大部分的等待时间都花在io上面, cpu却一直空闲时, 而使用线程池虽然可以解决这个问题, 但是开销是很大的, 同时操作系统往往会限制一个进程,用户或者机器可以使用的线程数, 而协程却没有这些限制, 占用的资源少, 也没有系统限制瓶颈。

二.异步的请求

异步可以让一个单独的线程处理并发的操作, 不过在上面已经说过了, socket是默认阻塞的, 所以需要把socket设置为非阻塞的, socket提供了setblocking这个方法供开发者选择是否阻塞, 在设置了非阻塞后, connect和recv方法也要进行更改。

由于没有了阻塞, 程序在调用了connect后会马上返回, 只不过Python的底层是C, 这段代码在C中调用非阻塞的socket.connect后会抛出一个异常, 我们需要捕获它, 就像这样:

import socket
sock: socket.SocketType = socket.socket()
sock.setblocking(Flase)
try:
sock.connect(("so1n.me", 80))
except BlockingIOError:
pass

经过一顿操作后, 就开始申请建立连接了, 但是我们还不知道连接啥时候完成建立, 由于连接没建立时调用send会报错, 所以可以一直轮询调用send直到没报错就认为是成功(真实代码需要加超时):

while True:
try:
sock.send(request)
break
except OSError as e:
pass

但是这样让CPU空转太浪费性能了, 而且期间还不能做别的事情, 就像我们点外卖后一直打电话过去问饭菜做好了没有, 十分浪费电话费用, 要是饭菜做完了就打电话告诉我们, 那就只产生了一笔费用, 非常的省钱(正常情况下也是这样子)。这时就需要事件循环登场了,在类UNIX中, 有一个叫select的功能, 它可以等待事件发生后再调用监听的函数, 不过一开始的实现性能不是很好, 在Linux上被epoll取代, 不过接口是类似的, 所在在Python中把这几个不同的事件循环都封装在selectors库中, 同时可以通过DefaultSelector从系统中挑出最好的类select函数。这里先暂时不说事件循环的原理, 事件循环最主要的是他名字的两部分, 一个是事件, 一个是循环, 在Python中, 可以通过如下方法把事件注册到事件循环中:

def demo(): pass
selector.register(fd, EVENT_WRITE, demo)

这样这个事件循环就会监听对应的文件描述符fd, 当这个文件描述符触发写入事件(EVENT_WRITE)时,事件循环就会告诉我们可以去调用注册的函数demo。不过如果把上面的代码都改为这种方法去运行的话就会发现, 程序好像没跑就结束了, 但程序其实是有跑的, 只不过他们是完成的了注册, 然后就等待开发者接收事件循环的事件进行下一步的操作, 所以我们只需要在代码的最后面写上如下代码:

while True:
for key, mask in selector.select():
key.data()

这样程序就会一直运行, 当捕获到事件的时候, 就会通过for循环告诉我们, 其中key.data是我们注册的回调函数, 当事件发生时, 就会通知我们, 我们可以通过拿到回调函数然后就运行, 了解完毕后, 我们可以来编写我们的第一个并发程序, 他实现了一个简单的I/O复用的小逻辑, 代码和注释如下:

import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
# 选择事件循环
selector: DefaultSelector = DefaultSelector()
# 用于判断是否有事件在运行
running_cnt: int = 0
def request(host: str) -> None:
"""模拟请求并打印响应体"""
# 告诉主函数, 自己的事件还在运行
global running_cnt
running_cnt += 1
# 初始化socket
url: str = f"http://{host}"
sock: socket.SocketType = socket.socket()
sock.setblocking(False)
try:
sock.connect((host, 80))
except BlockingIOError:
pass
response_bytes: bytes = b""
def read_response() -> None:
"""接收响应参数, 并判断请求是否结束"""
nonlocal response_bytes
chunk: bytes = sock.recv(4096)
print(f"recv {host} body success")
if chunk:
response_bytes += chunk
else:
# 没有数据代表请求结束了, 注销监听
selector.unregister(sock.fileno())
global running_cnt
running_cnt -= 1
def connected() -> None:
"""socket建立连接时的回调"""
# 取消监听
selector.unregister(sock.fileno())
print(f"{host} connect success")
# 发送请求, 并监听读事件, 以及注册对应的接收响应函数
sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
selector.register(sock.fileno(), EVENT_READ, read_response)
selector.register(sock.fileno(), EVENT_WRITE, connected)
if __name__ == "__main__":
# 同时多个请求
request("so1n.me")
request("github.com")
request("google.com")
request("baidu.com")
# 监听是否有事件在运行
while running_cnt > 0:
# 等待事件循环通知事件是否已经完成
for key, mask in selector.select():
key.data()

这段代码接近同时注册了4个请求并注册建立连接回调, 然后就进入事件循环逻辑, 也就是把控制权交给事件循环, 直到事件循环告诉程序说收到了socket建立的通知, 程序就会取消注册的回调然后发送请求, 并注册一个读的事件回调, 然后又把控制权交给事件循环, 直到收到了响应的结果才进入处理响应结果函数并且只有收完所有响应结果才会退出程序。下面是我其中的一次执行结果:

so1n.me connect success
github.com connect success
google.com connect success
recv google.com body success
recv google.com body success
baidu.com connect success
recv github.com body success
recv github.com body success
recv baidu.com body success
recv baidu.com body success
recv so1n.me body success
recv so1n.me body success

可以看到他们的执行顺序是随机的, 不是严格的按照so1n.me, github.com, google.com, baidu.com顺序执行, 同时他们执行速度很快, 这个程序的耗时约等于响应时长最长的函数耗时。但是可以看出, 这个程序里面出现了两个回调, 回调会让代码变得非常的奇怪, 降低可读性, 也容易造成回调地狱, 而且当回调发生报错的时候, 我们是很难知道这是由于什么导致的错误, 因为它的上下文丢失了, 这样子排查问题十分的困惑。作为程序员, 一般都不止满足于速度快的代码, 真正想要的是又快, 又能像Sync的代码一样简单, 可读性强, 也能容易排查问题的代码, 这种组合形式的代码的设计模式就叫协程。

协程出现得很早, 它不像线程一样, 被系统调度, 而是能自主的暂停, 并等待事件循环通知恢复。由于协程是软件层面实现的, 所以它的实现方式有很多种, 这里要说的是基于生成器的协程, 因为生成器跟协程一样, 都有暂停让步和恢复的方法(还可以通过throw来抛错), 同时它跟Async语法的协程很像, 通过了解基于生成器的协程, 可以了解Async的协程是如何实现的。

三.基于生成器的协程

3.1生成器

在了解基于生成器的协程之前, 需要先了解下生成器, Python的生成器函数与普通的函数会有一些不同, 只有普通函数中带有关键字yield, 那么它就是生成器函数, 具体有什么不同可以通过他们的字节码来了解:

In [1]: import dis
# 普通函数
In [2]: def aaa(): pass
In [3]: dis.dis(aaa)
1 0 LOAD_CONST 0 (None)
2 RETURN_VALUE
# 普通函数调用函数
In [4]: def bbb():
...: aaa()
...:
In [5]: dis.dis(bbb)
2 0 LOAD_GLOBAL 0 (aaa)
2 CALL_FUNCTION 0
4 POP_TOP
6 LOAD_CONST 0 (None)
8 RETURN_VALUE
# 普通生成器函数
In [6]: def ccc(): yield
In [7]: dis.dis(ccc)
1 0 LOAD_CONST 0 (None)
2 YIELD_VALUE
4 POP_TOP
6 LOAD_CONST 0 (None)
8 RETURN_VALUE

上面分别是普通函数, 普通函数调用函数和普通生成器函数的字节码, 从字节码可以看出来, 最简单的函数只需要LOAD_CONST来加载变量None压入自己的栈, 然后通过RETURN_VALUE来返回值, 而有函数调用的普通函数则先加载变量, 把全局变量的函数aaa加载到自己的栈里面, 然后通过CALL_FUNCTION来调用函数, 最后通过POP_TOP把函数的返回值从栈里抛出来, 再把通过LOAD_CONST把None压入自己的栈, 最后返回值。而生成器函数则不一样, 它会先通过LOAD_CONST来加载变量None压入自己的栈, 然后通过YIELD_VALUE返回值, 接着通过POP_TOP弹出刚才的栈并重新把变量None压入自己的栈, 最后通过RETURN_VALUE来返回值。从字节码来分析可以很清楚的看到, 生成器能够在yield区分两个栈帧, 一个函数调用可以分为多次返回, 很符合协程多次等待的特点。

接着来看看生成器的一个使用, 这个生成器会有两次yield调用, 并在最后返回字符串'None', 代码如下:

In [8]: def demo():  
...: a = 1
...: b = 2
...: print('aaa', locals())
...: yield 1
...: print('bbb', locals())
...: yield 2
...: return 'None'
...:
In [9]: demo_gen = demo()
In [10]: demo_gen.send(None)
aaa {'a': 1, 'b': 2}
Out[10]: 1
In [11]: demo_gen.send(None)
bbb {'a': 1, 'b': 2}
Out[11]: 2
In [12]: demo_gen.send(None)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
in
----> 1 demo_gen.send(None)
StopIteration: None

这段代码首先通过函数调用生成一个demo_gen的生成器对象, 然后第一次send调用时返回值1, 第二次send调用时返回值2, 第三次send调用则抛出StopIteration异常, 异常提示为None, 同时可以看到第一次打印aaa和第二次打印bbb时, 他们都能打印到当前的函数局部变量, 可以发现在即使在不同的栈帧中, 他们读取到当前的局部函数内的局部变量是一致的, 这意味着如果使用生成器来模拟协程时, 它还是会一直读取到当前上下文的, 非常的完美。

此外, Python还支持通过yield from语法来返回一个生成器, 代码如下:

In [1]: def demo_gen_1():  
...: for i in range(3):
...: yield i
...:
In [2]: def demo_gen_2():
...: yield from demo_gen_1()
...:
In [3]: demo_gen_obj = demo_gen_2()
In [4]: demo_gen_obj.send(None)
Out[4]: 0
In [5]: demo_gen_obj.send(None)
Out[5]: 1
In [6]: demo_gen_obj.send(None)
Out[6]: 2
In [7]: demo_gen_obj.send(None)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
in
----> 1 demo_gen_obj.send(None)
StopIteration:

通过yield from就可以很方便的支持生成器调用, 假如把每个生成器函数都当做一个协程, 那通过yield from就可以很方便的实现协程间的调用, 此外生成器的抛出异常后的提醒非常人性化, 也支持throw来抛出异常, 这样我们就可以实现在协程运行时设置异常, 比如Cancel,演示代码如下:

In [1]: def demo_exc():  
...: yield 1
...: raise RuntimeError()
...:
In [2]: def demo_exc_1():
...: for i in range(3):
...: yield i
...:
In [3]: demo_exc_gen = demo_exc()
In [4]: demo_exc_gen.send(None)
Out[4]: 1
In [5]: demo_exc_gen.send(None)
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
in
----> 1 demo_exc_gen.send(None)
in demo_exc()
1 def demo_exc():
2 yield 1
----> 3 raise RuntimeError()
4
RuntimeError:
In [6]: demo_exc_gen_1 = demo_exc_1()
In [Python中的协程,到底是怎么回事?
链接分享:http://www.gawzjz.com/qtweb/news39/185139.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联