From 6e29f4c7257c37b483d88291b03300ad8c776dad Mon Sep 17 00:00:00 2001 From: ngxson Date: Sat, 20 Jan 2024 00:25:20 +0100 Subject: [PATCH 1/7] server: add llama_server_queue struct --- Makefile | 2 +- examples/server/CMakeLists.txt | 2 +- examples/server/server.cpp | 367 +++++++-------------------------- examples/server/utils.hpp | 296 ++++++++++++++++++++++++++ 4 files changed, 372 insertions(+), 295 deletions(-) create mode 100644 examples/server/utils.hpp diff --git a/Makefile b/Makefile index a8658a596eee0..f7f24ba9ec987 100644 --- a/Makefile +++ b/Makefile @@ -619,7 +619,7 @@ embedding: examples/embedding/embedding.cpp ggml.o llama.o $(C save-load-state: examples/save-load-state/save-load-state.cpp ggml.o llama.o $(COMMON_DEPS) $(OBJS) $(CXX) $(CXXFLAGS) $(filter-out %.h,$^) -o $@ $(LDFLAGS) -server: examples/server/server.cpp examples/server/httplib.h examples/server/json.hpp examples/server/index.html.hpp examples/server/index.js.hpp examples/server/completion.js.hpp examples/llava/clip.cpp examples/llava/clip.h common/stb_image.h ggml.o llama.o $(COMMON_DEPS) grammar-parser.o $(OBJS) +server: examples/server/server.cpp examples/server/utils.hpp examples/server/httplib.h examples/server/json.hpp examples/server/index.html.hpp examples/server/index.js.hpp examples/server/completion.js.hpp examples/llava/clip.cpp examples/llava/clip.h common/stb_image.h ggml.o llama.o $(COMMON_DEPS) grammar-parser.o $(OBJS) $(CXX) $(CXXFLAGS) -Iexamples/server $(filter-out %.h,$(filter-out %.hpp,$^)) -o $@ $(LDFLAGS) $(LWINSOCK2) -Wno-cast-qual gguf: examples/gguf/gguf.cpp ggml.o $(OBJS) diff --git a/examples/server/CMakeLists.txt b/examples/server/CMakeLists.txt index 81709e4484c9f..b3772081f2526 100644 --- a/examples/server/CMakeLists.txt +++ b/examples/server/CMakeLists.txt @@ -1,7 +1,7 @@ set(TARGET server) option(LLAMA_SERVER_VERBOSE "Build verbose logging option for Server" ON) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -add_executable(${TARGET} server.cpp json.hpp httplib.h) +add_executable(${TARGET} server.cpp utils.hpp json.hpp httplib.h) install(TARGETS ${TARGET} RUNTIME) target_compile_definitions(${TARGET} PRIVATE SERVER_VERBOSE=$ diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 0462fbd24739b..c79ef7915d5ed 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1,6 +1,7 @@ #include "common.h" #include "llama.h" #include "grammar-parser.h" +#include "utils.hpp" #include "../llava/clip.h" @@ -28,10 +29,6 @@ #include #include -#ifndef SERVER_VERBOSE -#define SERVER_VERBOSE 1 -#endif - #define DEFAULT_OAICOMPAT_MODEL "gpt-3.5-turbo-0613" using json = nlohmann::json; @@ -48,196 +45,9 @@ struct server_params static bool server_verbose = false; -#if SERVER_VERBOSE != 1 -#define LOG_VERBOSE(MSG, ...) -#else -#define LOG_VERBOSE(MSG, ...) \ - do \ - { \ - if (server_verbose) \ - { \ - server_log("VERBOSE", __func__, __LINE__, MSG, __VA_ARGS__); \ - } \ - } while (0) -#endif - -#define LOG_ERROR( MSG, ...) server_log("ERROR", __func__, __LINE__, MSG, __VA_ARGS__) -#define LOG_WARNING(MSG, ...) server_log("WARNING", __func__, __LINE__, MSG, __VA_ARGS__) -#define LOG_INFO( MSG, ...) 
server_log("INFO", __func__, __LINE__, MSG, __VA_ARGS__) - json oaicompat_completion_params_parse(const json &body); std::string format_chatml(std::vector messages); - -// -// base64 utils (TODO: move to common in the future) -// - -static const std::string base64_chars = - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz" - "0123456789+/"; - -static inline bool is_base64(uint8_t c) -{ - return (isalnum(c) || (c == '+') || (c == '/')); -} - -static std::vector base64_decode(const std::string & encoded_string) -{ - int i = 0; - int j = 0; - int in_ = 0; - - int in_len = encoded_string.size(); - - uint8_t char_array_4[4]; - uint8_t char_array_3[3]; - - std::vector ret; - - while (in_len-- && (encoded_string[in_] != '=') && is_base64(encoded_string[in_])) - { - char_array_4[i++] = encoded_string[in_]; in_++; - if (i == 4) - { - for (i = 0; i <4; i++) - { - char_array_4[i] = base64_chars.find(char_array_4[i]); - } - - char_array_3[0] = ((char_array_4[0] ) << 2) + ((char_array_4[1] & 0x30) >> 4); - char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2); - char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3]; - - for (i = 0; (i < 3); i++) - { - ret.push_back(char_array_3[i]); - } - i = 0; - } - } - - if (i) - { - for (j = i; j <4; j++) - { - char_array_4[j] = 0; - } - - for (j = 0; j <4; j++) - { - char_array_4[j] = base64_chars.find(char_array_4[j]); - } - - char_array_3[0] = ((char_array_4[0] ) << 2) + ((char_array_4[1] & 0x30) >> 4); - char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2); - char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3]; - - for (j = 0; (j < i - 1); j++) - { - ret.push_back(char_array_3[j]); - } - } - - return ret; -} - -// -// parallel -// - -enum server_state { - SERVER_STATE_LOADING_MODEL, // Server is starting up, model not fully loaded yet - SERVER_STATE_READY, // Server is ready and model is loaded - SERVER_STATE_ERROR // An error occurred, load_model failed -}; - -enum task_type { - TASK_TYPE_COMPLETION, - TASK_TYPE_CANCEL, -}; - -struct task_server { - int id; - int target_id; - task_type type; - json data; - bool infill_mode = false; - bool embedding_mode = false; - int multitask_id = -1; -}; - -struct task_result { - int id; - int multitask_id = -1; - bool stop; - bool error; - json result_json; -}; - -struct task_multi { - int id; - std::set subtasks_remaining{}; - std::vector results{}; -}; - -// TODO: can become bool if we can't find use of more states -enum slot_state -{ - IDLE, - PROCESSING, -}; - -enum slot_command -{ - NONE, - LOAD_PROMPT, - RELEASE, -}; - -struct slot_params -{ - bool stream = true; - bool cache_prompt = false; // remember the prompt to avoid reprocessing all prompt - - uint32_t seed = -1; // RNG seed - int32_t n_keep = 0; // number of tokens to keep from initial prompt - int32_t n_predict = -1; // new tokens to predict - - std::vector antiprompt; - - json input_prefix; - json input_suffix; -}; - -struct slot_image -{ - int32_t id; - - bool request_encode_image = false; - float * image_embedding = nullptr; - int32_t image_tokens = 0; - - clip_image_u8 * img_data; - - std::string prefix_prompt; // before of this image -}; - -// completion token output with probabilities -struct completion_token_output -{ - struct token_prob - { - llama_token tok; - float prob; - }; - - std::vector probs; - llama_token tok; - std::string text_to_send; -}; - static size_t common_part(const std::vector &a, const std::vector &b) { size_t i; @@ -292,28 +102,6 @@ 
static std::string tokens_to_str(llama_context *ctx, Iter begin, Iter end) return ret; } -static void server_log(const char *level, const char *function, int line, - const char *message, const nlohmann::ordered_json &extra) -{ - nlohmann::ordered_json log - { - {"timestamp", time(nullptr)}, - {"level", level}, - {"function", function}, - {"line", line}, - {"message", message}, - }; - - if (!extra.empty()) - { - log.merge_patch(extra); - } - - const std::string str = log.dump(-1, ' ', false, json::error_handler_t::replace); - printf("%.*s\n", (int)str.size(), str.data()); - fflush(stdout); -} - // format incomplete utf-8 multibyte character for output static std::string tokens_to_output_formatted_string(const llama_context *ctx, const llama_token token) { @@ -539,7 +327,6 @@ struct llama_server_context bool all_slots_are_idle = false; bool add_bos_token = true; - int32_t id_gen; int32_t n_ctx; // total context for all clients / slots // system prompt @@ -554,11 +341,11 @@ struct llama_server_context // slots / clients std::vector slots; - std::vector queue_tasks; + llama_server_queue queue_tasks; std::vector queue_results; std::vector queue_multitasks; std::mutex mutex_tasks; // also guards id_gen, and queue_multitasks - std::condition_variable condition_tasks; + // std::condition_variable condition_tasks; std::mutex mutex_results; std::condition_variable condition_results; @@ -619,8 +406,6 @@ struct llama_server_context } void initialize() { - id_gen = 0; - // create slots all_slots_are_idle = true; @@ -1201,7 +986,7 @@ struct llama_server_context multi.id = id; std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end())); queue_multitasks.push_back(multi); - condition_tasks.notify_one(); + // TODO @ngxson : Do we need to notify the queue_tasks? } void update_multi_task(int multitask_id, int subtask_id, task_result& result) @@ -1213,7 +998,7 @@ struct llama_server_context { multitask.subtasks_remaining.erase(subtask_id); multitask.results.push_back(result); - condition_tasks.notify_one(); + // TODO @ngxson : Do we need to notify the queue_tasks? 
} } } @@ -1401,7 +1186,6 @@ struct llama_server_context { std::unique_lock lock(mutex_tasks); task_server task; - task.id = id_gen++; task.target_id = 0; task.data = std::move(data); task.infill_mode = infill; @@ -1417,13 +1201,12 @@ struct llama_server_context } // otherwise, it's a single-prompt task, we actually queue it - queue_tasks.push_back(task); - condition_tasks.notify_one(); - return task.id; + return queue_tasks.post(task); } task_result next_result(int task_id) { + LOG_TEE("next_result %i \n", task_id); while (true) { std::unique_lock lock(mutex_results); @@ -1525,13 +1308,10 @@ struct llama_server_context void request_cancel(int task_id) { - std::unique_lock lock(mutex_tasks); task_server task; - task.id = id_gen++; task.type = TASK_TYPE_CANCEL; task.target_id = task_id; - queue_tasks.push_back(task); - condition_tasks.notify_one(); + queue_tasks.post(task); } int split_multiprompt_task(task_server& multiprompt_task) @@ -1539,7 +1319,7 @@ struct llama_server_context int prompt_count = multiprompt_task.data.at("prompt").size(); assert(prompt_count > 1); - int multitask_id = id_gen++; + int multitask_id = queue_tasks.get_next_id(); std::vector subtask_ids(prompt_count); for (int i = 0; i < prompt_count; i++) { @@ -1555,73 +1335,67 @@ struct llama_server_context return multitask_id; } - void process_tasks() + void process_single_task(task_server task) { - std::unique_lock lock(mutex_tasks); - std::vector deferred_tasks; - while (!queue_tasks.empty()) + switch (task.type) { - task_server task = queue_tasks.front(); - queue_tasks.erase(queue_tasks.begin()); - switch (task.type) - { - case TASK_TYPE_COMPLETION: { - llama_client_slot *slot = get_slot(json_value(task.data, "slot_id", -1)); - if (slot == nullptr) - { - // if no slot is available, we defer this task for processing later - deferred_tasks.push_back(task); + case TASK_TYPE_COMPLETION: { + llama_client_slot *slot = get_slot(json_value(task.data, "slot_id", -1)); + if (slot == nullptr) + { + // if no slot is available, we defer this task for processing later + // deferred_tasks.push_back(task); + LOG_INFO("no slot", {}); + break; + } + + if (task.data.contains("system_prompt")) + { + if (!all_slots_are_idle) { + send_error(task, "system prompt can only be updated when all slots are idle"); break; } + process_system_prompt_data(task.data["system_prompt"]); - if (task.data.contains("system_prompt")) + // reset cache_tokens for all slots + for (llama_client_slot &slot : slots) { - if (!all_slots_are_idle) { - send_error(task, "system prompt can only be updated when all slots are idle"); - break; - } - process_system_prompt_data(task.data["system_prompt"]); - - // reset cache_tokens for all slots - for (llama_client_slot &slot : slots) - { - slot.cache_tokens.clear(); - } + slot.cache_tokens.clear(); } + } - slot->reset(); + slot->reset(); - slot->infill = task.infill_mode; - slot->embedding = task.embedding_mode; - slot->task_id = task.id; - slot->multitask_id = task.multitask_id; + slot->infill = task.infill_mode; + slot->embedding = task.embedding_mode; + slot->task_id = task.id; + slot->multitask_id = task.multitask_id; - if (!launch_slot_with_data(slot, task.data)) + if (!launch_slot_with_data(slot, task.data)) + { + // send error result + send_error(task, "internal_error"); + break; + } + } break; + case TASK_TYPE_CANCEL: { // release slot linked with the task id + for (auto & slot : slots) + { + if (slot.task_id == task.target_id) { - // send error result - send_error(task, "internal_error"); + slot.release(); break; } - 
} break; - case TASK_TYPE_CANCEL: { // release slot linked with the task id - for (auto & slot : slots) - { - if (slot.task_id == task.target_id) - { - slot.release(); - break; - } - } - } break; - } - } - - // add all the deferred tasks back the the queue - for (task_server &task : deferred_tasks) - { - queue_tasks.push_back(task); + } + } break; + case TASK_TYPE_NEXT_RESPONSE: { + // do nothing + } break; } + } + void process_multitask() + { // remove finished multitasks from the queue of multitasks, and add the corresponding result to the result queue std::vector agg_results; auto queue_iterator = queue_multitasks.begin(); @@ -1657,18 +1431,12 @@ struct llama_server_context } } - // done with tasks, unlock - lock.unlock(); - // copy aggregate results of complete multi-tasks to the results queue std::lock_guard lock_results(mutex_results); queue_results.insert(queue_results.end(), agg_results.begin(), agg_results.end()); } bool update_slots() { - // attend tasks - process_tasks(); - if (system_need_update) { LOG_TEE("updating system prompt\n"); @@ -1684,10 +1452,12 @@ struct llama_server_context LOG_TEE("all slots are idle and system prompt is empty, clear the KV cache\n"); kv_cache_clear(); } - std::unique_lock lock(mutex_tasks); - condition_tasks.wait(lock, [&]{ - return !queue_tasks.empty(); - }); + return true; + } else { + task_server task; + task.type = TASK_TYPE_NEXT_RESPONSE; + task.target_id = -1; + queue_tasks.post(task); } for (llama_client_slot &slot : slots) @@ -1997,6 +1767,11 @@ struct llama_server_context } return true; } + + void run_on_all_tasks_finished() { + process_multitask(); + update_slots(); + } }; static void server_print_usage(const char *argv0, const gpt_params ¶ms, @@ -3360,15 +3135,21 @@ int main(int argc, char **argv) // GG: if I put the main loop inside a thread, it crashes on the first request when build in Debug!? // "Bus error: 10" - this is on macOS, it does not crash on Linux //std::thread t2([&]() - { + /*{ bool running = true; while (running) { running = llama.update_slots(); } - } + }*/ //); + llama.queue_tasks.on_new_task(std::bind( + &llama_server_context::process_single_task, &llama, std::placeholders::_1)); + llama.queue_tasks.on_all_tasks_finished(std::bind( + &llama_server_context::run_on_all_tasks_finished, &llama)); + llama.queue_tasks.start_loop(); + t.join(); llama_backend_free(); diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp new file mode 100644 index 0000000000000..cf1925dde8e52 --- /dev/null +++ b/examples/server/utils.hpp @@ -0,0 +1,296 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "json.hpp" + +#include "../llava/clip.h" + +using json = nlohmann::json; + +#ifndef SERVER_VERBOSE +#define SERVER_VERBOSE 1 +#endif + +#if SERVER_VERBOSE != 1 +#define LOG_VERBOSE(MSG, ...) +#else +#define LOG_VERBOSE(MSG, ...) \ + do \ + { \ + if (server_verbose) \ + { \ + server_log("VERBOSE", __func__, __LINE__, MSG, __VA_ARGS__); \ + } \ + } while (0) +#endif + +#define LOG_ERROR( MSG, ...) server_log("ERROR", __func__, __LINE__, MSG, __VA_ARGS__) +#define LOG_WARNING(MSG, ...) server_log("WARNING", __func__, __LINE__, MSG, __VA_ARGS__) +#define LOG_INFO( MSG, ...) 
server_log("INFO", __func__, __LINE__, MSG, __VA_ARGS__) + +// +// parallel +// + +enum server_state { + SERVER_STATE_LOADING_MODEL, // Server is starting up, model not fully loaded yet + SERVER_STATE_READY, // Server is ready and model is loaded + SERVER_STATE_ERROR // An error occurred, load_model failed +}; + +enum task_type { + TASK_TYPE_COMPLETION, + TASK_TYPE_CANCEL, + TASK_TYPE_NEXT_RESPONSE +}; + +struct task_server { + int id = -1; // to be filled by llama_server_queue + int target_id; + task_type type; + json data; + bool infill_mode = false; + bool embedding_mode = false; + int multitask_id = -1; +}; + +struct task_result { + int id; + int multitask_id = -1; + bool stop; + bool error; + json result_json; +}; + +struct task_multi { + int id; + std::set subtasks_remaining{}; + std::vector results{}; +}; + +// TODO: can become bool if we can't find use of more states +enum slot_state +{ + IDLE, + PROCESSING, +}; + +enum slot_command +{ + NONE, + LOAD_PROMPT, + RELEASE, +}; + +struct slot_params +{ + bool stream = true; + bool cache_prompt = false; // remember the prompt to avoid reprocessing all prompt + + uint32_t seed = -1; // RNG seed + int32_t n_keep = 0; // number of tokens to keep from initial prompt + int32_t n_predict = -1; // new tokens to predict + + std::vector antiprompt; + + json input_prefix; + json input_suffix; +}; + +struct slot_image +{ + int32_t id; + + bool request_encode_image = false; + float * image_embedding = nullptr; + int32_t image_tokens = 0; + + clip_image_u8 * img_data; + + std::string prefix_prompt; // before of this image +}; + +// completion token output with probabilities +struct completion_token_output +{ + struct token_prob + { + llama_token tok; + float prob; + }; + + std::vector probs; + llama_token tok; + std::string text_to_send; +}; + +static inline void server_log(const char *level, const char *function, int line, + const char *message, const nlohmann::ordered_json &extra) +{ + nlohmann::ordered_json log + { + {"timestamp", time(nullptr)}, + {"level", level}, + {"function", function}, + {"line", line}, + {"message", message}, + }; + + if (!extra.empty()) + { + log.merge_patch(extra); + } + + const std::string str = log.dump(-1, ' ', false, json::error_handler_t::replace); + printf("%.*s\n", (int)str.size(), str.data()); + fflush(stdout); +} + +// +// work queue utils +// + +template +struct llama_server_queue { + int id = 0; + std::mutex mutex_tasks; + std::vector queue_tasks; + std::condition_variable condition_tasks; + std::function callback_new_task; + std::function callback_all_task_finished; + + int post(T task) { + LOG_INFO("post", {}); + std::unique_lock lock(mutex_tasks); + task.id = id++; + queue_tasks.push_back(std::move(task)); + condition_tasks.notify_one(); + return task.id; + } + + int get_next_id() { + std::unique_lock lock(mutex_tasks); + return id++; + } + + void on_new_task(std::function callback) { + callback_new_task = callback; + } + + void on_all_tasks_finished(std::function callback) { + callback_all_task_finished = callback; + } + + void start_loop() { + while (true) { + // new task arrived + LOG_INFO("have new task", {}); + { + while (true) + { + std::unique_lock lock(mutex_tasks); + if (queue_tasks.empty()) { + lock.unlock(); + break; + } + task_server task = queue_tasks.front(); + queue_tasks.erase(queue_tasks.begin()); + lock.unlock(); + LOG_INFO("callback_new_task", {}); + callback_new_task(task); + } + LOG_INFO("callback_all_task_finished", {}); + callback_all_task_finished(); + } + LOG_INFO("wait for new 
task", {}); + // wait for new task + { + std::unique_lock lock(mutex_tasks); + if (queue_tasks.empty()) { + condition_tasks.wait(lock, [&]{ + return !queue_tasks.empty(); + }); + } + } + } + } +}; + +// +// base64 utils (TODO: move to common in the future) +// + +static const std::string base64_chars = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + +static inline bool is_base64(uint8_t c) +{ + return (isalnum(c) || (c == '+') || (c == '/')); +} + +static inline std::vector base64_decode(const std::string & encoded_string) +{ + int i = 0; + int j = 0; + int in_ = 0; + + int in_len = encoded_string.size(); + + uint8_t char_array_4[4]; + uint8_t char_array_3[3]; + + std::vector ret; + + while (in_len-- && (encoded_string[in_] != '=') && is_base64(encoded_string[in_])) + { + char_array_4[i++] = encoded_string[in_]; in_++; + if (i == 4) + { + for (i = 0; i <4; i++) + { + char_array_4[i] = base64_chars.find(char_array_4[i]); + } + + char_array_3[0] = ((char_array_4[0] ) << 2) + ((char_array_4[1] & 0x30) >> 4); + char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2); + char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3]; + + for (i = 0; (i < 3); i++) + { + ret.push_back(char_array_3[i]); + } + i = 0; + } + } + + if (i) + { + for (j = i; j <4; j++) + { + char_array_4[j] = 0; + } + + for (j = 0; j <4; j++) + { + char_array_4[j] = base64_chars.find(char_array_4[j]); + } + + char_array_3[0] = ((char_array_4[0] ) << 2) + ((char_array_4[1] & 0x30) >> 4); + char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2); + char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3]; + + for (j = 0; (j < i - 1); j++) + { + ret.push_back(char_array_3[j]); + } + } + + return ret; +} \ No newline at end of file From 12829b2e64afdf113f627064a6d57d3f64b31e19 Mon Sep 17 00:00:00 2001 From: ngxson Date: Sun, 21 Jan 2024 14:45:28 +0100 Subject: [PATCH 2/7] server: add llama_server_response_event --- Makefile | 2 +- examples/server/CMakeLists.txt | 2 +- examples/server/oai.hpp | 14 ++++ examples/server/server.cpp | 125 ++++++++++++--------------------- examples/server/utils.hpp | 104 +++++++++++++++++++++++++-- 5 files changed, 161 insertions(+), 86 deletions(-) create mode 100644 examples/server/oai.hpp diff --git a/Makefile b/Makefile index f7f24ba9ec987..f9a933b2584d7 100644 --- a/Makefile +++ b/Makefile @@ -619,7 +619,7 @@ embedding: examples/embedding/embedding.cpp ggml.o llama.o $(C save-load-state: examples/save-load-state/save-load-state.cpp ggml.o llama.o $(COMMON_DEPS) $(OBJS) $(CXX) $(CXXFLAGS) $(filter-out %.h,$^) -o $@ $(LDFLAGS) -server: examples/server/server.cpp examples/server/utils.hpp examples/server/httplib.h examples/server/json.hpp examples/server/index.html.hpp examples/server/index.js.hpp examples/server/completion.js.hpp examples/llava/clip.cpp examples/llava/clip.h common/stb_image.h ggml.o llama.o $(COMMON_DEPS) grammar-parser.o $(OBJS) +server: examples/server/server.cpp examples/server/oai.hpp examples/server/utils.hpp examples/server/httplib.h examples/server/json.hpp examples/server/index.html.hpp examples/server/index.js.hpp examples/server/completion.js.hpp examples/llava/clip.cpp examples/llava/clip.h common/stb_image.h ggml.o llama.o $(COMMON_DEPS) grammar-parser.o $(OBJS) $(CXX) $(CXXFLAGS) -Iexamples/server $(filter-out %.h,$(filter-out %.hpp,$^)) -o $@ $(LDFLAGS) $(LWINSOCK2) -Wno-cast-qual gguf: examples/gguf/gguf.cpp ggml.o $(OBJS) diff --git 
a/examples/server/CMakeLists.txt b/examples/server/CMakeLists.txt index b3772081f2526..cc13b2d630652 100644 --- a/examples/server/CMakeLists.txt +++ b/examples/server/CMakeLists.txt @@ -1,7 +1,7 @@ set(TARGET server) option(LLAMA_SERVER_VERBOSE "Build verbose logging option for Server" ON) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -add_executable(${TARGET} server.cpp utils.hpp json.hpp httplib.h) +add_executable(${TARGET} server.cpp oai.hpp utils.hpp json.hpp httplib.h) install(TARGETS ${TARGET} RUNTIME) target_compile_definitions(${TARGET} PRIVATE SERVER_VERBOSE=$ diff --git a/examples/server/oai.hpp b/examples/server/oai.hpp new file mode 100644 index 0000000000000..b06b7efb42265 --- /dev/null +++ b/examples/server/oai.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "json.hpp" +#include "utils.hpp" + +using json = nlohmann::json; + diff --git a/examples/server/server.cpp b/examples/server/server.cpp index c79ef7915d5ed..a853d4a693311 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -43,7 +43,7 @@ struct server_params int32_t write_timeout = 600; }; -static bool server_verbose = false; +bool server_verbose = false; json oaicompat_completion_params_parse(const json &body); std::string format_chatml(std::vector messages); @@ -279,7 +279,7 @@ struct llama_client_slot } void release() { - if (state == IDLE || state == PROCESSING) + if (state == PROCESSING) { t_token_generation = (ggml_time_us() - t_start_genereration) / 1e3; command = RELEASE; @@ -342,12 +342,9 @@ struct llama_server_context std::vector slots; llama_server_queue queue_tasks; - std::vector queue_results; + llama_server_response_event queue_results; std::vector queue_multitasks; - std::mutex mutex_tasks; // also guards id_gen, and queue_multitasks - // std::condition_variable condition_tasks; - std::mutex mutex_results; - std::condition_variable condition_results; + std::mutex mutex_multitasks; ~llama_server_context() { @@ -968,20 +965,18 @@ struct llama_server_context void send_error(task_server& task, const std::string &error) { LOG_TEE("task %i - error: %s\n", task.id, error.c_str()); - std::unique_lock lock(mutex_results); task_result res; res.id = task.id; res.multitask_id = task.multitask_id; res.stop = false; res.error = true; res.result_json = { { "content", error } }; - queue_results.push_back(res); - condition_results.notify_all(); + queue_results.send(res); } - void add_multi_task(int id, std::vector& sub_ids) + void add_multitask(int id, std::vector& sub_ids) { - std::lock_guard lock(mutex_tasks); + std::lock_guard lock(mutex_multitasks); task_multi multi; multi.id = id; std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end())); @@ -989,9 +984,9 @@ struct llama_server_context // TODO @ngxson : Do we need to notify the queue_tasks? 
} - void update_multi_task(int multitask_id, int subtask_id, task_result& result) + void update_multitask(int multitask_id, int subtask_id, task_result& result) { - std::lock_guard lock(mutex_tasks); + std::lock_guard lock(mutex_multitasks); for (auto& multitask : queue_multitasks) { if (multitask.id == multitask_id) @@ -1046,7 +1041,6 @@ struct llama_server_context void send_partial_response(llama_client_slot &slot, completion_token_output tkn) { - std::unique_lock lock(mutex_results); task_result res; res.id = slot.task_id; res.multitask_id = slot.multitask_id; @@ -1081,13 +1075,11 @@ struct llama_server_context res.result_json["model"] = slot.oaicompat_model; } - queue_results.push_back(res); - condition_results.notify_all(); + queue_results.send(res); } void send_final_response(llama_client_slot &slot) { - std::unique_lock lock(mutex_results); task_result res; res.id = slot.task_id; res.multitask_id = slot.multitask_id; @@ -1136,22 +1128,17 @@ struct llama_server_context res.result_json["model"] = slot.oaicompat_model; } - queue_results.push_back(res); - condition_results.notify_all(); - - // done with results, unlock - lock.unlock(); + queue_results.send(res); // parent multitask, if any, needs to be updated if (slot.multitask_id != -1) { - update_multi_task(slot.multitask_id, slot.task_id, res); + update_multitask(slot.multitask_id, slot.task_id, res); } } void send_embedding(llama_client_slot &slot) { - std::unique_lock lock(mutex_results); task_result res; res.id = slot.task_id; res.multitask_id = slot.multitask_id; @@ -1178,13 +1165,12 @@ struct llama_server_context {"embedding", embedding }, }; } - queue_results.push_back(res); - condition_results.notify_all(); + queue_results.send(res); } int request_completion(json data, bool infill, bool embedding, int multitask_id) { - std::unique_lock lock(mutex_tasks); + std::unique_lock lock(mutex_multitasks); task_server task; task.target_id = 0; task.data = std::move(data); @@ -1204,40 +1190,6 @@ struct llama_server_context return queue_tasks.post(task); } - task_result next_result(int task_id) - { - LOG_TEE("next_result %i \n", task_id); - while (true) - { - std::unique_lock lock(mutex_results); - condition_results.wait(lock, [&]{ - return !queue_results.empty(); - }); - - for (int i = 0; i < (int) queue_results.size(); i++) - { - // for now, tasks that have associated parent multitasks just get erased once multitask picks up the result - if (queue_results[i].multitask_id == task_id) - { - update_multi_task(task_id, queue_results[i].id, queue_results[i]); - queue_results.erase(queue_results.begin() + i); - continue; - } - - if (queue_results[i].id == task_id) - { - assert(queue_results[i].multitask_id == -1); - task_result res = queue_results[i]; - queue_results.erase(queue_results.begin() + i); - return res; - } - } - } - - // never reached - //return task_result{-1, false, false, {}}; - } - // for multiple images processing bool ingest_images(llama_client_slot &slot, int n_batch) { @@ -1331,7 +1283,7 @@ struct llama_server_context } // queue up the multitask so we can track its subtask progression - add_multi_task(multitask_id, subtask_ids); + add_multitask(multitask_id, subtask_ids); return multitask_id; } @@ -1344,8 +1296,8 @@ struct llama_server_context if (slot == nullptr) { // if no slot is available, we defer this task for processing later - // deferred_tasks.push_back(task); - LOG_INFO("no slot", {}); + LOG_TEE("no slot\n"); + queue_tasks.defer(task); break; } @@ -1417,12 +1369,7 @@ struct llama_server_context 
aggregate_result.error = aggregate_result.error && subres.error; } aggregate_result.result_json = json{ "results", result_jsons }; - - agg_results.push_back(aggregate_result); - - condition_results.notify_all(); - queue_iterator = queue_multitasks.erase(queue_iterator); } else @@ -1432,8 +1379,9 @@ struct llama_server_context } // copy aggregate results of complete multi-tasks to the results queue - std::lock_guard lock_results(mutex_results); - queue_results.insert(queue_results.end(), agg_results.begin(), agg_results.end()); + for (auto& res : agg_results) { + queue_results.send(res); + } } bool update_slots() { @@ -2845,9 +2793,10 @@ int main(int argc, char **argv) } json data = json::parse(req.body); const int task_id = llama.request_completion(data, false, false, -1); + llama.queue_results.add_waiting_task_id(task_id); if (!json_value(data, "stream", false)) { std::string completion_text; - task_result result = llama.next_result(task_id); + task_result result = llama.queue_results.recv(task_id); if (!result.error && result.stop) { res.set_content(result.result_json.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8"); } @@ -2855,6 +2804,7 @@ int main(int argc, char **argv) { res.status = 404; res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); + llama.queue_results.remove_waiting_task_id(task_id); return; } } else { @@ -2862,7 +2812,7 @@ int main(int argc, char **argv) { while (true) { - task_result result = llama.next_result(task_id); + task_result result = llama.queue_results.recv(task_id); if (!result.error) { const std::string str = "data: " + @@ -2873,6 +2823,7 @@ int main(int argc, char **argv) }); if (!sink.write(str.c_str(), str.size())) { + llama.queue_results.remove_waiting_task_id(task_id); return false; } if (result.stop) { @@ -2888,12 +2839,14 @@ int main(int argc, char **argv) }); if (!sink.write(str.c_str(), str.size())) { + llama.queue_results.remove_waiting_task_id(task_id); return false; } break; } } sink.done(); + llama.queue_results.remove_waiting_task_id(task_id); return true; }; @@ -2901,6 +2854,7 @@ int main(int argc, char **argv) { // cancel llama.request_cancel(task_id); + llama.queue_results.remove_waiting_task_id(task_id); }; res.set_chunked_content_provider("text/event-stream", chunked_content_provider, on_complete); @@ -2938,10 +2892,11 @@ int main(int argc, char **argv) json data = oaicompat_completion_params_parse(json::parse(req.body)); const int task_id = llama.request_completion(data, false, false, -1); + llama.queue_results.add_waiting_task_id(task_id); if (!json_value(data, "stream", false)) { std::string completion_text; - task_result result = llama.next_result(task_id); + task_result result = llama.queue_results.recv(task_id); if (!result.error && result.stop) { json oaicompat_result = format_final_response_oaicompat(data, result); @@ -2952,12 +2907,13 @@ int main(int argc, char **argv) } else { res.status = 500; res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); + llama.queue_results.remove_waiting_task_id(task_id); return; } } else { const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink &sink) { while (true) { - task_result llama_result = llama.next_result(task_id); + task_result llama_result = llama.queue_results.recv(task_id); if (!llama_result.error) { std::vector result_array = format_partial_response_oaicompat( llama_result); @@ -2970,6 +2926,7 @@ int main(int argc, char **argv) "\n\n"; LOG_VERBOSE("data stream", {{"to_send", 
str}}); if (!sink.write(str.c_str(), str.size())) { + llama.queue_results.remove_waiting_task_id(task_id); return false; } } @@ -2985,18 +2942,21 @@ int main(int argc, char **argv) "\n\n"; LOG_VERBOSE("data stream", {{"to_send", str}}); if (!sink.write(str.c_str(), str.size())) { + llama.queue_results.remove_waiting_task_id(task_id); return false; } break; } } sink.done(); + llama.queue_results.remove_waiting_task_id(task_id); return true; }; auto on_complete = [task_id, &llama](bool) { // cancel request llama.request_cancel(task_id); + llama.queue_results.remove_waiting_task_id(task_id); }; res.set_chunked_content_provider("text/event-stream", chunked_content_provider, on_complete); @@ -3013,7 +2973,7 @@ int main(int argc, char **argv) const int task_id = llama.request_completion(data, true, false, -1); if (!json_value(data, "stream", false)) { std::string completion_text; - task_result result = llama.next_result(task_id); + task_result result = llama.queue_results.recv(task_id); if (!result.error && result.stop) { res.set_content(result.result_json.dump(-1, ' ', false, json::error_handler_t::replace), "application/json; charset=utf-8"); @@ -3028,7 +2988,7 @@ int main(int argc, char **argv) const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink & sink) { while (true) { - task_result result = llama.next_result(task_id); + task_result result = llama.queue_results.recv(task_id); if (!result.error) { const std::string str = "data: " + @@ -3128,7 +3088,7 @@ int main(int argc, char **argv) } const int task_id = llama.request_completion({ {"prompt", prompt}, { "n_predict", 0}, {"image_data", image_data} }, false, true, -1); - task_result result = llama.next_result(task_id); + task_result result = llama.queue_results.recv(task_id); return res.set_content(result.result_json.dump(), "application/json; charset=utf-8"); }); @@ -3149,6 +3109,13 @@ int main(int argc, char **argv) llama.queue_tasks.on_all_tasks_finished(std::bind( &llama_server_context::run_on_all_tasks_finished, &llama)); llama.queue_tasks.start_loop(); + llama.queue_results.on_multitask_update(std::bind( + &llama_server_context::update_multitask, + &llama, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3 + )); t.join(); diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index cf1925dde8e52..a38698762d6b6 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "json.hpp" @@ -12,6 +13,8 @@ using json = nlohmann::json; +extern bool server_verbose; + #ifndef SERVER_VERBOSE #define SERVER_VERBOSE 1 #endif @@ -160,12 +163,12 @@ struct llama_server_queue { int id = 0; std::mutex mutex_tasks; std::vector queue_tasks; + std::vector queue_tasks_deferred; std::condition_variable condition_tasks; std::function callback_new_task; std::function callback_all_task_finished; int post(T task) { - LOG_INFO("post", {}); std::unique_lock lock(mutex_tasks); task.id = id++; queue_tasks.push_back(std::move(task)); @@ -173,6 +176,11 @@ struct llama_server_queue { return task.id; } + void defer(T task) { + std::unique_lock lock(mutex_tasks); + queue_tasks_deferred.push_back(std::move(task)); + } + int get_next_id() { std::unique_lock lock(mutex_tasks); return id++; @@ -189,7 +197,7 @@ struct llama_server_queue { void start_loop() { while (true) { // new task arrived - LOG_INFO("have new task", {}); + LOG_VERBOSE("have new task", {}); { while (true) { @@ -201,13 +209,27 @@ struct llama_server_queue { task_server task = 
queue_tasks.front(); queue_tasks.erase(queue_tasks.begin()); lock.unlock(); - LOG_INFO("callback_new_task", {}); + LOG_VERBOSE("callback_new_task", {}); callback_new_task(task); } - LOG_INFO("callback_all_task_finished", {}); + // move deferred tasks back to main loop + { + std::unique_lock lock(mutex_tasks); + //queue_tasks.insert( + // queue_tasks.end(), + // std::make_move_iterator(queue_tasks_deferred.begin()), + // std::make_move_iterator(queue_tasks_deferred.end()) + //); + for (auto & task : queue_tasks_deferred) { + queue_tasks.push_back(task); + } + queue_tasks_deferred.clear(); + lock.unlock(); + } + LOG_VERBOSE("callback_all_task_finished", {}); callback_all_task_finished(); } - LOG_INFO("wait for new task", {}); + LOG_VERBOSE("wait for new task", {}); // wait for new task { std::unique_lock lock(mutex_tasks); @@ -221,6 +243,78 @@ struct llama_server_queue { } }; +struct llama_server_response_event { + typedef std::function callback_multitask_t; + std::vector queue_results; + std::mutex mutex_task_ids; + std::set waiting_task_ids; + std::mutex mutex_results; + std::condition_variable condition_results; + callback_multitask_t callback_update_multitask; + + void add_waiting_task_id(int task_id) { + std::unique_lock lock(mutex_task_ids); + waiting_task_ids.insert(task_id); + } + + void remove_waiting_task_id(int task_id) { + std::unique_lock lock(mutex_task_ids); + waiting_task_ids.erase(task_id); + } + + task_result recv(int task_id) { + while (true) + { + std::unique_lock lock(mutex_results); + condition_results.wait(lock, [&]{ + return !queue_results.empty(); + }); + LOG_VERBOSE("condition_results unblock", {}); + + for (int i = 0; i < (int) queue_results.size(); i++) + { + if (queue_results[i].id == task_id) + { + assert(queue_results[i].multitask_id == -1); + task_result res = queue_results[i]; + queue_results.erase(queue_results.begin() + i); + return res; + } + } + } + + // should never reach here + } + + void on_multitask_update(callback_multitask_t callback) { + callback_update_multitask = callback; + } + + void send(task_result result) { + std::unique_lock lock(mutex_results); + std::unique_lock lock1(mutex_task_ids); + LOG_VERBOSE("send new result", {}); + for (auto& task_id : waiting_task_ids) { + LOG_TEE("waiting task id %i \n", task_id); + // for now, tasks that have associated parent multitasks just get erased once multitask picks up the result + if (result.multitask_id == task_id) + { + LOG_VERBOSE("callback_update_multitask", {}); + callback_update_multitask(task_id, result.id, result); + continue; + } + + if (result.id == task_id) + { + LOG_VERBOSE("queue_results.push_back", {}); + queue_results.push_back(result); + condition_results.notify_one(); + return; + } + } + } +}; + // // base64 utils (TODO: move to common in the future) // From 906afe7810ae049bd596d91739ec0b364f4e5232 Mon Sep 17 00:00:00 2001 From: ngxson Date: Sun, 21 Jan 2024 14:58:26 +0100 Subject: [PATCH 3/7] server: add comments --- examples/server/oai.hpp | 194 +++++++++++++++++++++++++++++ examples/server/server.cpp | 247 +------------------------------------ examples/server/utils.hpp | 80 ++++++++++-- 3 files changed, 266 insertions(+), 255 deletions(-) diff --git a/examples/server/oai.hpp b/examples/server/oai.hpp index b06b7efb42265..bc5db6eef2b55 100644 --- a/examples/server/oai.hpp +++ b/examples/server/oai.hpp @@ -10,5 +10,199 @@ #include "json.hpp" #include "utils.hpp" +#define DEFAULT_OAICOMPAT_MODEL "gpt-3.5-turbo-0613" + using json = nlohmann::json; +inline static json 
oaicompat_completion_params_parse( + const json &body /* openai api json semantics */) +{ + json llama_params; + + llama_params["__oaicompat"] = true; + + // Map OpenAI parameters to llama.cpp parameters + // + // For parameters that are defined by the OpenAI documentation (e.g. + // temperature), we explicitly specify OpenAI's intended default; we + // need to do that because sometimes OpenAI disagrees with llama.cpp + // + // https://platform.openai.com/docs/api-reference/chat/create + llama_sampling_params default_sparams; + llama_params["model"] = json_value(body, "model", std::string("unknown")); + llama_params["prompt"] = format_chatml(body["messages"]); // OpenAI 'messages' to llama.cpp 'prompt' + llama_params["cache_prompt"] = json_value(body, "cache_prompt", false); + llama_params["temperature"] = json_value(body, "temperature", 0.0); + llama_params["top_k"] = json_value(body, "top_k", default_sparams.top_k); + llama_params["top_p"] = json_value(body, "top_p", 1.0); + llama_params["n_predict"] = json_value(body, "max_tokens", -1); + llama_params["logit_bias"] = json_value(body, "logit_bias",json::object()); + llama_params["frequency_penalty"] = json_value(body, "frequency_penalty", 0.0); + llama_params["presence_penalty"] = json_value(body, "presence_penalty", 0.0); + llama_params["seed"] = json_value(body, "seed", LLAMA_DEFAULT_SEED); + llama_params["stream"] = json_value(body, "stream", false); + llama_params["mirostat"] = json_value(body, "mirostat", default_sparams.mirostat); + llama_params["mirostat_tau"] = json_value(body, "mirostat_tau", default_sparams.mirostat_tau); + llama_params["mirostat_eta"] = json_value(body, "mirostat_eta", default_sparams.mirostat_eta); + llama_params["penalize_nl"] = json_value(body, "penalize_nl", default_sparams.penalize_nl); + llama_params["typical_p"] = json_value(body, "typical_p", default_sparams.typical_p); + llama_params["repeat_last_n"] = json_value(body, "repeat_last_n", default_sparams.penalty_last_n); + llama_params["ignore_eos"] = json_value(body, "ignore_eos", false); + llama_params["tfs_z"] = json_value(body, "tfs_z", default_sparams.tfs_z); + + if (body.count("grammar") != 0) { + llama_params["grammar"] = json_value(body, "grammar", json::object()); + } + + // Handle 'stop' field + if (body.contains("stop") && body["stop"].is_string()) { + llama_params["stop"] = json::array({body["stop"].get()}); + } else { + llama_params["stop"] = json_value(body, "stop", json::array()); + } + + // Ensure there is ChatML-specific end sequence among stop words + llama_params["stop"].push_back("<|im_end|>"); + + return llama_params; +} + +inline static json format_final_response_oaicompat(const json &request, const task_result &response, bool streaming = false) +{ + json result = response.result_json; + + bool stopped_word = result.count("stopped_word") != 0; + bool stopped_eos = json_value(result, "stopped_eos", false); + int num_tokens_predicted = json_value(result, "tokens_predicted", 0); + int num_prompt_tokens = json_value(result, "tokens_evaluated", 0); + std::string content = json_value(result, "content", std::string("")); + + std::string finish_reason = "length"; + if (stopped_word || stopped_eos) { + finish_reason = "stop"; + } + + json choices = + streaming ? 
json::array({json{{"finish_reason", finish_reason}, + {"index", 0}, + {"delta", json::object()}}}) + : json::array({json{{"finish_reason", finish_reason}, + {"index", 0}, + {"message", json{{"content", content}, + {"role", "assistant"}}}}}); + + std::time_t t = std::time(0); + + json res = + json{{"choices", choices}, + {"created", t}, + {"model", + json_value(request, "model", std::string(DEFAULT_OAICOMPAT_MODEL))}, + {"object", streaming ? "chat.completion.chunk" : "chat.completion"}, + {"usage", + json{{"completion_tokens", num_tokens_predicted}, + {"prompt_tokens", num_prompt_tokens}, + {"total_tokens", num_tokens_predicted + num_prompt_tokens}}}, + {"id", gen_chatcmplid()}}; + + if (server_verbose) { + res["__verbose"] = result; + } + + if (result.contains("completion_probabilities")) { + res["completion_probabilities"] = json_value(result, "completion_probabilities", json::array()); + } + + return res; +} + +// return value is vector as there is one case where we might need to generate two responses +inline static std::vector format_partial_response_oaicompat(const task_result &response) { + json result = response.result_json; + + if (!result.contains("model") || !result.contains("oaicompat_token_ctr")) { + return std::vector({response.result_json}); + } + + bool first = json_value(result, "oaicompat_token_ctr", 0) == 0; + std::string modelname = json_value(result, "model", std::string(DEFAULT_OAICOMPAT_MODEL)); + + bool stopped_word = json_value(result, "stopped_word", false); + bool stopped_eos = json_value(result, "stopped_eos", false); + bool stopped_limit = json_value(result, "stopped_limit", false); + std::string content = json_value(result, "content", std::string("")); + + std::string finish_reason; + if (stopped_word || stopped_eos) { + finish_reason = "stop"; + } + if (stopped_limit) { + finish_reason = "length"; + } + + std::time_t t = std::time(0); + + json choices; + + if (!finish_reason.empty()) { + choices = json::array({json{{"finish_reason", finish_reason}, + {"index", 0}, + {"delta", json::object()}}}); + } else { + if (first) { + if (content.empty()) { + choices = json::array({json{{"finish_reason", nullptr}, + {"index", 0}, + {"delta", json{{"role", "assistant"}}}}}); + } else { + // We have to send this as two updates to conform to openai behavior + json initial_ret = json{{"choices", json::array({json{ + {"finish_reason", nullptr}, + {"index", 0}, + {"delta", json{ + {"role", "assistant"} + }}}})}, + {"created", t}, + {"id", gen_chatcmplid()}, + {"model", modelname}, + {"object", "chat.completion.chunk"}}; + + json second_ret = json{ + {"choices", json::array({json{{"finish_reason", nullptr}, + {"index", 0}, + {"delta", json{ + {"content", content}}} + }})}, + {"created", t}, + {"id", gen_chatcmplid()}, + {"model", modelname}, + {"object", "chat.completion.chunk"}}; + + return std::vector({initial_ret, second_ret}); + } + } else { + // Some idiosyncrasy in task processing logic makes several trailing calls + // with empty content, we ignore these at the calee site. 
+ if (content.empty()) { + return std::vector({json::object()}); + } + + choices = json::array({json{ + {"finish_reason", nullptr}, + {"index", 0}, + {"delta", + json{ + {"content", content}, + }}, + }}); + } + } + + json ret = json{{"choices", choices}, + {"created", t}, + {"id", gen_chatcmplid()}, + {"model", modelname}, + {"object", "chat.completion.chunk"}}; + + return std::vector({ret}); +} diff --git a/examples/server/server.cpp b/examples/server/server.cpp index a853d4a693311..68153928e839b 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -2,6 +2,7 @@ #include "llama.h" #include "grammar-parser.h" #include "utils.hpp" +#include "oai.hpp" #include "../llava/clip.h" @@ -29,8 +30,6 @@ #include #include -#define DEFAULT_OAICOMPAT_MODEL "gpt-3.5-turbo-0613" - using json = nlohmann::json; struct server_params @@ -45,9 +44,6 @@ struct server_params bool server_verbose = false; -json oaicompat_completion_params_parse(const json &body); -std::string format_chatml(std::vector messages); - static size_t common_part(const std::vector &a, const std::vector &b) { size_t i; @@ -143,15 +139,6 @@ static json probs_vector_to_json(const llama_context *ctx, const std::vector -static T json_value(const json &body, const std::string &key, const T &default_value) -{ - // Fallback null to default value - return body.contains(key) && !body.at(key).is_null() - ? body.value(key, default_value) - : default_value; -} - struct llama_client_slot { int id; @@ -2264,239 +2251,7 @@ static void server_params_parse(int argc, char **argv, server_params &sparams, } } -static std::string random_string() -{ - static const std::string str("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"); - - std::random_device rd; - std::mt19937 generator(rd()); - - std::string result(32, ' '); - - for (int i = 0; i < 32; ++i) { - result[i] = str[generator() % str.size()]; - } - - return result; -} - -static std::string gen_chatcmplid() -{ - std::stringstream chatcmplid; - chatcmplid << "chatcmpl-" << random_string(); - return chatcmplid.str(); -} - -std::string format_chatml(std::vector messages) -{ - std::ostringstream chatml_msgs; - - for (auto it = messages.begin(); it != messages.end(); ++it) { - chatml_msgs << "<|im_start|>" - << json_value(*it, "role", std::string("user")) << '\n'; - chatml_msgs << json_value(*it, "content", std::string("")) - << "<|im_end|>\n"; - } - - chatml_msgs << "<|im_start|>assistant" << '\n'; - - return chatml_msgs.str(); -} - /* llama.cpp completion api semantics */ -json oaicompat_completion_params_parse( - const json &body /* openai api json semantics */) -{ - json llama_params; - - llama_params["__oaicompat"] = true; - - // Map OpenAI parameters to llama.cpp parameters - // - // For parameters that are defined by the OpenAI documentation (e.g. 
- // temperature), we explicitly specify OpenAI's intended default; we - // need to do that because sometimes OpenAI disagrees with llama.cpp - // - // https://platform.openai.com/docs/api-reference/chat/create - llama_sampling_params default_sparams; - llama_params["model"] = json_value(body, "model", std::string("unknown")); - llama_params["prompt"] = format_chatml(body["messages"]); // OpenAI 'messages' to llama.cpp 'prompt' - llama_params["cache_prompt"] = json_value(body, "cache_prompt", false); - llama_params["temperature"] = json_value(body, "temperature", 0.0); - llama_params["top_k"] = json_value(body, "top_k", default_sparams.top_k); - llama_params["top_p"] = json_value(body, "top_p", 1.0); - llama_params["n_predict"] = json_value(body, "max_tokens", -1); - llama_params["logit_bias"] = json_value(body, "logit_bias",json::object()); - llama_params["frequency_penalty"] = json_value(body, "frequency_penalty", 0.0); - llama_params["presence_penalty"] = json_value(body, "presence_penalty", 0.0); - llama_params["seed"] = json_value(body, "seed", LLAMA_DEFAULT_SEED); - llama_params["stream"] = json_value(body, "stream", false); - llama_params["mirostat"] = json_value(body, "mirostat", default_sparams.mirostat); - llama_params["mirostat_tau"] = json_value(body, "mirostat_tau", default_sparams.mirostat_tau); - llama_params["mirostat_eta"] = json_value(body, "mirostat_eta", default_sparams.mirostat_eta); - llama_params["penalize_nl"] = json_value(body, "penalize_nl", default_sparams.penalize_nl); - llama_params["typical_p"] = json_value(body, "typical_p", default_sparams.typical_p); - llama_params["repeat_last_n"] = json_value(body, "repeat_last_n", default_sparams.penalty_last_n); - llama_params["ignore_eos"] = json_value(body, "ignore_eos", false); - llama_params["tfs_z"] = json_value(body, "tfs_z", default_sparams.tfs_z); - - if (body.count("grammar") != 0) { - llama_params["grammar"] = json_value(body, "grammar", json::object()); - } - - // Handle 'stop' field - if (body.contains("stop") && body["stop"].is_string()) { - llama_params["stop"] = json::array({body["stop"].get()}); - } else { - llama_params["stop"] = json_value(body, "stop", json::array()); - } - - // Ensure there is ChatML-specific end sequence among stop words - llama_params["stop"].push_back("<|im_end|>"); - - return llama_params; -} - -static json format_final_response_oaicompat(const json &request, const task_result &response, bool streaming = false) -{ - json result = response.result_json; - - bool stopped_word = result.count("stopped_word") != 0; - bool stopped_eos = json_value(result, "stopped_eos", false); - int num_tokens_predicted = json_value(result, "tokens_predicted", 0); - int num_prompt_tokens = json_value(result, "tokens_evaluated", 0); - std::string content = json_value(result, "content", std::string("")); - - std::string finish_reason = "length"; - if (stopped_word || stopped_eos) { - finish_reason = "stop"; - } - - json choices = - streaming ? json::array({json{{"finish_reason", finish_reason}, - {"index", 0}, - {"delta", json::object()}}}) - : json::array({json{{"finish_reason", finish_reason}, - {"index", 0}, - {"message", json{{"content", content}, - {"role", "assistant"}}}}}); - - std::time_t t = std::time(0); - - json res = - json{{"choices", choices}, - {"created", t}, - {"model", - json_value(request, "model", std::string(DEFAULT_OAICOMPAT_MODEL))}, - {"object", streaming ? 
"chat.completion.chunk" : "chat.completion"}, - {"usage", - json{{"completion_tokens", num_tokens_predicted}, - {"prompt_tokens", num_prompt_tokens}, - {"total_tokens", num_tokens_predicted + num_prompt_tokens}}}, - {"id", gen_chatcmplid()}}; - - if (server_verbose) { - res["__verbose"] = result; - } - - if (result.contains("completion_probabilities")) { - res["completion_probabilities"] = json_value(result, "completion_probabilities", json::array()); - } - - return res; -} - -// return value is vector as there is one case where we might need to generate two responses -static std::vector format_partial_response_oaicompat(const task_result &response) { - json result = response.result_json; - - if (!result.contains("model") || !result.contains("oaicompat_token_ctr")) { - return std::vector({response.result_json}); - } - - bool first = json_value(result, "oaicompat_token_ctr", 0) == 0; - std::string modelname = json_value(result, "model", std::string(DEFAULT_OAICOMPAT_MODEL)); - - bool stopped_word = json_value(result, "stopped_word", false); - bool stopped_eos = json_value(result, "stopped_eos", false); - bool stopped_limit = json_value(result, "stopped_limit", false); - std::string content = json_value(result, "content", std::string("")); - - std::string finish_reason; - if (stopped_word || stopped_eos) { - finish_reason = "stop"; - } - if (stopped_limit) { - finish_reason = "length"; - } - - std::time_t t = std::time(0); - - json choices; - - if (!finish_reason.empty()) { - choices = json::array({json{{"finish_reason", finish_reason}, - {"index", 0}, - {"delta", json::object()}}}); - } else { - if (first) { - if (content.empty()) { - choices = json::array({json{{"finish_reason", nullptr}, - {"index", 0}, - {"delta", json{{"role", "assistant"}}}}}); - } else { - // We have to send this as two updates to conform to openai behavior - json initial_ret = json{{"choices", json::array({json{ - {"finish_reason", nullptr}, - {"index", 0}, - {"delta", json{ - {"role", "assistant"} - }}}})}, - {"created", t}, - {"id", gen_chatcmplid()}, - {"model", modelname}, - {"object", "chat.completion.chunk"}}; - - json second_ret = json{ - {"choices", json::array({json{{"finish_reason", nullptr}, - {"index", 0}, - {"delta", json{ - {"content", content}}} - }})}, - {"created", t}, - {"id", gen_chatcmplid()}, - {"model", modelname}, - {"object", "chat.completion.chunk"}}; - - return std::vector({initial_ret, second_ret}); - } - } else { - // Some idiosyncrasy in task processing logic makes several trailing calls - // with empty content, we ignore these at the calee site. 
- if (content.empty()) { - return std::vector({json::object()}); - } - - choices = json::array({json{ - {"finish_reason", nullptr}, - {"index", 0}, - {"delta", - json{ - {"content", content}, - }}, - }}); - } - } - - json ret = json{{"choices", choices}, - {"created", t}, - {"id", gen_chatcmplid()}, - {"model", modelname}, - {"object", "chat.completion.chunk"}}; - - return std::vector({ret}); -} - static json format_partial_response( llama_server_context &llama, llama_client_slot *slot, const std::string &content, const std::vector &probs ) { diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index a38698762d6b6..b14898d68ccb6 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -154,6 +154,35 @@ static inline void server_log(const char *level, const char *function, int line, fflush(stdout); } +// +// server utils +// + +template +static T json_value(const json &body, const std::string &key, const T &default_value) +{ + // Fallback null to default value + return body.contains(key) && !body.at(key).is_null() + ? body.value(key, default_value) + : default_value; +} + +inline std::string format_chatml(std::vector messages) +{ + std::ostringstream chatml_msgs; + + for (auto it = messages.begin(); it != messages.end(); ++it) { + chatml_msgs << "<|im_start|>" + << json_value(*it, "role", std::string("user")) << '\n'; + chatml_msgs << json_value(*it, "content", std::string("")) + << "<|im_end|>\n"; + } + + chatml_msgs << "<|im_start|>assistant" << '\n'; + + return chatml_msgs.str(); +} + // // work queue utils // @@ -168,6 +197,7 @@ struct llama_server_queue { std::function callback_new_task; std::function callback_all_task_finished; + // Add a new task to the end of the queue int post(T task) { std::unique_lock lock(mutex_tasks); task.id = id++; @@ -176,24 +206,29 @@ struct llama_server_queue { return task.id; } + // Add a new task, but defer until the next loop void defer(T task) { std::unique_lock lock(mutex_tasks); queue_tasks_deferred.push_back(std::move(task)); } + // Get the next task id int get_next_id() { std::unique_lock lock(mutex_tasks); return id++; } + // Register function to process a new task void on_new_task(std::function callback) { callback_new_task = callback; } + // Register the function to be called when the batch of tasks is finished void on_all_tasks_finished(std::function callback) { callback_all_task_finished = callback; } + // Start the main loop. 
This call is blocking void start_loop() { while (true) { // new task arrived @@ -215,13 +250,8 @@ struct llama_server_queue { // move deferred tasks back to main loop { std::unique_lock lock(mutex_tasks); - //queue_tasks.insert( - // queue_tasks.end(), - // std::make_move_iterator(queue_tasks_deferred.begin()), - // std::make_move_iterator(queue_tasks_deferred.end()) - //); for (auto & task : queue_tasks_deferred) { - queue_tasks.push_back(task); + queue_tasks.push_back(std::move(task)); } queue_tasks_deferred.clear(); lock.unlock(); @@ -245,12 +275,14 @@ struct llama_server_queue { struct llama_server_response_event { typedef std::function callback_multitask_t; - std::vector queue_results; + callback_multitask_t callback_update_multitask; + // for keeping track of all tasks waiting for the result std::mutex mutex_task_ids; std::set waiting_task_ids; + // the main result queue + std::vector queue_results; std::mutex mutex_results; std::condition_variable condition_results; - callback_multitask_t callback_update_multitask; void add_waiting_task_id(int task_id) { std::unique_lock lock(mutex_task_ids); @@ -262,6 +294,7 @@ struct llama_server_response_event { waiting_task_ids.erase(task_id); } + // This function blocks the thread until there is a response for this task_id task_result recv(int task_id) { while (true) { @@ -286,16 +319,18 @@ struct llama_server_response_event { // should never reach here } + // Register the function to update multitask void on_multitask_update(callback_multitask_t callback) { callback_update_multitask = callback; } + // Send a new result to a waiting task_id void send(task_result result) { std::unique_lock lock(mutex_results); std::unique_lock lock1(mutex_task_ids); LOG_VERBOSE("send new result", {}); for (auto& task_id : waiting_task_ids) { - LOG_TEE("waiting task id %i \n", task_id); + // LOG_TEE("waiting task id %i \n", task_id); // for now, tasks that have associated parent multitasks just get erased once multitask picks up the result if (result.multitask_id == task_id) { @@ -387,4 +422,31 @@ static inline std::vector base64_decode(const std::string & encoded_str } return ret; +} + +// +// random string / id +// + +static std::string random_string() +{ + static const std::string str("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"); + + std::random_device rd; + std::mt19937 generator(rd()); + + std::string result(32, ' '); + + for (int i = 0; i < 32; ++i) { + result[i] = str[generator() % str.size()]; + } + + return result; +} + +static std::string gen_chatcmplid() +{ + std::stringstream chatcmplid; + chatcmplid << "chatcmpl-" << random_string(); + return chatcmplid.str(); } \ No newline at end of file From d87b48fd55276f4996caa9149855e82f7acdbff8 Mon Sep 17 00:00:00 2001 From: ngxson Date: Mon, 22 Jan 2024 22:32:06 +0100 Subject: [PATCH 4/7] server: move all mutexes away from server.cpp --- examples/server/server.cpp | 97 ++++++++++---------------------------- examples/server/utils.hpp | 78 +++++++++++++++++++++++++----- 2 files changed, 90 insertions(+), 85 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 68153928e839b..e927b9c0dc1ac 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -25,7 +25,6 @@ #include #include -#include #include #include #include @@ -328,10 +327,8 @@ struct llama_server_context // slots / clients std::vector slots; - llama_server_queue queue_tasks; - llama_server_response_event queue_results; - std::vector queue_multitasks; - std::mutex mutex_multitasks; 
+ llama_server_queue queue_tasks; + llama_server_response queue_results; ~llama_server_context() { @@ -961,30 +958,6 @@ struct llama_server_context queue_results.send(res); } - void add_multitask(int id, std::vector& sub_ids) - { - std::lock_guard lock(mutex_multitasks); - task_multi multi; - multi.id = id; - std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end())); - queue_multitasks.push_back(multi); - // TODO @ngxson : Do we need to notify the queue_tasks? - } - - void update_multitask(int multitask_id, int subtask_id, task_result& result) - { - std::lock_guard lock(mutex_multitasks); - for (auto& multitask : queue_multitasks) - { - if (multitask.id == multitask_id) - { - multitask.subtasks_remaining.erase(subtask_id); - multitask.results.push_back(result); - // TODO @ngxson : Do we need to notify the queue_tasks? - } - } - } - json get_model_props() { return get_formated_generation(slots[0]); @@ -1120,7 +1093,7 @@ struct llama_server_context // parent multitask, if any, needs to be updated if (slot.multitask_id != -1) { - update_multitask(slot.multitask_id, slot.task_id, res); + queue_tasks.update_multitask(slot.multitask_id, slot.task_id, res); } } @@ -1157,7 +1130,6 @@ struct llama_server_context int request_completion(json data, bool infill, bool embedding, int multitask_id) { - std::unique_lock lock(mutex_multitasks); task_server task; task.target_id = 0; task.data = std::move(data); @@ -1169,7 +1141,6 @@ struct llama_server_context // when a completion task's prompt array is not a singleton, we split it into multiple requests if (task.data.count("prompt") && task.data.at("prompt").size() > 1) { - lock.unlock(); // entering new func scope return split_multiprompt_task(task); } @@ -1270,11 +1241,11 @@ struct llama_server_context } // queue up the multitask so we can track its subtask progression - add_multitask(multitask_id, subtask_ids); + queue_tasks.add_multitask(multitask_id, subtask_ids); return multitask_id; } - void process_single_task(task_server task) + void process_single_task(task_server& task) { switch (task.type) { @@ -1283,7 +1254,7 @@ struct llama_server_context if (slot == nullptr) { // if no slot is available, we defer this task for processing later - LOG_TEE("no slot\n"); + LOG_VERBOSE("no slot is available", {}); queue_tasks.defer(task); break; } @@ -1333,42 +1304,23 @@ struct llama_server_context } } - void process_multitask() + void on_finish_multitask(task_multi& multitask) { - // remove finished multitasks from the queue of multitasks, and add the corresponding result to the result queue - std::vector agg_results; - auto queue_iterator = queue_multitasks.begin(); - while (queue_iterator != queue_multitasks.end()) - { - if (queue_iterator->subtasks_remaining.empty()) - { - // all subtasks done == multitask is done - task_result aggregate_result; - aggregate_result.id = queue_iterator->id; - aggregate_result.stop = true; - aggregate_result.error = false; - - // collect json results into one json result - std::vector result_jsons; - for (auto& subres : queue_iterator->results) - { - result_jsons.push_back(subres.result_json); - aggregate_result.error = aggregate_result.error && subres.error; - } - aggregate_result.result_json = json{ "results", result_jsons }; - agg_results.push_back(aggregate_result); - queue_iterator = queue_multitasks.erase(queue_iterator); - } - else - { - ++queue_iterator; - } - } + // all subtasks done == multitask is done + task_result result; + result.id = multitask.id; + 
result.stop = true; + result.error = false; - // copy aggregate results of complete multi-tasks to the results queue - for (auto& res : agg_results) { - queue_results.send(res); + // collect json results into one json result + std::vector result_jsons; + for (auto& subres : multitask.results) + { + result_jsons.push_back(subres.result_json); + result.error = result.error && subres.error; } + result.result_json = json{ "results", result_jsons }; + queue_results.send(result); } bool update_slots() { @@ -1704,7 +1656,6 @@ struct llama_server_context } void run_on_all_tasks_finished() { - process_multitask(); update_slots(); } }; @@ -2861,16 +2812,18 @@ int main(int argc, char **argv) llama.queue_tasks.on_new_task(std::bind( &llama_server_context::process_single_task, &llama, std::placeholders::_1)); + llama.queue_tasks.on_finish_multitask(std::bind( + &llama_server_context::on_finish_multitask, &llama, std::placeholders::_1)); llama.queue_tasks.on_all_tasks_finished(std::bind( &llama_server_context::run_on_all_tasks_finished, &llama)); - llama.queue_tasks.start_loop(); llama.queue_results.on_multitask_update(std::bind( - &llama_server_context::update_multitask, - &llama, + &llama_server_queue::update_multitask, + &llama.queue_tasks, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3 )); + llama.queue_tasks.start_loop(); t.join(); diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index b14898d68ccb6..80f57dc597fb9 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -187,18 +187,21 @@ inline std::string format_chatml(std::vector messages) // work queue utils // -template struct llama_server_queue { int id = 0; std::mutex mutex_tasks; - std::vector queue_tasks; - std::vector queue_tasks_deferred; + // queues + std::vector queue_tasks; + std::vector queue_tasks_deferred; + std::vector queue_multitasks; std::condition_variable condition_tasks; - std::function callback_new_task; + // callback functions + std::function callback_new_task; + std::function callback_finish_multitask; std::function callback_all_task_finished; // Add a new task to the end of the queue - int post(T task) { + int post(task_server task) { std::unique_lock lock(mutex_tasks); task.id = id++; queue_tasks.push_back(std::move(task)); @@ -207,7 +210,7 @@ struct llama_server_queue { } // Add a new task, but defer until the next loop - void defer(T task) { + void defer(task_server task) { std::unique_lock lock(mutex_tasks); queue_tasks_deferred.push_back(std::move(task)); } @@ -219,10 +222,15 @@ struct llama_server_queue { } // Register function to process a new task - void on_new_task(std::function callback) { + void on_new_task(std::function callback) { callback_new_task = callback; } + // Register function to process a multitask + void on_finish_multitask(std::function callback) { + callback_finish_multitask = callback; + } + // Register the function to be called when the batch of tasks is finished void on_all_tasks_finished(std::function callback) { callback_all_task_finished = callback; @@ -257,6 +265,24 @@ struct llama_server_queue { lock.unlock(); } LOG_VERBOSE("callback_all_task_finished", {}); + // process and update all the multitasks + auto queue_iterator = queue_multitasks.begin(); + while (queue_iterator != queue_multitasks.end()) + { + if (queue_iterator->subtasks_remaining.empty()) + { + // all subtasks done == multitask is done + task_multi current_multitask = *queue_iterator; + callback_finish_multitask(current_multitask); + // remove this multitask + 
queue_iterator = queue_multitasks.erase(queue_iterator); + } + else + { + ++queue_iterator; + } + } + // all tasks in the current loop is finished callback_all_task_finished(); } LOG_VERBOSE("wait for new task", {}); @@ -271,13 +297,40 @@ struct llama_server_queue { } } } + + // + // functions to manage multitasks + // + + // add a multitask by specifying the id of all subtask (subtask is a task_server) + void add_multitask(int multitask_id, std::vector& sub_ids) + { + std::lock_guard lock(mutex_tasks); + task_multi multi; + multi.id = multitask_id; + std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end())); + queue_multitasks.push_back(multi); + } + + // updatethe remaining subtasks, while appending results to multitask + void update_multitask(int multitask_id, int subtask_id, task_result& result) + { + std::lock_guard lock(mutex_tasks); + for (auto& multitask : queue_multitasks) + { + if (multitask.id == multitask_id) + { + multitask.subtasks_remaining.erase(subtask_id); + multitask.results.push_back(result); + } + } + } }; -struct llama_server_response_event { +struct llama_server_response { typedef std::function callback_multitask_t; callback_multitask_t callback_update_multitask; // for keeping track of all tasks waiting for the result - std::mutex mutex_task_ids; std::set waiting_task_ids; // the main result queue std::vector queue_results; @@ -285,12 +338,12 @@ struct llama_server_response_event { std::condition_variable condition_results; void add_waiting_task_id(int task_id) { - std::unique_lock lock(mutex_task_ids); + std::unique_lock lock(mutex_results); waiting_task_ids.insert(task_id); } void remove_waiting_task_id(int task_id) { - std::unique_lock lock(mutex_task_ids); + std::unique_lock lock(mutex_results); waiting_task_ids.erase(task_id); } @@ -327,7 +380,6 @@ struct llama_server_response_event { // Send a new result to a waiting task_id void send(task_result result) { std::unique_lock lock(mutex_results); - std::unique_lock lock1(mutex_task_ids); LOG_VERBOSE("send new result", {}); for (auto& task_id : waiting_task_ids) { // LOG_TEE("waiting task id %i \n", task_id); @@ -449,4 +501,4 @@ static std::string gen_chatcmplid() std::stringstream chatcmplid; chatcmplid << "chatcmpl-" << random_string(); return chatcmplid.str(); -} \ No newline at end of file +} From 1bd867894d648325e5e976382c95f2af2bad7688 Mon Sep 17 00:00:00 2001 From: ngxson Date: Mon, 22 Jan 2024 22:42:59 +0100 Subject: [PATCH 5/7] server: correct multitask response --- examples/server/server.cpp | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index e927b9c0dc1ac..7916147be89e0 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1089,12 +1089,6 @@ struct llama_server_context } queue_results.send(res); - - // parent multitask, if any, needs to be updated - if (slot.multitask_id != -1) - { - queue_tasks.update_multitask(slot.multitask_id, slot.task_id, res); - } } void send_embedding(llama_client_slot &slot) @@ -1319,7 +1313,7 @@ struct llama_server_context result_jsons.push_back(subres.result_json); result.error = result.error && subres.error; } - result.result_json = json{ "results", result_jsons }; + result.result_json = json{ { "results", result_jsons } }; queue_results.send(result); } From d083c81761d0f2e3e5be9d7552885767b7ec73eb Mon Sep 17 00:00:00 2001 From: ngxson Date: Mon, 22 Jan 2024 23:06:12 +0100 Subject: [PATCH 6/7] server: only add back 
deferred tasks when one slot is available --- examples/server/server.cpp | 1 + examples/server/utils.hpp | 21 +++++++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 7916147be89e0..54dac86910b11 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1383,6 +1383,7 @@ struct llama_server_context slot.t_last_used = ggml_time_us(); LOG_TEE("slot %d released (%d tokens in cache)\n", slot.id, (int) slot.cache_tokens.size()); + queue_tasks.notify_slot_changed(); continue; } diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index 80f57dc597fb9..3ea697f59e5b5 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -209,7 +209,7 @@ struct llama_server_queue { return task.id; } - // Add a new task, but defer until the next loop + // Add a new task, but defer until one slot is available void defer(task_server task) { std::unique_lock lock(mutex_tasks); queue_tasks_deferred.push_back(std::move(task)); @@ -236,6 +236,16 @@ struct llama_server_queue { callback_all_task_finished = callback; } + // Call when the state of one slot is changed + void notify_slot_changed() { + // move deferred tasks back to main loop + std::unique_lock lock(mutex_tasks); + for (auto & task : queue_tasks_deferred) { + queue_tasks.push_back(std::move(task)); + } + queue_tasks_deferred.clear(); + } + // Start the main loop. This call is blocking void start_loop() { while (true) { @@ -255,15 +265,6 @@ struct llama_server_queue { LOG_VERBOSE("callback_new_task", {}); callback_new_task(task); } - // move deferred tasks back to main loop - { - std::unique_lock lock(mutex_tasks); - for (auto & task : queue_tasks_deferred) { - queue_tasks.push_back(std::move(task)); - } - queue_tasks_deferred.clear(); - lock.unlock(); - } LOG_VERBOSE("callback_all_task_finished", {}); // process and update all the multitasks auto queue_iterator = queue_multitasks.begin(); From 8f36df8fc9c82eb2b56257217957a09509170e87 Mon Sep 17 00:00:00 2001 From: ngxson Date: Tue, 23 Jan 2024 18:13:38 +0100 Subject: [PATCH 7/7] server: fix a race condition cause by "request_completion" --- examples/server/server.cpp | 60 +++++++++++++++++++++++++------------- examples/server/utils.hpp | 8 +++-- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 54dac86910b11..39283613256ed 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1122,9 +1122,10 @@ struct llama_server_context queue_results.send(res); } - int request_completion(json data, bool infill, bool embedding, int multitask_id) + void request_completion(int task_id, json data, bool infill, bool embedding, int multitask_id) { task_server task; + task.id = task_id; task.target_id = 0; task.data = std::move(data); task.infill_mode = infill; @@ -1135,11 +1136,11 @@ struct llama_server_context // when a completion task's prompt array is not a singleton, we split it into multiple requests if (task.data.count("prompt") && task.data.at("prompt").size() > 1) { - return split_multiprompt_task(task); + split_multiprompt_task(task_id, task); } // otherwise, it's a single-prompt task, we actually queue it - return queue_tasks.post(task); + queue_tasks.post(task); } // for multiple images processing @@ -1218,25 +1219,30 @@ struct llama_server_context queue_tasks.post(task); } - int split_multiprompt_task(task_server& multiprompt_task) + void split_multiprompt_task(int multitask_id, 
task_server& multiprompt_task) { int prompt_count = multiprompt_task.data.at("prompt").size(); assert(prompt_count > 1); - int multitask_id = queue_tasks.get_next_id(); + // generate all the ID for subtask std::vector subtask_ids(prompt_count); for (int i = 0; i < prompt_count; i++) + { + subtask_ids[i] = queue_tasks.get_new_id(); + } + + // queue up the multitask so we can track its subtask progression + queue_tasks.add_multitask(multitask_id, subtask_ids); + + // add subtasks + for (int i = 0; i < prompt_count; i++) { json subtask_data = multiprompt_task.data; subtask_data["prompt"] = subtask_data["prompt"][i]; // subtasks inherit everything else (infill mode, embedding mode, etc.) - subtask_ids[i] = request_completion(subtask_data, multiprompt_task.infill_mode, multiprompt_task.embedding_mode, multitask_id); + request_completion(subtask_ids[i], subtask_data, multiprompt_task.infill_mode, multiprompt_task.embedding_mode, multitask_id); } - - // queue up the multitask so we can track its subtask progression - queue_tasks.add_multitask(multitask_id, subtask_ids); - return multitask_id; } void process_single_task(task_server& task) @@ -2493,8 +2499,9 @@ int main(int argc, char **argv) return; } json data = json::parse(req.body); - const int task_id = llama.request_completion(data, false, false, -1); + const int task_id = llama.queue_tasks.get_new_id(); llama.queue_results.add_waiting_task_id(task_id); + llama.request_completion(task_id, data, false, false, -1); if (!json_value(data, "stream", false)) { std::string completion_text; task_result result = llama.queue_results.recv(task_id); @@ -2505,9 +2512,8 @@ int main(int argc, char **argv) { res.status = 404; res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); - llama.queue_results.remove_waiting_task_id(task_id); - return; } + llama.queue_results.remove_waiting_task_id(task_id); } else { const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink & sink) { @@ -2546,8 +2552,9 @@ int main(int argc, char **argv) break; } } - sink.done(); + llama.queue_results.remove_waiting_task_id(task_id); + sink.done(); return true; }; @@ -2592,8 +2599,9 @@ int main(int argc, char **argv) } json data = oaicompat_completion_params_parse(json::parse(req.body)); - const int task_id = llama.request_completion(data, false, false, -1); + const int task_id = llama.queue_tasks.get_new_id(); llama.queue_results.add_waiting_task_id(task_id); + llama.request_completion(task_id, data, false, false, -1); if (!json_value(data, "stream", false)) { std::string completion_text; @@ -2608,9 +2616,8 @@ int main(int argc, char **argv) } else { res.status = 500; res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); - llama.queue_results.remove_waiting_task_id(task_id); - return; } + llama.queue_results.remove_waiting_task_id(task_id); } else { const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink &sink) { while (true) { @@ -2671,7 +2678,9 @@ int main(int argc, char **argv) return; } json data = json::parse(req.body); - const int task_id = llama.request_completion(data, true, false, -1); + const int task_id = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(task_id); + llama.request_completion(task_id, data, true, false, -1); if (!json_value(data, "stream", false)) { std::string completion_text; task_result result = llama.queue_results.recv(task_id); @@ -2683,8 +2692,8 @@ int main(int argc, char **argv) { res.status = 404; 
res.set_content(result.result_json["content"], "text/plain; charset=utf-8"); - return; } + llama.queue_results.remove_waiting_task_id(task_id); } else { const auto chunked_content_provider = [task_id, &llama](size_t, httplib::DataSink & sink) { while (true) @@ -2700,6 +2709,7 @@ int main(int argc, char **argv) }); if (!sink.write(str.c_str(), str.size())) { + llama.queue_results.remove_waiting_task_id(task_id); return false; } if (result.stop) @@ -2713,8 +2723,8 @@ int main(int argc, char **argv) } } + llama.queue_results.remove_waiting_task_id(task_id); sink.done(); - return true; }; @@ -2788,8 +2798,16 @@ int main(int argc, char **argv) image_data = ""; } - const int task_id = llama.request_completion({ {"prompt", prompt}, { "n_predict", 0}, {"image_data", image_data} }, false, true, -1); + // create and queue the task + const int task_id = llama.queue_tasks.get_new_id(); + llama.queue_results.add_waiting_task_id(task_id); + llama.request_completion(task_id, { {"prompt", prompt}, { "n_predict", 0}, {"image_data", image_data} }, false, true, -1); + + // get the result task_result result = llama.queue_results.recv(task_id); + llama.queue_results.remove_waiting_task_id(task_id); + + // send the result return res.set_content(result.result_json.dump(), "application/json; charset=utf-8"); }); diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index 3ea697f59e5b5..e2b6065f734a2 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -203,7 +203,9 @@ struct llama_server_queue { // Add a new task to the end of the queue int post(task_server task) { std::unique_lock lock(mutex_tasks); - task.id = id++; + if (task.id == -1) { + task.id = id++; + } queue_tasks.push_back(std::move(task)); condition_tasks.notify_one(); return task.id; @@ -215,8 +217,8 @@ struct llama_server_queue { queue_tasks_deferred.push_back(std::move(task)); } - // Get the next task id - int get_next_id() { + // Get the next id for creating anew task + int get_new_id() { std::unique_lock lock(mutex_tasks); return id++; }
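
In the streamed OpenAI-compatible path, format_partial_response_oaicompat emits the very first delta as two chunks whenever the first token already carries text: one chunk announcing the assistant role, then one carrying the content, to match OpenAI's streaming behavior. Below is a minimal standalone sketch of the two payloads it builds; it assumes a system-installed nlohmann/json (the server vendors the same library as json.hpp) and uses a fixed id in place of gen_chatcmplid().

#include <ctime>
#include <iostream>
#include <string>
#include <nlohmann/json.hpp>

using json = nlohmann::json;

int main() {
    const std::time_t t = std::time(0);
    const std::string modelname = "gpt-3.5-turbo-0613"; // DEFAULT_OAICOMPAT_MODEL
    const std::string content   = "Hello";              // first generated text

    // first chunk: only the role, no content yet
    json initial_ret = json{{"choices", json::array({json{
                                {"finish_reason", nullptr},
                                {"index", 0},
                                {"delta", json{{"role", "assistant"}}}}})},
                            {"created", t},
                            {"id", "chatcmpl-0000000000000000"},  // gen_chatcmplid() in the server
                            {"model", modelname},
                            {"object", "chat.completion.chunk"}};

    // second chunk: same envelope, but the delta now carries the text
    json second_ret = json{{"choices", json::array({json{
                                {"finish_reason", nullptr},
                                {"index", 0},
                                {"delta", json{{"content", content}}}}})},
                           {"created", t},
                           {"id", "chatcmpl-0000000000000000"},
                           {"model", modelname},
                           {"object", "chat.completion.chunk"}};

    std::cout << "data: " << initial_ret.dump() << "\n\n"
              << "data: " << second_ret.dump()  << "\n\n";
    return 0;
}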
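The format_chatml helper moved into utils.hpp flattens an OpenAI-style message list into a ChatML prompt that ends with an open assistant turn (the angle-bracketed template arguments appear to have been stripped by the patch rendering; the parameter is a std::vector<json> of {role, content} messages). A standalone sketch of the same formatting, again assuming a system-installed nlohmann/json:

#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <nlohmann/json.hpp>

using json = nlohmann::json;

static std::string format_chatml_sketch(const std::vector<json> & messages) {
    std::ostringstream chatml_msgs;
    for (const auto & msg : messages) {
        chatml_msgs << "<|im_start|>" << msg.value("role", "user") << '\n'
                    << msg.value("content", "") << "<|im_end|>\n";
    }
    chatml_msgs << "<|im_start|>assistant\n";
    return chatml_msgs.str();
}

int main() {
    const std::vector<json> messages = {
        json{{"role", "system"}, {"content", "You are a helpful assistant."}},
        json{{"role", "user"},   {"content", "Hello!"}},
    };
    std::cout << format_chatml_sketch(messages);
    // <|im_start|>system
    // You are a helpful assistant.<|im_end|>
    // <|im_start|>user
    // Hello!<|im_end|>
    // <|im_start|>assistant
    return 0;
}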
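The one-line change to on_finish_multitask in PATCH 5 is easy to misread: with nlohmann::json, json{ "results", result_jsons } brace-initializes a two-element array, while the corrected json{ { "results", result_jsons } } builds the intended object keyed by "results". A tiny standalone check, assuming a system-installed nlohmann/json:

#include <iostream>
#include <vector>
#include <nlohmann/json.hpp>

using json = nlohmann::json;

int main() {
    const std::vector<json> result_jsons = {
        json{{"content", "first completion"}},
        json{{"content", "second completion"}},
    };

    const json before = json{ "results", result_jsons };     // array:  ["results",[{...},{...}]]
    const json after  = json{ { "results", result_jsons } }; // object: {"results":[{...},{...}]}

    std::cout << "before: " << before.dump() << '\n'
              << "after:  " << after.dump()  << '\n'
              << std::boolalpha
              << "before.is_object(): " << before.is_object() << '\n'   // false
              << "after.is_object():  " << after.is_object()  << '\n';  // true
    return 0;
}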
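The race fixed in the last patch comes from request_completion assigning the task id and the HTTP handler registering itself as waiting only after the task had already been posted, so a fast worker could publish a result that nobody was waiting for yet. The handlers now follow a fixed order: reserve an id with get_new_id(), call add_waiting_task_id(), post the task, block in recv(), and always call remove_waiting_task_id() afterwards, including on error paths. Below is a minimal standalone sketch of that ordering; fake_result and fake_response_queue are illustrative stand-ins, not the server's task_result and llama_server_response.

#include <condition_variable>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

struct fake_result {
    int         id;
    bool        stop;
    std::string text;
};

struct fake_response_queue {
    std::mutex               mutex_results;
    std::condition_variable  condition_results;
    std::set<int>            waiting_task_ids;
    std::vector<fake_result> queue_results;

    void add_waiting_task_id(int task_id) {
        std::unique_lock<std::mutex> lock(mutex_results);
        waiting_task_ids.insert(task_id);
    }

    void remove_waiting_task_id(int task_id) {
        std::unique_lock<std::mutex> lock(mutex_results);
        waiting_task_ids.erase(task_id);
    }

    // a result is only kept if some caller has registered for it beforehand
    void send(fake_result result) {
        std::unique_lock<std::mutex> lock(mutex_results);
        if (waiting_task_ids.count(result.id)) {
            queue_results.push_back(std::move(result));
            condition_results.notify_all();
        }
    }

    // blocks until a result for this task_id arrives
    fake_result recv(int task_id) {
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_results);
            condition_results.wait(lock, [&] { return !queue_results.empty(); });
            for (auto it = queue_results.begin(); it != queue_results.end(); ++it) {
                if (it->id == task_id) {
                    fake_result res = *it;
                    queue_results.erase(it);
                    return res;
                }
            }
        }
    }
};

int main() {
    fake_response_queue queue;

    const int task_id = 42;                 // queue_tasks.get_new_id() in the server
    queue.add_waiting_task_id(task_id);     // register *before* the worker can answer

    // the "worker" stands in for the task queue processing the request
    std::thread worker([&] { queue.send({task_id, /*stop=*/true, "hello"}); });

    fake_result res = queue.recv(task_id);  // blocks until the worker has sent the result
    queue.remove_waiting_task_id(task_id);  // always unregister, also on error paths

    worker.join();
    return res.text == "hello" ? 0 : 1;
}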