【Grpc】使用grpc配置通过端口访问的python服务

0x00 前言

近期兴趣使然的技术调研越发的少了(TTS算一个),主要的都是为了项目和任务去研究的东西。目前的情况是为了节约显存,对一个较大的模型而言,比起使用4个worker来重复的占用显存,不如只占用一份显存,但是开启服务流式或触发式地处理不同项目的需求。
于是 @caoyixuan93 学长向我推荐了GRPC,经过 @hongfeng 和 @phchang 的帮助,终于得以成功实现了一个小的自定义demo,后续再花些时间把模型装载上去。

0x01 GRPC介绍

GRPC: A high performance, open source, general-purpose RPC framework

简单的来说,就是一个开源的“服务端-客户端”框架,你可以把你的服务(例如模型的预测函数)挂载起来,随时接受到通过端口发送来的输入数据,计算后将输出返回回去。当有多个访问时,以队列或者流的形式逐个处理。

0x02 环境配置

我们来参照 Python Quick Start ,对于一个简单的grpc服务,逐步搭建起来依次需要哪些东西,从而来对应的看看需要配置的环境吧:

  • Python >= 3.4 grpc有很多版本,这里我选择使用便捷的python
  • grpcio>=1.28.1 既然要用grpc那自然要装上grpc了,这里的grpcio是指grpc的python版本,此外还有其他各种语言的版本就没有多做研究啦,直接 pip install grpcio 即可
  • grpcio-tools 这个工具的作用是通过读取.proto格式的配置文件,产生两段python代码供直接调用,直接 pip install grpcio-tools 即可
  • protobuf==3.6.0 上一步的 pip install 会附赠安装一个3.11.x的,结果版本太高了反而容易出错,这里我们选用稳定的3.6.0版本了,用途是读取上面提到的.proto格式的文件,进行服务的变量名配置

0x03 源码及分析

个人可能脑袋瓜比较笨,看 Python Quick Start 里的例子是硬生生没明白,所以比起他的helloworld,不如自己自定义一个demo来尝试跑通,这用来学习一种新框架是一个不错的方法。

配置文件

https://github.com/okcd00/CDAlter/tree/master/CDMemory/protos

首先是令人劝退的 protobuf 语言,啊我没学过怎么办,啊看起来头好疼。
然后看着看着,哦,就这呀?还好还好,又不用写逻辑,就是配置嘛,那我会,yml我也不会呀但我照葫芦画瓢写配置还是能做到的嘛。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 意思是我们用了proto3的版本,不用管这行,大概python2会变成proto2吧,我瞎猜的
syntax = "proto3";

// 我们自定义的这个配置文件叫啥,会用在产生代码的时候当名字用
package keyvaluestore;

// service里就是预先定义一下你这个服务有哪些函数可以调用
// 这里只需要定义不需要实现任何功能,功能都是在python里实现的
service KeyValueStore {
// 函数格式: rpc 函数名 (输入变量) returns (输出变量) {}
// 变量都要在下面用 message 来定义了才能用哦
rpc ask (Key) returns (Response) {}
rpc remember (Item) returns (Response) {}
}

// 变量名:Key
// 有一个成员变量,为词典里的键
message Key {
string key = 1;
}

// 变量名:Item
// 有两个成员变量,为词典里的键和值
message Item {
string key = 1;
string value = 2;
// 这个1和2就当作是标识符就好,有几个变量就要写到几,这样服务器才能数的清楚不会搞错
}

// 变量名:Response
// 有一个成员变量,为词典里的值
message Response {
string value = 1;
}

这个proto文件设置好了之后,通过命令行(cmd,命令提示符,git bash都可以)来调用tool自动生成两个python文件,调用方法为,先进入proto所在的文件夹(例如 cd /i/Github/CDAlter/CDMemory/protos),然后执行如下命令(这里我的proto文件叫作keyvaluestore.proto):

1
2
3
4
5
python -m grpc_tools.protoc \
-I ./ \
--python_out=. \
--grpc_python_out=. \
keyvaluestore.proto

里面的 \ 是为了格式好看点,偷懒的话在提供个可以直接输入一整行的:

1
python -m grpc_tools.protoc -I ./ --python_out=. --grpc_python_out=. keyvaluestore.proto

因为我的配置文件叫keyvaluestore.proto,所以在当前目录下生成了keyvaluestore_pb2.pykeyvaluestore_pb2_grpc.py两个文件。

服务端与客户端

https://github.com/okcd00/CDAlter/tree/master/CDNerve

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# -*- coding: gbk -*-
# ==========================================================================
# Copyright (C) since 2020 All rights reserved.
#
# filename : grpc_server.py
# author : chendian / okcd00@qq.com
# date : 2020-04-16
# desc : server in grpc service
# ==========================================================================
import sys
import time
import grpc
from concurrent import futures
from multiprocessing import Pool
from collections import OrderedDict
from grpc._cython.cygrpc import CompressionAlgorithm, CompressionLevel

