From f8866855a798d1eca52020e18f09f08c08f5def5 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Wed, 8 Oct 2025 11:04:23 -0300 Subject: [PATCH 1/3] add support for systemd services and sockets --- common/arg.cpp | 8 +++ common/common.h | 2 + tools/server/server.cpp | 111 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 112 insertions(+), 9 deletions(-) diff --git a/common/arg.cpp b/common/arg.cpp index ecc296485cb47..281a4dc97a030 100644 --- a/common/arg.cpp +++ b/common/arg.cpp @@ -3363,8 +3363,16 @@ common_params_context common_params_parser_init(common_params & params, llama_ex [](common_params & params, int value) { params.timeout_read = value; params.timeout_write = value; + params.timeout_idle = value; } ).set_examples({LLAMA_EXAMPLE_SERVER}).set_env("LLAMA_ARG_TIMEOUT")); + add_opt(common_arg( + {"--systemd"}, + string_format("enable systemd integration (default: %s)", params.systemd ? "enabled" : "disabled"), + [](common_params & params) { + params.systemd = true; + } + ).set_examples({LLAMA_EXAMPLE_SERVER}).set_env("LLAMA_ARG_SYSTEMD")); add_opt(common_arg( {"--threads-http"}, "N", string_format("number of threads used to process HTTP requests (default: %d)", params.n_threads_http), diff --git a/common/common.h b/common/common.h index 8a8ecd667f2cc..61a8a8325021e 100644 --- a/common/common.h +++ b/common/common.h @@ -423,6 +423,7 @@ struct common_params { int32_t port = 8080; // server listens on this network port int32_t timeout_read = 600; // http read timeout in seconds int32_t timeout_write = timeout_read; // http write timeout in seconds + int32_t timeout_idle = timeout_read; // http idle timeout in seconds int32_t n_threads_http = -1; // number of threads to process HTTP requests (TODO: support threadpool) int32_t n_cache_reuse = 0; // min chunk size to reuse from the cache via KV shifting int32_t n_ctx_checkpoints = 3; // max number of context checkpoints per slot @@ -449,6 +450,7 @@ struct common_params { bool endpoint_slots = true; bool 
endpoint_props = false; // only control POST requests, not GET bool endpoint_metrics = false; + bool systemd = false; bool log_json = false; diff --git a/tools/server/server.cpp b/tools/server/server.cpp index de6e1a322b2c2..7bbcb54af4fca 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -4094,6 +4094,37 @@ inline void signal_handler(int signal) { shutdown_handler(signal); } +/** + * Signal to systemd that this service is ready + */ +void notify_ready(); +/** + * Signal to systemd that this service is still alive + */ +void notify_beat(); +/** + * Modified http server + */ +class StdinHttpServer: public httplib::Server { + + public: + /** + * Set the socket without opening it. + */ + inline void set_socket(const socket_t host) { + this->svr_sock_ = host; + } + /** + * Close the socket without shutting it down + * https://www.freedesktop.org/software/systemd/man/latest/systemd.socket.html#Accept= + */ + inline void terminate() { + int sock = this->svr_sock_; + set_socket(INVALID_SOCKET); + close(sock); + } +}; + int main(int argc, char ** argv) { // own arguments required by this example common_params params; @@ -4115,7 +4146,10 @@ int main(int argc, char ** argv) { LOG_INF("%s\n", common_params_get_system_info(params).c_str()); LOG_INF("\n"); - std::unique_ptr<httplib::Server> svr; + /** + * Use the hacked http server + */ + std::unique_ptr<StdinHttpServer> svr; #ifdef CPPHTTPLIB_OPENSSL_SUPPORT if (params.ssl_file_key != "" && params.ssl_file_cert != "") { LOG_INF("Running with SSL: key = %s, cert = %s\n", params.ssl_file_key.c_str(), params.ssl_file_cert.c_str()); @@ -4124,14 +4158,20 @@ int main(int argc, char ** argv) { ); } else { LOG_INF("Running without SSL\n"); - svr.reset(new httplib::Server()); + /** + * Create the hacked http server + */ + svr.reset(new StdinHttpServer()); } #else if (params.ssl_file_key != "" && params.ssl_file_cert != "") { LOG_ERR("Server is built without SSL support\n"); return 1; } - svr.reset(new httplib::Server()); + /** + * Create the hacked http server 
+ */ + svr.reset(new StdinHttpServer()); #endif std::atomic state{SERVER_STATE_LOADING_MODEL}; @@ -4179,6 +4219,10 @@ int main(int argc, char ** argv) { // set timeouts and change hostname and port svr->set_read_timeout (params.timeout_read); svr->set_write_timeout(params.timeout_write); + /** + * Wake up if no packet. Should be equal to WATCHDOG timeout + */ + svr->set_idle_interval(params.timeout_idle); std::unordered_map log_data; @@ -4256,8 +4300,14 @@ int main(int argc, char ** argv) { }; // register server middlewares - svr->set_pre_routing_handler([&middleware_validate_api_key, &middleware_server_state](const httplib::Request & req, httplib::Response & res) { + svr->set_pre_routing_handler([&params, &middleware_validate_api_key, &middleware_server_state](const httplib::Request & req, httplib::Response & res) { res.set_header("Access-Control-Allow-Origin", req.get_header_value("Origin")); + /** + * Notify heartbeat for systemd watchdog + */ + if (params.systemd) { + notify_beat(); + } // If this is OPTIONS request, skip validation because browsers don't include Authorization header if (req.method == "OPTIONS") { res.set_header("Access-Control-Allow-Credentials", "true"); @@ -5303,7 +5353,12 @@ int main(int argc, char ** argv) { bool was_bound = false; bool is_sock = false; - if (string_ends_with(std::string(params.hostname), ".sock")) { + if (params.systemd) { + is_sock = true; + was_bound = true; + LOG_INF("%s: using socket from systemd\n", __func__); + svr->set_socket(3); + } else if (string_ends_with(std::string(params.hostname), ".sock")) { is_sock = true; LOG_INF("%s: setting address family to AF_UNIX\n", __func__); svr->set_address_family(AF_UNIX); @@ -5329,9 +5384,6 @@ int main(int argc, char ** argv) { return 1; } - // run the HTTP server in a thread - std::thread t([&]() { svr->listen_after_bind(); }); - svr->wait_until_ready(); LOG_INF("%s: HTTP server is listening, hostname: %s, port: %d, http threads: %d\n", __func__, params.hostname.c_str(), 
params.port, params.n_threads_http); @@ -5340,7 +5392,6 @@ int main(int argc, char ** argv) { if (!ctx_server.load_model(params)) { clean_up(); - t.join(); LOG_ERR("%s: exiting due to model loading error\n", __func__); return 1; } @@ -5348,6 +5399,13 @@ int main(int argc, char ** argv) { ctx_server.init(); state.store(SERVER_STATE_READY); + /** + * Listen after loading the model so we don't lose the first connection. + */ + // run the HTTP server in a thread + std::thread t([&]() { svr->listen_after_bind(); }); + svr->wait_until_ready(); + LOG_INF("%s: model loaded\n", __func__); // print sample chat example to make it clear which template is used @@ -5386,6 +5444,10 @@ int main(int argc, char ** argv) { is_sock ? string_format("unix://%s", params.hostname.c_str()).c_str() : string_format("http://%s:%d", params.hostname.c_str(), params.port).c_str()); + if (params.systemd) { + notify_ready(); + } + // this call blocks the main thread until queue_tasks.terminate() is called ctx_server.queue_tasks.start_loop(); @@ -5394,3 +5456,34 @@ int main(int argc, char ** argv) { return 0; } + + +void notify_beat() { + /* Based on the systemd configuration, the NOTIFY_SOCKET is expected + to be already open as file descriptor 4 + */ + int notify_fd = 4; + + std::string message = "WATCHDOG=1"; + if (write(notify_fd, message.c_str(), message.length()) == -1) { + perror("write"); + close(notify_fd); + return; + } +} + +void notify_ready() { + /* Based on the systemd configuration, the NOTIFY_SOCKET is expected + to be already open as file descriptor 4 + */ + int notify_fd = 4; + + std::string message = "READY=1"; + if (write(notify_fd, message.c_str(), message.length()) == -1) { + perror("write"); + close(notify_fd); + return; + } + +} + From 4fb511c9631aa4d6e7e754cf0e081c4973f1bdf2 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Wed, 8 Oct 2025 14:54:01 -0300 Subject: [PATCH 2/3] added n_stream to allow client to specify the number of tokens to hold before sending a stream package --- tools/server/server.cpp | 49 
++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 7bbcb54af4fca..e9e174eeac1d8 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -120,6 +120,7 @@ struct slot_params { int32_t n_discard = 0; // number of tokens after n_keep that may be discarded when shifting context, 0 defaults to half int32_t n_predict = -1; // new tokens to predict int32_t n_indent = 0; // minimum line indentation for the generated text in number of whitespace characters + int32_t n_stream = 0; // number of tokens to hold back before sending a stream chunk int64_t t_max_prompt_ms = -1; // TODO: implement int64_t t_max_predict_ms = -1; // if positive, limit the generation phase to this time limit @@ -321,6 +322,8 @@ struct server_task { params.n_indent = json_value(data, "n_indent", defaults.n_indent); params.n_keep = json_value(data, "n_keep", defaults.n_keep); params.n_discard = json_value(data, "n_discard", defaults.n_discard); + params.n_stream = json_value(data, "n_stream", defaults.n_stream); + //params.t_max_prompt_ms = json_value(data, "t_max_prompt_ms", defaults.t_max_prompt_ms); // TODO: implement params.t_max_predict_ms = json_value(data, "t_max_predict_ms", defaults.t_max_predict_ms); params.response_fields = json_value(data, "response_fields", std::vector()); @@ -1459,6 +1462,7 @@ struct server_slot { server_tokens cache_tokens; std::vector generated_token_probs; + std::vector<completion_token_output> pending_partial; std::vector ctx_checkpoints; @@ -2565,7 +2569,9 @@ struct server_context { slot.add_token(result); if (slot.params.stream) { - send_partial_response(slot, result, false); + // FIXME: only N tokens + // send_partial_response(slot, result, false); + inc_partial_response(slot, result); } } @@ -2757,6 +2763,44 @@ struct server_context { return true; } + void inc_partial_response(server_slot & slot, const completion_token_output & tkn) { + auto res = std::make_unique(); + + 
slot.pending_partial.push_back(tkn); + slot.update_chat_msg(res->oaicompat_msg_diffs); + + flush_partial_response(slot, false); + } + + void flush_partial_response(server_slot & slot, bool force) { + auto res = std::make_unique(); + + res->id = slot.id_task; + res->index = slot.index; + + res->n_decoded = slot.n_decoded; + res->n_prompt_tokens = slot.n_prompt_tokens; + res->post_sampling_probs = slot.params.post_sampling_probs; + + res->verbose = slot.params.verbose; + res->oaicompat = slot.params.oaicompat; + res->oaicompat_model = slot.params.oaicompat_model; + res->oaicompat_cmpl_id = slot.params.oaicompat_cmpl_id; + + if (slot.stop != STOP_TYPE_NONE || slot.params.timings_per_token) { + res->timings = slot.get_timings(); + } + + if (slot.pending_partial.size() >= slot.params.n_stream || force) { + for (auto & part : slot.pending_partial) { + res->content += part.text_to_send; + res->tokens.push_back(part.tok); + } + queue_results.send(std::move(res)); + slot.pending_partial.clear(); + } + } + void send_partial_response(server_slot & slot, const completion_token_output & tkn, bool is_progress) { auto res = std::make_unique(); @@ -2799,6 +2843,9 @@ struct server_context { } void send_final_response(server_slot & slot) { + if (slot.params.stream) { + flush_partial_response(slot, true); + } auto res = std::make_unique(); res->id = slot.id_task; res->id_slot = slot.id; From 4e099dc18c32d79262b19a7abb454eab9b2a9406 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Fri, 10 Oct 2025 11:12:03 -0300 Subject: [PATCH 3/3] fix stop use the proper function to terminate which doesnt shutdown the socket also trap sigabrt from watchdog --- build.sh | 27 +++++++++++++++++++++++++++ tools/server/server.cpp | 3 ++- 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 build.sh diff --git a/build.sh b/build.sh new file mode 100644 index 0000000000000..26785d9324851 --- /dev/null +++ b/build.sh @@ -0,0 +1,27 @@ +# 2006 wget 
https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2404/x86_64/cuda-keyring_1.1-1_all.deb +# 2007 sudo dpkg -i cuda-keyring_1.1-1_all.deb +# 2008 sudo apt-get update +# 2009 sudo apt-get -y install cuda-toolkit-13-0 + +export PATH=$PATH:/usr/local/cuda/bin +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/cuda/lib64:/usr/local/lib +export CPLUS_INCLUDE_PATH=/usr/local/cuda/include + +#To override the native GPU detection: + +#1. Take note of the Compute Capability of your NVIDIA devices: "CUDA: Your GPU Compute > Capability". +#GeForce RTX 4090 8.9 +#GeForce RTX 3080 Ti 8.6 +#GeForce RTX 3070 8.6 +#2. Manually list each varying Compute Capability in the CMAKE_CUDA_ARCHITECTURES list. +#cmake -B build -DGGML_CUDA=ON -DCMAKE_CUDA_ARCHITECTURES="86;89" + +echo PREPARE + +#cmake -B build -DGGML_CPU_ALL_VARIANTS=ON -DBUILD_SHARED_LIBS=ON -DGGML_BACKEND_DL=ON -DGGML_CUDA=ON -DCMAKE_CUDA_ARCHITECTURES="89" -DCMAKE_BUILD_TYPE=Debug +cmake -B build -DBUILD_SHARED_LIBS=OFF -DGGML_CUDA=ON -DCMAKE_BUILD_TYPE=Debug -DCMAKE_CUDA_ARCHITECTURES=89 + +echo BUILDING + +cmake --build build --config Debug --target llama-server + diff --git a/tools/server/server.cpp b/tools/server/server.cpp index e9e174eeac1d8..9f6ffe2eda468 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -5393,7 +5393,7 @@ int main(int argc, char ** argv) { // clean up function, to be called before exit auto clean_up = [&svr, &ctx_server]() { SRV_INF("%s: cleaning up before exit...\n", __func__); - svr->stop(); + svr->terminate(); ctx_server.queue_results.terminate(); llama_backend_free(); }; @@ -5479,6 +5479,7 @@ int main(int argc, char ** argv) { sigemptyset (&sigint_action.sa_mask); sigint_action.sa_flags = 0; sigaction(SIGINT, &sigint_action, NULL); + sigaction(SIGABRT, &sigint_action, NULL); sigaction(SIGTERM, &sigint_action, NULL); #elif defined (_WIN32) auto console_ctrl_handler = +[](DWORD ctrl_type) -> BOOL {