抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

ControlNet

个人博客 << 分享一些有用的东西

在一些场合下,我们需要同时运行多个Python程序,并且希望这些Python进程之间能互相通讯,发送一些值或者接收一些值。本文我们就来测试一下Python的跨进程通信不同方案的效率。

背景介绍

请考虑以下场景,要处理一个数据,我们需要有3步比较耗时的操作,而这个每一步的操作需要上一步的结果,如下图所示。

flowchart LR
    Input --> Step1 --> Step2 --> Step3 --> Output

在这里,有两种使用多进程并行的思路,使用多个进程,每个进程接受一个数据,处理完全部三步之后返回结果,每个进程之间相互独立,如下图所示。

flowchart LR
    subgraph Process3
        direction LR
        Input3 --> p3s1[Step1] --> p3s2[Step2] --> p3s3[Step3] --> Output3
    end
    subgraph Process2
        direction LR
        Input2 --> p2s1[Step1] --> p2s2[Step2] --> p2s3[Step3] --> Output2
    end
    subgraph Process1
        direction LR
        Input1 --> p1s1[Step1] --> p1s2[Step2] --> p1s3[Step3] --> Output1
    end

这种方法操作简单,不需要进程间通信,也容易扩展到更多的进程数,在绝大多数情况下都推荐使用。然而这种模式需要将3个Step的上下文都载入内存中,如果这些Step是占用内存很高的深度学习模型,那么内存将会成为一个严重瓶颈。

为了解决这个问题,我们可以使用另一种模式将其并行。

flowchart LR
Input --> Step1 .-> Step2 .-> Step3 .-> Output
subgraph Process1
    Step1
end
subgraph Process2
    Step2
end
subgraph Process3
    Step3
end

其中这里的虚线表示进程间通信(IPC)。相比于第一种并行方式,这种并行方式操作复杂,需要进程间通信,但是可以有效的减少内存占用。

然而,这种并行相比于第一种方案,需要消耗额外的时间在IPC上,因此我们需要测试一下不同IPC方案的效率。

如果以上图表没有正确渲染,请刷新页面。

运行环境

以下实验都在以下环境运行:

1
2
3
4
5
6
7
CPU: i7-10900X
RAM: 128GB

Python: 3.10.8
websockets: 10.4
fastapi: 0.88.0
numpy: 1.23.4

需要通讯的内容为4种不同尺寸的numpy.ndarray[1],数据类型为float64。分别为:

  • (1, 3, 224, 224), 模拟一张图片
  • (2, 1024, 16, 16), 模拟两张图像的低维特征
  • (2, 3, 16, 224, 224), 模拟两个16帧的视频片段
  • (128, 1024, 1024), 模拟128个序列特征

方案介绍

方案1: HTTP + JSON 序列化

这是最简单的方案,使用HTTP协议作为通信协议,将ndarray转换成Python的嵌套List,然后作为json发送。这种方案的优点是实现简单,不需要额外的依赖,缺点是从ndarrayList互相转换的开销大,而且json序列化的开销也很大。

这里HTTP通过fastapi[2]实现,fastapi是一个高性能的异步框架,可以很好的支持大量的并发请求。

方案2: HTTP + Base64 Bytes

这种方案和方案1类似,不过将ndarray通过numpy内置的方法转换成bytes,然后使用base64编码,这样可以避免ndarrayList之间的转换,但是HTTP传输大规模的base64编码的开销也很大。

方案3: Websocket + Bytes

这种方案和方案2类似,不过使用Websocket作为通信协议,这样可以避免HTTP的开销。因为Websocket可以发送ascii之外的字节,所以不需要base64编码。

方案4: HTTP + Shared Memory

这里采用了multiprocessing.shared_memory模块,使用SharedMemory对象将ndarray的地址共享给子进程。然后将SharedMemory对象名字作为HTTP的返回值,客户端再通过名字获取SharedMemory对象,这样可以避免ndarrayList之间的转换,也避免了base64编码的开销。

方案5: Websocket + Shared Memory

这种方案和方案4类似,不过使用Websocket作为通信协议,这样可以避免HTTP的开销。

方案6: Multiprocessing Listener / Client

这种方案使用multiprocessing模块的ListenerClient对象,使用multiprocessingPipe作为通信协议,这样可以避免HTTP的开销。

测试结果

ndarray Shape (1, 3, 224, 224) (2, 1024, 16, 16) (2, 3, 16, 224, 224) (128, 1024, 1024)
HTTP + JSON 290.00 ms 1090 ms 9230 ms 259.00 s
HTTP + Base64 Bytes 26.40 ms 51.5 ms 398 ms 12.30 s
Websocket +
Bytes
4.27 ms 15.0 ms 171 ms 5.14 s
HTTP +
Shared Memory
10.70 ms 18.1 ms 127 ms 3.13 s
Websocket +
Shared Memory
4.34 ms 14.9 ms 127 ms 3.82 s
Multiprocessing
Listener
7.00 ms 17.2 ms 162 ms 4.73 s

根据这个结果我们可以发现,方案4和方案5的性能是最好的,方案6的性能也很好,方案1和方案2的性能最差。

考虑网络传输协议,Websocket的性能是比HTTP好的。所以应该尽量使用Websocket作为网络传输协议。