"""
# generate it at first
python -m grpc_tools.protoc \
-I ./ \
--python_out=. \
--grpc_python_out=. \
keyvaluestore.proto
# then you can get keyvaluestore_pb2_grpc and keyvaluestore_pb2
"""
# 这里我的protos不在当前目录下,所以加了个pythonpath
sys.path.append('../CDMemory/protos/')
from CDMemory.protos import keyvaluestore_pb2_grpc, keyvaluestore_pb2


class KVServicer(keyvaluestore_pb2_grpc.KeyValueStoreServicer):
def __init__(self):
# 这里可以把模型什么的都放进来,比如 self.model = AlbertModel()
self.records = OrderedDict() # 这里用一个字典来作为Demo服务的载体

# 之前在 service 的部分预先定义的函数都要在这里重载,而且变量名的个数要一致,
# 不然会报错:"Exception iterating requests!"
# 我这里统一用了重载的函数名,也可以写成 `def ask(self, Key, context):`
def ask(self, request, context):
# 对应 rpc ask (Key) returns (Response) {}
# 这个 request 就是 `ask` 后面那个 `Key`
_value = self.records.get(request.key, "Empty")
# 这个`keyvaluestore_pb2.Response`就是 `returns` 后面的那个 `Response`
return keyvaluestore_pb2.Response(value=_value)

# 我这里统一用了重载的函数名,也可以写成 `def remember(self, Item, context):`
def remember(self, request, context):
# 对应 rpc remember (Item) returns (Response) {}
# 这个 request 就是 `remember ` 后面那个 `Item`
key, value = request.key, request.value # 就是我们在Item里定义的key/value
self.records.update({key: value}) # 这个grpc的demo效果就是字典的写和查
return keyvaluestore_pb2.Response(
value="Remembered: the value for {} is {}".format(key, value))


def serve(n_worker=4, port=20416):
max_receive_message_length = 512
# 初始化 Servicer 实例
service = KVServicer()
# 多线程池,设定线程数量
service.process_pool = Pool(processes=n_worker)
# 建立 server 实例
server = grpc.server(futures.ThreadPoolExecutor(max_workers=n_worker),
options=[ # 这里的options可以省去不要,我这里加的设置是限制最大长度和压缩
('grpc.max_receive_message_length', max_receive_message_length),
('grpc.default_compression_algorithm', CompressionAlgorithm.gzip),
('grpc.grpc.default_compression_level', CompressionLevel.high)
])
# 用自动生成的 keyvaluestore_pb2_grpc 来添加 Servicer 到服务中
keyvaluestore_pb2_grpc.add_KeyValueStoreServicer_to_server(service, server)
# 设置端口号(内网外网均可访问)
server.add_insecure_port('[::]:{}'.format(port))
# 服务启动
server.start()
# server.wait_for_termination()
return server


if __name__ == "__main__":
print("starting server...")
server_agent = serve()
print("server started.")
while True:
time.sleep(10000)
print("[ALIVE] {}".format(time.ctime()))

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# -*- coding: gbk -*-
# ==========================================================================
# Copyright (C) since 2020 All rights reserved.
#
# filename : grpc_client.py
# author : chendian / okcd00@qq.com
# date : 2020-04-16
# desc : client in grpc service
# ==========================================================================
import os
import sys
if os.environ.get('https_proxy'):
del os.environ['https_proxy']
if os.environ.get('http_proxy'):
del os.environ['http_proxy']
import grpc

"""
# generate it at first
python -m grpc_tools.protoc \
-I ./ \
--python_out=. \
--grpc_python_out=. \
keyvaluestore.proto
# then you can get keyvaluestore_pb2_grpc and keyvaluestore_pb2
"""
# 这里我的protos不在当前目录下,所以加了个pythonpath
sys.path.append('../CDMemory/protos/')
from CDMemory.protos import keyvaluestore_pb2_grpc, keyvaluestore_pb2


if __name__ == '__main__':
# 连接上对应端口的grpc服务
with grpc.insecure_channel('localhost:20416') as channel:
# 初始化一个实例作为客户端
stub = keyvaluestore_pb2_grpc.KeyValueStoreStub(channel)
# 调用我们定义的remember函数,效果是写入 key=name, value=cd
response = stub.remember(keyvaluestore_pb2.Item(key='name', value='cd'))
print(response) # value: "Remembered: the value for name is cd"
# 调用我们定义的ask函数,效果是查询 key=name 对应的 value
response = stub.ask(keyvaluestore_pb2.Key(key='name'))
print(response) # value: "cd"