-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlabel.py
74 lines (62 loc) · 2.38 KB
/
label.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import datetime
import grpc
import pytz
import os
import sys
from functools import partial
from uuid import uuid4
from base64 import b64decode
from protos import infer_pb2, infer_pb2_grpc
from protos.infer_pb2 import MetaData
def local_time(now: datetime.datetime) -> str:
"""convert datetime to string in ISO 8601 format"""
return pytz.timezone('Asia/Shanghai').localize(now).isoformat()
def add_meta_data(pb: 'google.protobuf') -> None:
"""add `google.protobuf.pyext.cpp_message.GeneratedProtocolMessageType` meta data"""
now = datetime.datetime.now()
pb._meta_data.timestamp = now.timestamp()
pb._meta_data.environment = MetaData.Environment.Value(
os.getenv('RUN_ENV', 'PRODUCTION'))
pb._meta_data.uuid = str(uuid4())
pb._meta_data.iso_date = local_time(now)
def image_preprocess(pic: dict) -> infer_pb2.Image:
key = pic.get("img_id")
if not key:
raise KeyError('image id not exists.')
detect_img_b64 = pic["img_b64"]
# remove image base64 header such as data:image/jpeg;base64
detect_img_raw = b64decode(detect_img_b64.split(',', 1)[-1])
im = infer_pb2.Image(raw_data=detect_img_raw, image_id=key)
add_meta_data(im)
return im
class ServiceClient:
"""
gRPC client wrapper, capture errror and can init call timeout
"""
def __init__(self, module: infer_pb2_grpc, stub: str,
host: str, port: int, timeout: int = 5) -> None:
"""
:param module: module Generated by the gRPC Python protocol compiler
:param stub: stub name
"""
channel = grpc.insecure_channel(f'{host}:{port}')
try:
grpc.channel_ready_future(channel).result(timeout=10)
except grpc.FutureTimeoutError:
sys.exit(f'Error connecting to {host}:{port} gRPC server, exit.')
self.stub = getattr(module, stub)(channel)
self.timeout = timeout
def __getattr__(self, attr):
return partial(self._wrapped_call, self.stub, attr)
# args[0]: stub, args[1]: function to call, args[3]: Request
# kwargs: keyword arguments
def _wrapped_call(self, *args, **kwargs):
try:
return getattr(args[0], args[1])(
args[2], **kwargs, timeout=self.timeout
)
except grpc.RpcError as e:
print('Call {0} failed with {1}'.format(
args[1], e.code())
)
raise