-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
125 lines (112 loc) · 4.24 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# server.py
import logging
import os
import urllib.request
# import uuid
from io import BytesIO

import litserve as ls
import torch
# import tracemalloc
from PIL import Image
from transformers import AutoModelForVision2Seq, AutoProcessor
# tracemalloc.start()
# Runtime configuration, read once from the environment at import time.
# Each value falls back to the default given here when the variable is unset.
PORT = int(os.getenv("PORT", "8000"))
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
NUM_API_SERVERS = int(os.getenv("NUM_API_SERVERS", "1"))
MAX_BATCH_SIZE = int(os.getenv("MAX_BATCH_SIZE", "2"))
DEFAULT_PROMPT = os.getenv("DEFAULT_PROMPT", "<grounding> Describe this image:")

# Logging: LOG_LEVEL is upper-cased and looked up as an attribute of the
# ``logging`` module, so an unknown level name fails at import time.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=getattr(logging, LOG_LEVEL.upper()),
)
logger = logging.getLogger(__name__)
class Kosmos2API(ls.LitAPI):
    """LitServe API that captions images with the Kosmos-2 model.

    Request flow: ``decode_request`` turns one request into a
    ``(prompt, image)`` pair, ``batch`` regroups a list of such pairs into
    ``(prompts, images)``, and ``predict`` runs generation and
    post-processing on them.
    """

    _MODEL_ID = "microsoft/kosmos-2-patch14-224"
    # Upper bound, in seconds, on how long an image download may block.
    _URL_TIMEOUT_SECONDS = 10

    def setup(self, device):
        """Load the model and processor and move the model to ``device``.

        Args:
            device: Device identifier supplied by the serving framework
                (presumably a string such as ``"cuda:0"`` or ``"cpu"``).
        """
        # Half precision is kept for accelerators. On CPU fall back to
        # float32, since float16 CPU kernels are incomplete in some torch
        # builds and the server may be started without a GPU.
        on_cpu = str(device).startswith("cpu")
        dtype = torch.float32 if on_cpu else torch.float16

        model = AutoModelForVision2Seq.from_pretrained(
            self._MODEL_ID, torch_dtype=dtype
        )
        self.processor = AutoProcessor.from_pretrained(self._MODEL_ID)
        self.model = model.to(device)
        self.model.eval()  # inference only
        self.prompt = DEFAULT_PROMPT
        self.device = device

    def decode_request(self, request):
        """Turn one incoming request into a ``(prompt, image)`` pair.

        ``request["content"]`` is either an upload object exposing a ``.file``
        handle, or an ``http(s)`` URL string pointing at an image.
        ``request["prompt"]`` is optional: the default prompt is used when it
        is missing, and ``<grounding>`` is prepended when the prompt lacks it.

        Returns:
            tuple: ``(prompt, image)`` where ``image`` is a PIL image.

        Raises:
            ValueError: If ``content`` is neither an upload nor an http(s) URL.
        """
        content = request["content"]

        prompt = request.get("prompt")
        if prompt is None:
            logger.debug("Using default prompt")
            prompt = self.prompt
        if "<grounding>" not in prompt:
            prompt = f"<grounding> {prompt}"

        upload = getattr(content, "file", None)
        if upload is not None:
            logger.info("Processing file")
            try:
                raw = upload.read()
            finally:
                # Always release the upload handle, even if the read fails.
                upload.close()
            return prompt, Image.open(BytesIO(raw))

        if isinstance(content, str) and content.lower().startswith(
            ("http://", "https://")
        ):
            logger.info("Processing URL")
            # NOTE(security): this fetches a caller-supplied URL from the
            # server's network. Restrict allowed hosts if the service is
            # exposed to untrusted clients.
            with urllib.request.urlopen(
                content, timeout=self._URL_TIMEOUT_SECONDS
            ) as response:
                raw = response.read()
            return prompt, Image.open(BytesIO(raw))

        # Fail here, for this request only, instead of returning None and
        # breaking the whole batch later.
        raise ValueError(
            "Unsupported 'content': expected an uploaded file or an http(s) URL"
        )

    def batch(self, inputs):
        """Regroup a list of ``(prompt, image)`` pairs into two parallel lists.

        Args:
            inputs: List of tuples as returned by ``decode_request``.

        Returns:
            tuple: ``(prompts, images)``, each a list in request order.
        """
        prompts = [pair[0] for pair in inputs]
        images = [pair[1] for pair in inputs]
        return prompts, images

    def predict(self, prompts_and_images):
        """Generate grounded captions for the given prompts and images.

        Args:
            prompts_and_images: ``(prompts, images)`` as produced by ``batch``
                (or a single ``(prompt, image)`` pair when no batching occurs).

        Returns:
            list[dict]: One dict per generated sequence with keys
            ``"generated_text"``, ``"entities"`` and ``"output"``.
        """
        prompts, images = prompts_and_images

        # padding=True pads prompts of different token lengths to a common
        # length so they can be stacked into one tensor batch.
        encoded = self.processor(
            text=prompts, images=images, return_tensors="pt", padding=True
        )
        encoded = {name: tensor.to(self.device) for name, tensor in encoded.items()}

        generated_ids = self.model.generate(
            pixel_values=encoded["pixel_values"],
            input_ids=encoded["input_ids"],
            attention_mask=encoded["attention_mask"],
            image_embeds=None,
            image_embeds_position_mask=encoded["image_embeds_position_mask"],
            use_cache=True,
            max_new_tokens=256,
        )
        generated_texts = self.processor.batch_decode(
            generated_ids.cpu(), skip_special_tokens=True
        )

        results = []
        for text in generated_texts:
            # First call keeps the text without cleanup/entity extraction.
            # The second (default) call yields cleaned text plus entities.
            raw_text = self.processor.post_process_generation(
                text, cleanup_and_extract=False
            )
            caption, entities = self.processor.post_process_generation(text)
            results.append(
                {
                    "generated_text": raw_text,
                    "entities": entities,
                    "output": caption,
                }
            )
        return results
if __name__ == "__main__":
    # Script entry point: wrap the API in a LitServe server and start serving.
    api = Kosmos2API()
    server = ls.LitServer(
        api,
        accelerator="auto",
        # Batching (and therefore Kosmos2API.batch) only kicks in above 1.
        max_batch_size=MAX_BATCH_SIZE,
        track_requests=True,
        api_path="/predict",
        # NOTE(review): NUM_API_SERVERS is used as workers_per_device here,
        # not as the number of API server processes; confirm this is intended.
        workers_per_device=NUM_API_SERVERS,
    )
    server.run(port=PORT, host="0.0.0.0", log_level=LOG_LEVEL.lower())