diff --git a/bloom-inference-server/README.md b/bloom-inference-server/README.md
new file mode 100644
index 0000000..0c412a9
--- /dev/null
+++ b/bloom-inference-server/README.md
@@ -0,0 +1,82 @@
+## Inference solutions for BLOOM 176B
+We support HuggingFace accelerate and DeepSpeed Inference for generation.
+
+Install the required packages:
+
+```shell
+pip install fastapi uvicorn accelerate "huggingface_hub>=0.9.0" "deepspeed>=0.7.3"
+```
+To install [DeepSpeed-MII](https://github.com/microsoft/DeepSpeed-MII):
+```shell
+git clone https://github.com/microsoft/DeepSpeed-MII
+cd DeepSpeed-MII
+pip install .
+```
+
+All the provided scripts were tested on 8 A100 80GB GPUs for BLOOM 176B (fp16/bf16) and on 4 A100 80GB GPUs for BLOOM 176B (int8). These scripts might not work for other models or for a different number of GPUs.
+
+DS inference is deployed using the DeepSpeed MII library, which requires checkpoints resharded for 8-way tensor parallelism.
+
+Note: sometimes GPU memory is not freed when the DS inference deployment is shut down. You can free this memory by running:
+```python
+import mii
+mii.terminate("ds_inference_grpc_server")
+```
+or, alternatively, by running `killall python` in a terminal.
+
+To use quantized BLOOM, set dtype = int8. For DeepSpeed Inference, also change the model_name to microsoft/bloom-deepspeed-inference-int8; for HF accelerate, model_name does not need to change.
+
+HF accelerate uses [LLM.int8()](https://arxiv.org/abs/2208.07339) and DS inference uses [ZeroQuant](https://arxiv.org/abs/2206.01861) for post-training quantization.
+
+#### BLOOM inference via command-line
+This asks for generate_kwargs every time.
+Example: generate_kwargs =
+```json
+{"min_length": 100, "max_new_tokens": 100, "do_sample": false}
+```
+
+1. using HF accelerate
+```shell
+python scripts/bloom-inference-server/cli.py --model_name bigscience/bloom --dtype bf16 --deployment_framework hf_accelerate --generate_kwargs '{"min_length": 100, "max_new_tokens": 100, "do_sample": false}'
+```
+
+2. using DS inference
+```shell
+python scripts/bloom-inference-server/cli.py --model_name microsoft/bloom-deepspeed-inference-fp16 --dtype fp16 --deployment_framework ds_inference --generate_kwargs '{"min_length": 100, "max_new_tokens": 100, "do_sample": false}'
+```
+
+#### BLOOM server deployment
+1. using HF accelerate
+```shell
+python scripts/bloom-inference-server/server.py --model_name bigscience/bloom --dtype bf16 --deployment_framework hf_accelerate --host --port --allowed_max_new_tokens 100
+```
+
+2. using DS inference
+```shell
+python scripts/bloom-inference-server/server.py --model_name microsoft/bloom-deepspeed-inference-fp16 --dtype fp16 --deployment_framework ds_inference --host --port --allowed_max_new_tokens 100
+```
+
+An example [script](examples/server_request.py) to query the BLOOM server is provided. To run it:
+```shell
+python scripts/bloom-inference-server/examples/server_request.py --host --port
+```
+
+#### Benchmark system for BLOOM inference
+1. using HF accelerate
+```shell
+python scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype bf16 --deployment_framework hf_accelerate --benchmark_cycles 5
+```
+
+2. 
using DS inference +```shell +deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype fp16 --deployment_framework ds_inference --benchmark_cycles 5 +``` +alternatively, to load model faster: +```shell +deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name microsoft/bloom-deepspeed-inference-fp16 --dtype fp16 --deployment_framework ds_inference --benchmark_cycles 5 +``` + +3. using DS ZeRO +```shell +deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype bf16 --deployment_framework ds_zero --benchmark_cycles 5 +``` diff --git a/bloom-inference-server/benchmark.py b/bloom-inference-server/benchmark.py new file mode 100644 index 0000000..455af9e --- /dev/null +++ b/bloom-inference-server/benchmark.py @@ -0,0 +1,160 @@ +import argparse +import gc +import os +from functools import partial + +import deepspeed +import torch + +import utils +from ds_inference import DSInferenceModel +from ds_zero import DSZeROModel +from hf_accelerate import HFAccelerateModel +from utils import ( + BENCHMARK, + DS_INFERENCE, + DS_ZERO, + HF_ACCELERATE, + GenerateRequest, + Model, + get_argument_parser, + get_dummy_batch, + parse_generate_kwargs, + print_rank_n, + run_and_log_time +) + + +def benchmark_generation(model: Model, + request: GenerateRequest, + cycles: int = 5): + total_new_tokens_generated = 0 + for _ in range(cycles): + response = model.generate(request) + total_new_tokens_generated += sum( + new_tokens for new_tokens in response.num_generated_tokens) + return total_new_tokens_generated + + +def get_benchmark_results(benchmark_time: float, + initialization_time: float, + total_new_tokens_generated: int, + batch_size: int, + cycles: int) -> str: + throughput = total_new_tokens_generated / benchmark_time + latency = benchmark_time / cycles + return f""" +*** Performance stats: +Throughput (including tokenization) = {throughput:.2f} tokens/sec +Throughput (including tokenization) = {1000 / throughput:.2f} msecs/token +Model loading time = {initialization_time:.2f} secs +Total tokens generated = {total_new_tokens_generated} with batch size = {batch_size} +Latency = {latency:.2f} secs +Model loading time + generation time per batch = {initialization_time + latency:.2f} secs +""" + + +def benchmark_end_to_end(args: argparse.Namespace, + model_class: Model, + zero_activated: bool = False) -> None: + model, initialization_time = run_and_log_time( + partial(model_class, args=args) + ) + + request = parse_generate_kwargs( + get_dummy_batch(args.batch_size), + args.generate_kwargs + ) + + request.preprocess() + + print_rank_n(f"generate_kwargs = {args.generate_kwargs}") + print_rank_n(f"batch_size = {args.batch_size}") + + # warmup is a must if measuring speed as it's when all the optimizations are performed + # e.g. 
on 8x80 a100 the first pass of 100 tokens takes 23sec, and the next one is 4secs + response = model.generate(request) + + for i, (o, _) in zip(request.text, zip(response.text, response.num_generated_tokens)): + print_rank_n(f"{'-' * 60}\nin = {i}\nout = {o}\n") + + if (args.benchmark_cycles > 0): + print_rank_n(f"*** Running benchmark") + + torch.cuda.empty_cache() + gc.collect() + + # warm up + model.generate(request) + torch.cuda.synchronize() + + # benchmark + total_new_tokens_generated, benchmark_time = run_and_log_time( + partial( + benchmark_generation, + model=model, + request=request, + cycles=args.benchmark_cycles + ) + ) + + # with ZeRO every GPU is generating batch_size * sequence_length tokens + if (zero_activated): + world_size = int(os.getenv('WORLD_SIZE', '1')) + total_new_tokens_generated *= world_size + + print_rank_n( + get_benchmark_results( + benchmark_time, + initialization_time, + total_new_tokens_generated, + args.batch_size, + args.benchmark_cycles + ) + ) + + +def get_args() -> argparse.Namespace: + parser = get_argument_parser() + + group = parser.add_argument_group(title="launch config") + group.add_argument("--benchmark_cycles", type=int, + default=0, help="additionally run benchmark") + group.add_argument("--local_rank", required=False, + type=int, help="used by dist launchers") + group.add_argument("--batch_size", default=1, type=int, help="batch size") + group.add_argument("--cpu_offload", action="store_true", + help="whether to activate CPU offload for DS ZeRO") + + args = utils.get_args(parser, BENCHMARK) + + launched_with_deepspeed = args.deployment_framework in [ + DS_INFERENCE, DS_ZERO] + + if (not launched_with_deepspeed): + assert args.local_rank == None, "local_rank must be None if not launched with DeepSpeed" + + if (args.cpu_offload): + assert args.deployment_framework == DS_ZERO, "cpu_offload only works with DS_ZeRO" + + return args + + +def main() -> None: + args = get_args() + + if (args.deployment_framework == HF_ACCELERATE): + benchmark_end_to_end(args, HFAccelerateModel) + elif (args.deployment_framework == DS_INFERENCE): + deepspeed.init_distributed("nccl") + benchmark_end_to_end(args, DSInferenceModel) + elif (args.deployment_framework == DS_ZERO): + deepspeed.init_distributed("nccl") + benchmark_end_to_end(args, DSZeROModel, zero_activated=True) + else: + raise ValueError( + f"Unknown deployment framework {args.deployment_framework}") + + +if (__name__ == "__main__"): + main() diff --git a/bloom-inference-server/cli.py b/bloom-inference-server/cli.py new file mode 100644 index 0000000..1b91b64 --- /dev/null +++ b/bloom-inference-server/cli.py @@ -0,0 +1,70 @@ +import argparse +import json +import sys + +import utils +from ds_inference import DSInferenceGRPCServer +from hf_accelerate import HFAccelerateModel +from utils import CLI, DS_INFERENCE, HF_ACCELERATE, get_argument_parser, parse_generate_kwargs, print_rank_n + + +def get_args() -> argparse.Namespace: + parser = get_argument_parser() + + group = parser.add_argument_group(title="launch config") + group.add_argument("--shutdown_command", required=False, + type=str, default="__shutdown__", help="This string will exit the script") + + args = utils.get_args(parser, CLI) + + return args + + +def main() -> None: + args = get_args() + + if (args.deployment_framework == HF_ACCELERATE): + model = HFAccelerateModel(args) + elif (args.deployment_framework == DS_INFERENCE): + model = DSInferenceGRPCServer(args) + else: + raise ValueError( + f"Unknown deployment framework 
{args.deployment_framework}") + + generate_kwargs = args.generate_kwargs + + while (True): + try: + input_text = input("Input text: ") + + if (input_text == args.shutdown_command): + model.shutdown() + + if (input("change generate_kwargs? [y/n] ") == "y"): + while (True): + try: + generate_kwargs = json.loads( + input("Generate kwargs: ")) + break + except KeyboardInterrupt: + model.shutdown() + except Exception as e: + e_type, e_message, _ = sys.exc_info() + print("error =", e_type.__name__) + print("message =", e_message) + continue + + request = parse_generate_kwargs([input_text], generate_kwargs) + + request.preprocess() + + response = model.generate(request) + + print_rank_n("Output text:", response.text[0]) + print_rank_n("Generated tokens:", response.num_generated_tokens[0]) + except KeyboardInterrupt: + model.shutdown() + + +if (__name__ == "__main__"): + main() diff --git a/bloom-inference-server/ds_inference/__init__.py b/bloom-inference-server/ds_inference/__init__.py new file mode 100644 index 0000000..47e0181 --- /dev/null +++ b/bloom-inference-server/ds_inference/__init__.py @@ -0,0 +1,2 @@ +from .grpc_server import DSInferenceGRPCServer +from .model import DSInferenceModel diff --git a/bloom-inference-server/ds_inference/grpc_server.py b/bloom-inference-server/ds_inference/grpc_server.py new file mode 100644 index 0000000..4379587 --- /dev/null +++ b/bloom-inference-server/ds_inference/grpc_server.py @@ -0,0 +1,87 @@ +import argparse +import json +import os + +import torch +from transformers import AutoTokenizer + +import mii +from utils import ( + GenerateRequest, + GenerateResponse, + Model, + get_downloaded_model_path, + get_filter_dict, + get_str_dtype, + print_rank_n +) + + +class DSInferenceGRPCServer(Model): + def __init__(self, args: argparse.Namespace) -> None: + self.deployment_name = "ds_inference_grpc_server" + + downloaded_model_path = get_downloaded_model_path(args.model_name) + + self.tokenizer = AutoTokenizer.from_pretrained(downloaded_model_path) + self.pad = self.tokenizer.pad_token_id + + if (args.dtype in [torch.float16, torch.int8]): + checkpoints_json = os.path.join( + downloaded_model_path, "ds_inference_config.json") + + mii.deploy( + task="text-generation", + # should pass args.model_name but can't since the new + # weights are not supported yet. 
So, this is a hack + model="bigscience/bloom", + deployment_name=self.deployment_name, + model_path=downloaded_model_path, + mii_config={ + "dtype": get_str_dtype(args.dtype), + "tensor_parallel": 8, + "port_number": 50950, + "checkpoint_dict": json.load(open(checkpoints_json, "r")) + } + ) + elif (args.dtype == torch.bfloat16): + raise NotImplementedError("bfloat16 is not yet supported") + + self.model = mii.mii_query_handle(self.deployment_name) + + def generate(self, request: GenerateRequest) -> GenerateResponse: + output_text = self.model.query( + {"query": request.text}, + **get_filter_dict(request) + ).response + + output_text = [_ for _ in output_text] + + # Remove input from output + input_tokens = self.tokenizer(request.text).input_ids + output_tokens = self.tokenizer(output_text).input_ids + + input_token_lengths = [len(x) for x in input_tokens] + output_token_lengths = [len(x) for x in output_tokens] + num_generated_tokens = [ + o - i for i, o in zip(input_token_lengths, output_token_lengths)] + + if (request.remove_input_from_output): + output_tokens = [x[-i:] + for x, i in zip(output_tokens, num_generated_tokens)] + output_text = self.tokenizer.batch_decode( + output_tokens, skip_special_tokens=True) + + return GenerateResponse( + text=output_text, + num_generated_tokens=num_generated_tokens + ) + + def shutdown(self) -> None: + print_rank_n("shutting down") + # MII is buggy and sometimes spits out an error in terminate + try: + mii.terminate(self.deployment_name) + except Exception: + pass + exit() diff --git a/bloom-inference-server/ds_inference/model.py b/bloom-inference-server/ds_inference/model.py new file mode 100644 index 0000000..0d8323f --- /dev/null +++ b/bloom-inference-server/ds_inference/model.py @@ -0,0 +1,92 @@ +import glob +import io +import json +import os +from argparse import Namespace +from functools import partial + +import deepspeed +import torch +import torch.distributed as dist +from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer + +from utils import Model, get_downloaded_model_path, print_rank_n, run_rank_n + + +class DSInferenceModel(Model): + def __init__(self, args: Namespace) -> None: + print_rank_n("Loading model...") + world_size = int(os.getenv("WORLD_SIZE", "1")) + + downloaded_model_path = get_downloaded_model_path(args.model_name) + + self.tokenizer = AutoTokenizer.from_pretrained(downloaded_model_path) + self.pad = self.tokenizer.pad_token_id + + # Load model + with deepspeed.OnDevice(dtype=torch.float16, device="meta"): + self.model = AutoModelForCausalLM.from_config( + AutoConfig.from_pretrained(downloaded_model_path), + torch_dtype=torch.bfloat16 + ) + self.model = self.model.eval() + + if (args.dtype in [torch.float16, torch.int8]): + if (args.use_pre_sharded_checkpoints): + checkpoints_json = os.path.join( + downloaded_model_path, "ds_inference_config.json") + + self.model = deepspeed.init_inference( + self.model, + mp_size=world_size, + base_dir=downloaded_model_path, + dtype=args.dtype, + checkpoint=checkpoints_json, + replace_with_kernel_inject=True + ) + else: + with TemporaryCheckpointsJSON(downloaded_model_path) as checkpoints_json: + self.model = deepspeed.init_inference( + self.model, + mp_size=world_size, + dtype=args.dtype, + checkpoint=checkpoints_json, + replace_with_kernel_inject=True + ) + elif (args.dtype == torch.bfloat16): + raise NotImplementedError("bfloat16 is not yet supported") + + self.model = self.model.module + self.input_device = torch.cuda.current_device() + + print_rank_n("Model loaded") 
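+        # barrier so that no rank moves on until every rank has finished materializing the model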
+ dist.barrier() + + +class TemporaryCheckpointsJSON: + def __init__(self, model_path: str): + self.tmp_directory = "tmp" + self.tmp_file = os.path.join(self.tmp_directory, "checkpoints.json") + self.model_path = model_path + + def write_checkpoints_json(self, model_path: str) -> None: + with io.open(self.tmp_file, "w", encoding="utf-8") as f: + data = { + "type": "BLOOM", + "checkpoints": glob.glob(f"{model_path}/*.bin"), + "version": 1.0 + } + json.dump(data, f) + + def __enter__(self): + run_rank_n( + partial(os.makedirs, name=self.tmp_directory, exist_ok=True) + ) + run_rank_n( + partial(self.write_checkpoints_json, model_path=self.model_path), + barrier=True + ) + return self.tmp_file + + def __exit__(self, type, value, traceback): + return diff --git a/bloom-inference-server/ds_zero/__init__.py b/bloom-inference-server/ds_zero/__init__.py new file mode 100644 index 0000000..846c561 --- /dev/null +++ b/bloom-inference-server/ds_zero/__init__.py @@ -0,0 +1 @@ +from .model import DSZeROModel diff --git a/bloom-inference-server/ds_zero/model.py b/bloom-inference-server/ds_zero/model.py new file mode 100644 index 0000000..6729f5f --- /dev/null +++ b/bloom-inference-server/ds_zero/model.py @@ -0,0 +1,68 @@ +import os +from argparse import Namespace + +import deepspeed +import torch +import torch.distributed as dist +from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer +from transformers.deepspeed import HfDeepSpeedConfig + +from utils import Model, get_downloaded_model_path, print_rank_n + + +class DSZeROModel(Model): + def __init__(self, args: Namespace) -> None: + print_rank_n("Loading model...") + + downloaded_model_path = get_downloaded_model_path(args.model_name) + + config = AutoConfig.from_pretrained(downloaded_model_path) + + world_size = int(os.getenv('WORLD_SIZE', '1')) + train_batch_size = 1 * world_size + + ds_config = { + "fp16": { + "enabled": args.dtype == torch.float16, + }, + "bf16": { + "enabled": args.dtype == torch.bfloat16, + }, + "zero_optimization": { + "stage": 3, + "overlap_comm": True, + "contiguous_gradients": True, + "reduce_bucket_size": config.hidden_size * config.hidden_size, + "stage3_prefetch_bucket_size": 0.9 * config.hidden_size * config.hidden_size, + "stage3_param_persistence_threshold": 0 + }, + "steps_per_print": 2000, + "train_batch_size": train_batch_size, + "train_micro_batch_size_per_gpu": 1, + "wall_clock_breakdown": False + } + + if (args.cpu_offload): + ds_config["zero_optimization"]["offload_param"] = { + "device": "cpu", + "pin_memory": True + } + + # this tells from_pretrained to instantiate directly on gpus + dschf = HfDeepSpeedConfig(ds_config) + + self.tokenizer = AutoTokenizer.from_pretrained(downloaded_model_path) + self.pad = self.tokenizer.pad_token_id + + self.model = AutoModelForCausalLM.from_pretrained( + downloaded_model_path, torch_dtype=args.dtype) + self.model = self.model.eval() + self.model = deepspeed.initialize( + model=self.model, config_params=ds_config)[0] + self.model.module.eval() + self.model = self.model.module + + self.input_device = torch.cuda.current_device() + + print_rank_n("Model loaded") + dist.barrier() diff --git a/bloom-inference-server/examples/server_request.py b/bloom-inference-server/examples/server_request.py new file mode 100644 index 0000000..d843ecd --- /dev/null +++ b/bloom-inference-server/examples/server_request.py @@ -0,0 +1,73 @@ +import argparse + +import requests + + +def get_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + + group = 
parser.add_argument_group(title="launch config") + group.add_argument("--host", type=str, required=True, help="host address") + group.add_argument("--port", type=int, required=True, help="port number") + + return parser.parse_args() + + +def generate(url: str) -> None: + url = url + "/generate/" + + request_body = { + "text": [ + "DeepSpeed", + "DeepSpeed is a", + "DeepSpeed is a machine", + "DeepSpeed is a machine learning framework", + ], + "max_new_tokens": 40 + } + response = requests.post( + url=url, + json=request_body, + verify=False + ) + print(response.json(), "\n") + + +def tokenize(url: str) -> None: + url = url + "/tokenize/" + + request_body = { + "text": [ + "DeepSpeed is a", + "DeepSpeed is a machine learning framework" + ] + } + response = requests.post( + url=url, + json=request_body, + verify=False + ) + print(response.json(), "\n") + + +def query_id(url: str) -> None: + url = url + "/query_id/" + + response = requests.get( + url=url, + verify=False + ) + print(response.json(), "\n") + + +def main(): + args = get_args() + url = "http://{}:{}".format(args.host, args.port) + + generate(url) + tokenize(url) + query_id(url) + + +if (__name__ == "__main__"): + main() diff --git a/bloom-inference-server/hf_accelerate/__init__.py b/bloom-inference-server/hf_accelerate/__init__.py new file mode 100644 index 0000000..71a0b52 --- /dev/null +++ b/bloom-inference-server/hf_accelerate/__init__.py @@ -0,0 +1 @@ +from .model import HFAccelerateModel diff --git a/bloom-inference-server/hf_accelerate/model.py b/bloom-inference-server/hf_accelerate/model.py new file mode 100644 index 0000000..49f2c05 --- /dev/null +++ b/bloom-inference-server/hf_accelerate/model.py @@ -0,0 +1,94 @@ +from argparse import Namespace + +import torch +from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer + +from utils import Model, get_downloaded_model_path, print_rank_n + + +class HFAccelerateModel(Model): + def __init__(self, args: Namespace) -> None: + print_rank_n("Loading model...") + + downloaded_model_path = get_downloaded_model_path(args.model_name) + + self.tokenizer = AutoTokenizer.from_pretrained(downloaded_model_path) + self.pad = self.tokenizer.pad_token_id + + kwargs = { + "pretrained_model_name_or_path": downloaded_model_path, + "device_map": "auto", + "max_memory": get_max_memory_per_gpu_dict( + args.dtype, + args.model_name + ) + } + if (args.dtype == torch.int8): + kwargs["load_in_8bit"] = True + else: + kwargs["torch_dtype"] = args.dtype + + self.model = AutoModelForCausalLM.from_pretrained(**kwargs) + + self.model.requires_grad_(False) + self.model.eval() + self.input_device = "cuda:0" + + print_rank_n("Model loaded") + + +def get_max_memory_per_gpu_dict(dtype, model_name): + """ try to generate the memory map based on what we know about the model and the available hardware """ + + # figure out the memory map - the minimum per gpu required to load the model + n_gpus = torch.cuda.device_count() + + if model_name == "bigscience/bloom" and n_gpus == 8 and torch.cuda.get_device_properties(0).total_memory > 79*2**30: + # hand crafted optimized memory map for 8x80 setup over BLOOM + # this works with bs=40 + if (dtype in [torch.bfloat16, torch.float16]): + max_memory_per_gpu = {0: '0GIB', 1: '51GIB', 2: '51GIB', 3: '51GIB', + 4: '51GIB', 5: '51GIB', 6: '51GIB', 7: '51GIB'} + elif (dtype == torch.int8): + max_memory_per_gpu = {0: '0GIB', 1: '26GIB', 2: '26GIB', 3: '26GIB', + 4: '26GIB', 5: '26GIB', 6: '26GIB', 7: '26GIB'} + print_rank_n("Max memory per gpu:", max_memory_per_gpu) 
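+        # note: GPU 0 gets no weights in this hand-crafted map; inputs are placed on cuda:0 (HFAccelerateModel.input_device), so keeping it empty leaves memory free for activations at large batch sizes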
+ return max_memory_per_gpu + try: + # model_params calculation, as we don't have a model yet to do: + #model_params = sum(dict((p.data_ptr(), p.numel()) for p in model.parameters()).values()) + + config = AutoConfig.from_pretrained(model_name) + h = config.hidden_size + l = config.n_layer + v = config.vocab_size + # from https://github.com/bigscience-workshop/bigscience/tree/6917a3b5fefcf439d3485ca184b4d9f6ab605150/math#model-sizing + model_params = l*(12*h**2 + 13*h) + v*h + 4*h + except: + print_rank_n( + f"The model {model_name} has a broken config file. Please notify the owner") + raise + + if (dtype == torch.int8): + bytes = 1 + else: + bytes = torch.finfo(dtype).bits / 8 + param_memory_total_in_bytes = model_params * bytes + # add 5% since weight sizes aren't the same and some GPU may need more memory + param_memory_per_gpu_in_bytes = int( + param_memory_total_in_bytes / n_gpus * 1.10) + print_rank_n( + f"Estimating {param_memory_per_gpu_in_bytes/2**30:0.2f}GB per gpu for weights") + + # check the real available memory + # load cuda kernels first and only measure the real free memory after loading (shorter by ~2GB) + torch.ones(1).cuda() + max_memory_per_gpu_in_bytes = torch.cuda.mem_get_info(0)[0] + if max_memory_per_gpu_in_bytes < param_memory_per_gpu_in_bytes: + raise ValueError( + f"Unable to generate the memory map automatically as the needed estimated memory per gpu ({param_memory_per_gpu_in_bytes/2**30:0.2f}GB) is bigger than the available per gpu memory ({max_memory_per_gpu_in_bytes/2**30:0.2f}GB)") + + max_memory_per_gpu = { + i: param_memory_per_gpu_in_bytes for i in range(torch.cuda.device_count())} + print("Max memory per gpu:", max_memory_per_gpu) + return max_memory_per_gpu diff --git a/bloom-inference-server/server.py b/bloom-inference-server/server.py new file mode 100644 index 0000000..fa91fda --- /dev/null +++ b/bloom-inference-server/server.py @@ -0,0 +1,176 @@ +import argparse +import sys +import traceback +from functools import partial + +import utils +from ds_inference import DSInferenceGRPCServer +from fastapi import FastAPI, HTTPException +from fastapi.routing import APIRoute +from hf_accelerate import HFAccelerateModel +from pydantic import BaseModel +from utils import ( + DS_INFERENCE, + HF_ACCELERATE, + SERVER, + GenerateRequest, + GenerateResponse, + TokenizeRequest, + TokenizeResponse, + get_argument_parser, + get_num_tokens_to_generate, + run_and_log_time +) +from uvicorn import run + + +class QueryID(BaseModel): + generate_query_id: int = 0 + tokenize_query_id: int = 0 + + +def get_args() -> argparse.Namespace: + parser = get_argument_parser() + + group = parser.add_argument_group(title="launch config") + group.add_argument("--host", type=str, required=True, help="host address") + group.add_argument("--port", type=int, required=True, help="port number") + group.add_argument("--workers", type=int, default=1, + help="number of http workers") + group.add_argument("--allowed_max_new_tokens", type=int, + default=100, help="max allowed tokens") + group.add_argument("--debug", action="store_true", + help="launch in debug mode") + + args = utils.get_args(parser, SERVER) + + return args + + +class Server: + def __init__(self, args: argparse.Namespace): + self.host = args.host + self.port = args.port + self.workers = args.workers + self.debug = args.debug + + self.allowed_max_new_tokens = args.allowed_max_new_tokens + self.query_ids = QueryID() + + if (args.deployment_framework == HF_ACCELERATE): + self.model = HFAccelerateModel(args) + elif 
(args.deployment_framework == DS_INFERENCE): + self.model = DSInferenceGRPCServer(args) + else: + raise ValueError( + f"Unknown deployment framework {args.deployment_framework}") + + self.app = FastAPI( + routes=[ + APIRoute( + "/generate/", + self.generate, + methods=["POST"], + ), + APIRoute( + "/tokenize/", + self.tokenize, + methods=["POST"], + ), + APIRoute( + "/query_id/", + self.query_id, + methods=["GET"], + ) + ], + timeout=600, + ) + + def get_exception_response(self, query_id: int, method: str): + e_type, e_message, e_stack_trace = sys.exc_info() + response = { + "error": str(e_type.__name__), + "message": str(e_message), + "query_id": query_id, + "method": method + } + + if (self.debug): + trace_back = traceback.extract_tb(e_stack_trace) + + # Format stacktrace + stack_trace = [] + for trace in trace_back: + stack_trace.append("File : {}, Line : {}, Func.Name : {}, Message : {}".format( + trace[0], trace[1], trace[2], trace[3])) + + response["stack_trace"] = stack_trace + + return response + + def generate(self, request: GenerateRequest) -> GenerateResponse: + try: + request.preprocess() + + request.max_new_tokens = get_num_tokens_to_generate( + request.max_new_tokens, self.allowed_max_new_tokens) + + response, total_time_taken = run_and_log_time( + partial(self.model.generate, request=request) + ) + + response.query_id = self.query_ids.generate_query_id + self.query_ids.generate_query_id += 1 + response.total_time_taken = "{:.2f} secs".format(total_time_taken) + + return response + except Exception: + response = self.get_exception_response( + self.query_ids.generate_query_id, request.method) + self.query_ids.generate_query_id += 1 + raise HTTPException(500, response) + + def tokenize(self, request: TokenizeRequest) -> TokenizeResponse: + try: + response, total_time_taken = run_and_log_time( + partial(self.model.tokenize, request=request) + ) + + response.query_id = self.query_ids.tokenize_query_id + self.query_ids.tokenize_query_id += 1 + response.total_time_taken = "{:.2f} msecs".format( + total_time_taken * 1000) + + return response + except Exception: + response = self.get_exception_response( + self.query_ids.tokenize_query_id, request.method) + self.query_ids.tokenize_query_id += 1 + raise HTTPException(500, response) + + def query_id(self) -> QueryID: + return self.query_ids + + def run(self): + run( + self.app, + host=self.host, + port=self.port, + workers=self.workers + ) + + def shutdown(self): + self.model.shutdown() + + +def main() -> None: + args = get_args() + server = Server(args) + try: + server.run() + except KeyboardInterrupt: + server.shutdown() + + +if (__name__ == "__main__"): + main() diff --git a/bloom-inference-server/utils/__init__.py b/bloom-inference-server/utils/__init__.py new file mode 100644 index 0000000..5b706d9 --- /dev/null +++ b/bloom-inference-server/utils/__init__.py @@ -0,0 +1,21 @@ +from .constants import BENCHMARK, CLI, DS_INFERENCE, DS_ZERO, HF_ACCELERATE, SERVER +from .model import Model, get_downloaded_model_path +from .requests import ( + GenerateRequest, + GenerateResponse, + TokenizeRequest, + TokenizeResponse, + get_filter_dict, + parse_generate_kwargs +) +from .utils import ( + get_args, + get_argument_parser, + get_dummy_batch, + get_num_tokens_to_generate, + get_str_dtype, + pad_ids, + print_rank_n, + run_and_log_time, + run_rank_n +) diff --git a/bloom-inference-server/utils/constants.py b/bloom-inference-server/utils/constants.py new file mode 100644 index 0000000..660f10c --- /dev/null +++ 
b/bloom-inference-server/utils/constants.py @@ -0,0 +1,79 @@ +BENCHMARK = "benchmark" +CLI = "cli" +SERVER = "server" + +HF_ACCELERATE = "hf_accelerate" +DS_INFERENCE = "ds_inference" +DS_ZERO = "ds_zero" + +BIGSCIENCE_BLOOM = "bigscience/bloom" +DS_INFERENCE_BLOOM_FP16 = "microsoft/bloom-deepspeed-inference-fp16" +DS_INFERENCE_BLOOM_INT8 = "microsoft/bloom-deepspeed-inference-int8" + +BF16 = "bf16" +FP16 = "fp16" +INT8 = "int8" + + +SCRIPT_FRAMEWORK_MODEL_DTYPE_ALLOWED = { + BENCHMARK: { + HF_ACCELERATE: { + BIGSCIENCE_BLOOM: { + BF16, + FP16, + INT8 + } + }, + DS_INFERENCE: { + BIGSCIENCE_BLOOM: { + FP16 + }, + DS_INFERENCE_BLOOM_FP16: { + FP16 + }, + DS_INFERENCE_BLOOM_INT8: { + INT8 + } + }, + DS_ZERO: { + BIGSCIENCE_BLOOM: { + BF16, + FP16 + } + } + }, + CLI: { + HF_ACCELERATE: { + BIGSCIENCE_BLOOM: { + BF16, + FP16, + # INT8 + } + }, + DS_INFERENCE: { + DS_INFERENCE_BLOOM_FP16: { + FP16 + }, + DS_INFERENCE_BLOOM_INT8: { + INT8 + } + } + }, + SERVER: { + HF_ACCELERATE: { + BIGSCIENCE_BLOOM: { + BF16, + FP16, + # INT8 + } + }, + DS_INFERENCE: { + DS_INFERENCE_BLOOM_FP16: { + FP16 + }, + DS_INFERENCE_BLOOM_INT8: { + INT8 + } + } + } +} diff --git a/bloom-inference-server/utils/model.py b/bloom-inference-server/utils/model.py new file mode 100644 index 0000000..5e36c9e --- /dev/null +++ b/bloom-inference-server/utils/model.py @@ -0,0 +1,106 @@ +import argparse +import os +from functools import partial + +import torch +from transformers.utils import is_offline_mode + +from huggingface_hub import snapshot_download + +from .requests import GenerateRequest, GenerateResponse, TokenizeRequest, TokenizeResponse +from .utils import print_rank_n, run_rank_n + + +class Model: + def __init__(self, args: argparse.Namespace) -> None: + self.tokenizer = None + self.pad = None + self.model = None + self.input_device = None + raise NotImplementedError("This is a dummy class") + + def generate(self, request: GenerateRequest) -> GenerateResponse: + input_tokens = self.tokenizer( + request.text, return_tensors="pt", padding=True) + + for t in input_tokens: + if torch.is_tensor(input_tokens[t]): + input_tokens[t] = input_tokens[t].to(self.input_device) + + with torch.no_grad(): + output = self.model.generate( + **input_tokens, + min_length=request.min_length, + do_sample=request.do_sample, + early_stopping=request.early_stopping, + num_beams=request.num_beams, + temperature=request.temperature, + top_k=request.top_k, + top_p=request.top_p, + typical_p=request.typical_p, + repetition_penalty=request.repetition_penalty, + bos_token_id=request.bos_token_id, + pad_token_id=request.pad_token_id, + eos_token_id=request.eos_token_id, + length_penalty=request.length_penalty, + no_repeat_ngram_size=request.no_repeat_ngram_size, + encoder_no_repeat_ngram_size=request.encoder_no_repeat_ngram_size, + num_return_sequences=request.num_return_sequences, + max_time=request.max_time, + max_new_tokens=request.max_new_tokens, + decoder_start_token_id=request.decoder_start_token_id, + num_beam_groups=request.num_beam_groups, + diversity_penalty=request.diversity_penalty, + forced_bos_token_id=request.forced_bos_token_id, + forced_eos_token_id=request.forced_eos_token_id, + exponential_decay_length_penalty=request.exponential_decay_length_penalty, + return_dict_in_generate=True + ) + + output_tokens = output.sequences + + input_token_lengths = [x.shape[0] for x in input_tokens.input_ids] + output_token_lengths = [x.shape[0] for x in output_tokens] + generated_tokens = [ + o - i for i, o in zip(input_token_lengths, 
output_token_lengths)] + + if (request.remove_input_from_output): + output_tokens = [x[-i:] + for x, i in zip(output_tokens, generated_tokens)] + + output_text = self.tokenizer.batch_decode( + output_tokens, skip_special_tokens=True) + + return GenerateResponse( + text=output_text, + num_generated_tokens=generated_tokens + ) + + def tokenize(self, request: TokenizeRequest) -> TokenizeResponse: + output = self.tokenizer( + request.text, + padding=request.padding + ) + return TokenizeResponse( + token_ids=output.input_ids, + attention_mask=output.attention_mask + ) + + def shutdown(self) -> None: + print_rank_n("shutting down") + exit() + + +def get_downloaded_model_path(model_name: str): + f = partial( + snapshot_download, + repo_id=model_name, + allow_patterns=["*"], + local_files_only=is_offline_mode(), + cache_dir=os.getenv("TRANSFORMERS_CACHE", None) + ) + # download only on 1 process + run_rank_n(f, barrier=True) + # now since the snapshot is downloaded, pass the + # model_path to all processes + return f() diff --git a/bloom-inference-server/utils/requests.py b/bloom-inference-server/utils/requests.py new file mode 100644 index 0000000..0bae3ee --- /dev/null +++ b/bloom-inference-server/utils/requests.py @@ -0,0 +1,129 @@ +from typing import Any, List + +from pydantic import BaseModel + + +class BaseResponse(BaseModel): + query_id: int = None + total_time_taken: str = None + + +class GenerateRequest(BaseModel): + text: List[str] = None + min_length: int = None + do_sample: bool = None + early_stopping: bool = None + num_beams: int = None + temperature: float = None + top_k: int = None + top_p: float = None + typical_p: float = None + repetition_penalty: float = None + bos_token_id: int = None + pad_token_id: int = None + eos_token_id: int = None + length_penalty: float = None + no_repeat_ngram_size: int = None + encoder_no_repeat_ngram_size: int = None + num_return_sequences: int = None + max_time: float = None + max_new_tokens: int = None + decoder_start_token_id: int = None + num_beam_groups: int = None + diversity_penalty: float = None + forced_bos_token_id: int = None + forced_eos_token_id: int = None + exponential_decay_length_penalty: float = None + remove_input_from_output: bool = False + method: str = "generate" + + def preprocess(self) -> None: + if (self.temperature == 0): + self.do_sample = False + + +class GenerateResponse(BaseResponse): + text: List[str] = None + num_generated_tokens: List[int] = None + method: str = "generate" + + +class TokenizeRequest(BaseModel): + text: List[str] = None + padding: bool = False + method: str = "tokenize" + + +class TokenizeResponse(BaseResponse): + token_ids: List[List[int]] = None + attention_mask: List[List[int]] = None + method: str = "tokenize" + + +def parse_bool(value: str) -> bool: + if (value.lower() == "true"): + return True + elif (value.lower() == "false"): + return False + else: + raise ValueError("{} is not a valid boolean value".format(value)) + + +def parse_field(kwargs: dict, + field: str, + dtype: int, + default_value: Any = None) -> Any: + if (field in kwargs): + if (type(kwargs[field]) == dtype): + return kwargs[field] + elif (dtype == bool): + return parse_bool(kwargs[field]) + else: + return dtype(kwargs[field]) + else: + return default_value + + +def parse_generate_kwargs(text: List[str], kwargs: dict) -> GenerateRequest: + return GenerateRequest( + text=text, + min_length=parse_field(kwargs, "min_length", int), + do_sample=parse_field(kwargs, "do_sample", bool), + early_stopping=parse_field(kwargs, 
"early_stopping", bool), + num_beams=parse_field(kwargs, "num_beams", int), + temperature=parse_field(kwargs, "temperature", float), + top_k=parse_field(kwargs, "top_k", int), + top_p=parse_field(kwargs, "top_p", float), + typical_p=parse_field(kwargs, "typical_p", float), + repetition_penalty=parse_field(kwargs, "repetition_penalty", float), + bos_token_id=parse_field(kwargs, "bos_token_id", int), + pad_token_id=parse_field(kwargs, "pad_token_id", int), + eos_token_id=parse_field(kwargs, "eos_token_id", int), + length_penalty=parse_field(kwargs, "length_penalty", float), + no_repeat_ngram_size=parse_field(kwargs, "no_repeat_ngram_size", int), + encoder_no_repeat_ngram_size=parse_field( + kwargs, "encoder_no_repeat_ngram_size", int), + num_return_sequences=parse_field(kwargs, "num_return_sequences", int), + max_time=parse_field(kwargs, "max_time", float), + max_new_tokens=parse_field(kwargs, "max_new_tokens", int), + decoder_start_token_id=parse_field( + kwargs, "decoder_start_token_id", int), + num_beam_group=parse_field(kwargs, "num_beam_group", int), + diversity_penalty=parse_field(kwargs, "diversity_penalty", float), + forced_bos_token_id=parse_field(kwargs, "forced_bos_token_id", int), + forced_eos_token_id=parse_field(kwargs, "forced_eos_token_id", int), + exponential_decay_length_penalty=parse_field( + kwargs, "exponential_decay_length_penalty", float), + remove_input_from_output=parse_field( + kwargs, "remove_input_from_output", bool, False) + ) + + +def get_filter_dict(d: BaseModel) -> dict: + d = dict(d) + q = {} + for i in d: + if (d[i] != None): + q[i] = d[i] + del q["text"] + return q diff --git a/bloom-inference-server/utils/utils.py b/bloom-inference-server/utils/utils.py new file mode 100644 index 0000000..f15781f --- /dev/null +++ b/bloom-inference-server/utils/utils.py @@ -0,0 +1,194 @@ +import argparse +import copy +import json +import math +import time +from functools import partial +from typing import Any, List, Tuple, Union + +import torch +import torch.distributed as dist + +from .constants import ( + BIGSCIENCE_BLOOM, + DS_INFERENCE, + DS_INFERENCE_BLOOM_FP16, + DS_INFERENCE_BLOOM_INT8, + DS_ZERO, + HF_ACCELERATE, + SCRIPT_FRAMEWORK_MODEL_DTYPE_ALLOWED +) + + +dummy_input_sentences = [ + "DeepSpeed is a machine learning framework", + "He is working on", + "He has a", + "He got all", + "Everyone is happy and I can", + "The new movie that got Oscar this year", + "In the far far distance from our galaxy,", + "Peace is the only way" +] + + +def get_argument_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + + group = parser.add_argument_group(title="model") + group.add_argument( + "--deployment_framework", + type=str, + choices=[ + HF_ACCELERATE, + DS_INFERENCE, + DS_ZERO + ], + default=HF_ACCELERATE + ) + group.add_argument( + "--model_name", + type=str, + required=True, + choices=[ + BIGSCIENCE_BLOOM, + DS_INFERENCE_BLOOM_FP16, + DS_INFERENCE_BLOOM_INT8 + ], + help="model to use" + ) + group.add_argument("--dtype", type=str, required=True, + choices=["bf16", "fp16", "int8"], help="dtype for model") + group.add_argument( + "--generate_kwargs", + type=str, + default='{"min_length": 100, "max_new_tokens": 100, "do_sample": false}', + help="generate parameters. 
look at https://huggingface.co/docs/transformers/v4.21.1/en/main_classes/text_generation#transformers.generation_utils.GenerationMixin.generate to see the supported parameters" + ) + + return parser + + +def get_args(parser: argparse.ArgumentParser, script: str) -> argparse.Namespace: + args = parser.parse_args() + + assert is_script_framework_model_dtype_allowed( + script, + args.deployment_framework, + args.model_name, + args.dtype + ), f"{script} is not supported with {args.deployment_framework}, {args.model_name} and {args.dtype} dtype" + + args.dtype = get_torch_dtype(args.dtype) + args.generate_kwargs = json.loads(args.generate_kwargs) + args.use_pre_sharded_checkpoints = args.model_name in [ + DS_INFERENCE_BLOOM_FP16, DS_INFERENCE_BLOOM_INT8] + return args + + +def run_rank_n(func: partial, + barrier: bool = False, + rank: int = 0, + other_rank_output: Any = None) -> Any: + if (dist.is_initialized()): + if (dist.get_rank() == rank): + output = func() + if (barrier): + dist.barrier() + return output + else: + if (barrier): + dist.barrier() + return other_rank_output + else: + return func() + + +def print_rank_n(*values, rank: int = 0) -> None: + if (dist.is_initialized()): + if (dist.get_rank() == rank): + print(*values) + else: + print(*values) + + +def get_dtype_from_model_name(model_name: str) -> str: + if (model_name == BIGSCIENCE_BLOOM): + return "bf16" + elif (model_name == DS_INFERENCE_BLOOM_FP16): + return "fp16" + elif (model_name == DS_INFERENCE_BLOOM_INT8): + return "int8" + + +def get_torch_dtype(dtype_str: str) -> torch.dtype: + if (dtype_str == "bf16"): + return torch.bfloat16 + elif (dtype_str == "fp16"): + return torch.float16 + elif (dtype_str == "int8"): + return torch.int8 + + +def get_str_dtype(dtype_str: str) -> torch.dtype: + if (dtype_str == torch.bfloat16): + return "bf16" + elif (dtype_str == torch.float16): + return "fp16" + elif (dtype_str == torch.int8): + return "int8" + + +def get_dummy_batch(batch_size: int, input_sentences: List[str] = None) -> List[str]: + if (input_sentences == None): + input_sentences = copy.deepcopy(dummy_input_sentences) + + if (batch_size > len(input_sentences)): + input_sentences *= math.ceil(batch_size / len(input_sentences)) + input_sentences = input_sentences[:batch_size] + + return input_sentences + + +def get_num_tokens_to_generate(max_new_tokens: int, + allowed_max_new_tokens: int) -> int: + if (max_new_tokens == None): + return allowed_max_new_tokens + else: + return min(max_new_tokens, allowed_max_new_tokens) + + +def run_and_log_time(execs: Union[List[partial], partial]) -> Tuple[Union[List[Any], Any], float]: + start_time = time.time() + + if (type(execs) == list): + results = [] + for f in execs: + results.append(f()) + else: + results = execs() + + time_elapsed = time.time() - start_time + return results, time_elapsed + + +def pad_ids(arrays, padding, max_length=-1): + if (max_length < 0): + max_length = max(list(map(len, arrays))) + + arrays = [[padding] * (max_length - len(array)) + + array for array in arrays] + + return arrays + + +def is_script_framework_model_dtype_allowed(script: str, + deployment_framework: str, + model_name: str, + dtype: str) -> bool: + if (script in SCRIPT_FRAMEWORK_MODEL_DTYPE_ALLOWED): + if (deployment_framework in SCRIPT_FRAMEWORK_MODEL_DTYPE_ALLOWED[script]): + if (model_name in SCRIPT_FRAMEWORK_MODEL_DTYPE_ALLOWED[script][deployment_framework]): + if (dtype in SCRIPT_FRAMEWORK_MODEL_DTYPE_ALLOWED[script][deployment_framework][model_name]): + return True + return False
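For reference, a minimal sketch (not part of the patch) showing how the request-parsing helpers above fit together. It assumes it is run from the `bloom-inference-server` directory so that `utils` is importable; the filtered kwargs dict it prints is what `DSInferenceGRPCServer.generate` forwards to the MII query handle.

```python
import json

from utils import get_filter_dict, parse_generate_kwargs

# the default generate_kwargs string used by the scripts above
generate_kwargs = json.loads(
    '{"min_length": 100, "max_new_tokens": 100, "do_sample": false}')

# build a validated GenerateRequest for a single prompt
request = parse_generate_kwargs(
    ["DeepSpeed is a machine learning framework"], generate_kwargs)
request.preprocess()

# get_filter_dict drops the prompt text and every unset (None) field,
# leaving only the kwargs that are forwarded to the backend
print(get_filter_dict(request))
```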