考虑使用Base64还是Shared Memory,我们可以发现大数据的情况下,Shared Memory的性能是比较好的,但是它需要手动管理内存,可能会有一些问题。所以对于小数据,可以使用Base64,对于大数据,可以使用Shared Memory。

对于multiprocessing模块的ListenerClient,它的性能略弱于Shared Memory,但是它不需要手动管理共享内存,而且它不需要用fastapi之类的外部库,而且不需要转换成别的类型的数据,比较方便。但是正因为它没有使用fastapi,以至于它不是很方便的进行异步处理。

在写以下比较重复的代码实现的时候,Github Copilot[3]起到了很大的帮助。

代码实现

方案1: HTTP + JSON 序列化

1
2
3
4
5
6
7
8
9
10
# server.py
import numpy as np
from fastapi import FastAPI

app = FastAPI()
shape = (1, 3, 224, 224)

@app.get("/random_tolist")
async def random_tolist():
return np.random.randn(*shape).tolist()
1
2
3
4
5
6
7
# client.py
import requests
import numpy as np

result = requests.get("http://127.0.0.1:1234/random_tolist")
result = np.array(result.json())
print(result.shape)

方案2: HTTP + Base64 Bytes

1
2
3
4
5
6
7
8
9
10
11
12
# server.py
import numpy as np
import base64
from fastapi import FastAPI
from fastapi.responses import PlainTextResponse

app = FastAPI()
shape = (1, 3, 224, 224)

@app.get("/random_tobytes", response_class=PlainTextResponse)
async def random_tobytes():
return base64.b64encode(np.random.randn(*shape).tobytes())
1
2
3
4
5
6
7
8
# client.py
import requests
import numpy as np
import base64

result = requests.get("http://127.0.0.1:1234/random_tobytes")
result = np.frombuffer(base64.b64decode(result.text)).reshape(*shape)
print(result.shape)

方案3: Websocket + Bytes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# server.py
import numpy as np
from fastapi import FastAPI, WebSocket

app = FastAPI()
shape = (1, 3, 224, 224)

@app.websocket("/ws/random_tobytes")
async def websocket_random_tobytes(websocket: WebSocket):
await websocket.accept()
while True:
await websocket.receive_text()
print("Processing websocket")
await websocket.send_text(np.random.randn(*shape).tobytes())

1
2
3
4
5
6
7
8
9
# client.py
import numpy as np
from websocket import create_connection
ws = create_connection("ws://127.0.0.1:1234/ws/random_tobytes")

ws.send("")
result = np.frombuffer(ws.recv()).reshape(*shape)
print(result.shape)

方案4: HTTP + Shared Memory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# server.py
import numpy as np
from fastapi import FastAPI
from fastapi.responses import PlainTextResponse
from multiprocessing import shared_memory

app = FastAPI()
shape = (1, 3, 224, 224)

@app.get("/random_sharedmemory", response_class=PlainTextResponse)
async def random_sharedmemory():
arr = np.random.randn(*shape)
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
out = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
out[:] = arr[:]
shm.close()
return shm.name
1
2
3
4
5
6
7
8
9
10
11
12
13
# client.py
import numpy as np
import requests
from multiprocessing import shared_memory

shm_name = requests.get("http://127.0.0.1:1234/random_sharedmemory")
shm = shared_memory.SharedMemory(name=shm_name.text)
result = np.ndarray(shape, dtype=float, buffer=shm.buf)

shm.close()
shm.unlink()

print(result.shape)

方案5: Websocket + Shared Memory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# server.py
import numpy as np
from fastapi import FastAPI, WebSocket
from multiprocessing import shared_memory

app = FastAPI()
shape = (1, 3, 224, 224)

@app.websocket("/ws/random_sharedmemory")
async def websocket_random_sharedmemory(websocket: WebSocket):
await websocket.accept()
while True:
await websocket.receive_text()
print("Processing websocket")
arr = np.random.randn(*shape)
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
out = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
out[:] = arr[:]
shm.close()
print(shm.name)
await websocket.send_text(shm.name)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# client.py
import numpy as np
from websocket import create_connection
from multiprocessing import shared_memory

ws = create_connection("ws://127.0.0.1:1234/ws/random_sharedmemory")
ws.send("")
shm_name = ws.recv()
shm = shared_memory.SharedMemory(name=shm_name)
result = np.ndarray(shape, dtype=float, buffer=shm.buf)

shm.close()
shm.unlink()

print(result.shape)

方案6: Multiprocessing Listener / Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# server.py
from multiprocessing.connection import Listener
import numpy as np

shape = (1, 3, 224, 224)

address = ('localhost', 1234)
listener = Listener(address)
conn = listener.accept()

while True:
msg = conn.recv()
print("Processing")
conn.send(np.random.randn(*shape))

listener.close()
1
2
3
4
5
6
# client.py
from multiprocessing.connection import Client
conn = Client(("localhost", 1234))
conn.send("")
result = conn.recv()
print(result.shape)

参考文献

  • [1] "NumPy", Numpy.org, 2022. [Online]. Available: https://numpy.org/.
  • [2] "FastAPI", FastAPI, 2022. [Online]. Available: https://fastapi.tiangolo.com/
  • [3] "GitHub Copilot · Your AI pair programmer," GitHub, 2022. https://github.com/features/copilot

评论