diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index eb32ad6258e..b5383dace3f 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -72,6 +72,7 @@ FLB_EXPORT double flb_time_now(); /* start stop the engine */ FLB_EXPORT int flb_start(flb_ctx_t *ctx); +FLB_EXPORT int flb_start_trace(flb_ctx_t *ctx); FLB_EXPORT int flb_stop(flb_ctx_t *ctx); FLB_EXPORT int flb_loop(flb_ctx_t *ctx); diff --git a/src/flb_chunk_trace.c b/src/flb_chunk_trace.c index f3198964ebb..adacb73f21a 100644 --- a/src/flb_chunk_trace.c +++ b/src/flb_chunk_trace.c @@ -230,7 +230,7 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input, ctx->input = (void *)input; ctx->trace_prefix = flb_sds_create(trace_prefix); - flb_start(ctx->flb); + flb_start_trace(ctx->flb); in->chunk_trace_ctxt = ctx; pthread_mutex_unlock(&in->chunk_trace_lock); diff --git a/src/flb_lib.c b/src/flb_lib.c index 7ac336d7529..882faa54757 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -653,8 +653,7 @@ double flb_time_now() return flb_time_to_double(&t); } -/* Start the engine */ -int flb_start(flb_ctx_t *ctx) +int static do_start(flb_ctx_t *ctx) { int fd; int bytes; @@ -669,7 +668,6 @@ int flb_start(flb_ctx_t *ctx) flb_debug("[lib] context set: %p", ctx); /* set context as the last active one */ - flb_context_set(ctx); /* spawn worker thread */ config = ctx->config; @@ -715,6 +713,26 @@ int flb_start(flb_ctx_t *ctx) return 0; } +/* Start the engine */ +int flb_start(flb_ctx_t *ctx) +{ + int ret; + + ret = do_start(ctx); + if (ret == 0) { + /* set context as the last active one */ + flb_context_set(ctx); + } + + return ret; +} + +/* Start the engine without setting the global context */ +int flb_start_trace(flb_ctx_t *ctx) +{ + return do_start(ctx); +} + int flb_loop(flb_ctx_t *ctx) { while (ctx->status == FLB_LIB_OK) { diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 46e159b2159..12fc8d09de9 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -78,6 +78,17 @@ volatile sig_atomic_t flb_bin_restarting = FLB_RELOAD_IDLE; struct flb_stacktrace flb_st; #endif +#ifdef FLB_HAVE_CHUNK_TRACE + +#include + +#define FLB_LONG_TRACE (1024 + 1) +#define FLB_LONG_TRACE_INPUT (1024 + 2) +#define FLB_LONG_TRACE_OUTPUT (1024 + 3) +#define FLB_LONG_TRACE_OUTPUT_PROPERTY (1024 + 4) + +#endif + #define FLB_HELP_TEXT 0 #define FLB_HELP_JSON 1 @@ -138,6 +149,10 @@ static void flb_help(int rc, struct flb_config *config) #endif #ifdef FLB_HAVE_CHUNK_TRACE print_opt("-Z, --enable-chunk-trace", "enable chunk tracing. activating it requires using the HTTP Server API."); + print_opt("--trace-input", "input to start tracing on startup."); + print_opt("--trace-output", "output to use for tracing on startup."); + print_opt("--trace-output-property", "set a property for output tracing on startup."); + print_opt("--trace", "setup a trace pipeline on startup. Uses a single line, ie: \"input=dummy.0 output=stdout output.format='json'\""); #endif print_opt("-w, --workdir", "set the working directory"); #ifdef FLB_HAVE_HTTP_SERVER @@ -726,6 +741,173 @@ static struct flb_cf *service_configure(struct flb_cf *cf, return cf; } +#ifdef FLB_HAVE_CHUNK_TRACE +static struct flb_input_instance *find_input(flb_ctx_t *ctx, const char *name) +{ + struct mk_list *head; + struct flb_input_instance *in; + + + mk_list_foreach(head, &ctx->config->inputs) { + in = mk_list_entry(head, struct flb_input_instance, _head); + if (strcmp(name, in->name) == 0) { + return in; + } + if (in->alias) { + if (strcmp(name, in->alias) == 0) { + return in; + } + } + } + return NULL; +} + +static int enable_trace_input(flb_ctx_t *ctx, const char *name, const char *prefix, const char *output_name, struct mk_list *props) +{ + struct flb_input_instance *in; + + + in = find_input(ctx, name); + if (in == NULL) { + return FLB_ERROR; + } + + flb_chunk_trace_context_new(in, output_name, prefix, NULL, props); + return (in->chunk_trace_ctxt == NULL ? FLB_ERROR : FLB_OK); +} + +static int disable_trace_input(flb_ctx_t *ctx, const char *name) +{ + struct flb_input_instance *in; + + + in = find_input(ctx, name); + if (in == NULL) { + return FLB_ERROR; + } + + if (in->chunk_trace_ctxt != NULL) { + flb_chunk_trace_context_destroy(in); + } + return FLB_OK; +} + +static int set_trace_property(struct mk_list *props, char *kv) +{ + int len; + int sep; + char *key; + char *value; + + len = strlen(kv); + sep = mk_string_char_search(kv, '=', len); + if (sep == -1) { + return -1; + } + + key = mk_string_copy_substr(kv, 0, sep); + value = kv + sep + 1; + + if (!key) { + return -1; + } + + flb_kv_item_create_len(props, + (char *)key, strlen(key), + (char *)value, strlen(value)); + + mk_mem_free(key); + return 0; +} + +static int parse_trace_pipeline_prop(flb_ctx_t *ctx, const char *kv, char **key, char **value) +{ + int len; + int sep; + + len = strlen(kv); + sep = mk_string_char_search(kv, '=', len); + if (sep == -1) { + return FLB_ERROR; + } + + *key = mk_string_copy_substr(kv, 0, sep); + if (!key) { + return FLB_ERROR; + } + + *value = flb_strdup(kv + sep + 1); + return FLB_OK; +} + +static int parse_trace_pipeline(flb_ctx_t *ctx, const char *pipeline, char **trace_input, char **trace_output, struct mk_list **props) +{ + struct mk_list *parts = NULL; + struct mk_list *cur; + struct flb_split_entry *part; + char *key; + char *value; + const char *propname; + const char *propval; + + + parts = flb_utils_split(pipeline, (int)' ', 0); + if (parts == NULL) { + return FLB_ERROR; + } + + mk_list_foreach(cur, parts) { + key = NULL; + value = NULL; + part = mk_list_entry(cur, struct flb_split_entry, _head); + if (parse_trace_pipeline_prop(ctx, part->value, &key, &value) == FLB_ERROR) { + return FLB_ERROR; + } + if (strcmp(key, "input") == 0) { + if (*trace_input != NULL) { + flb_free(*trace_input); + } + *trace_input = flb_strdup(value); + } + else if (strcmp(key, "output") == 0) { + if (*trace_output != NULL) { + flb_free(*trace_output); + } + *trace_output = flb_strdup(value); + } + else if (strncmp(key, "output.", strlen("output.")) == 0) { + propname = mk_string_copy_substr(key, strlen("output."), strlen(key)); + if (propname == NULL) { + return FLB_ERROR; + } + + propval = flb_strdup(value); + if (propval == NULL) { + return FLB_ERROR; + } + + if (*props == NULL) { + *props = flb_calloc(1, sizeof(struct mk_list)); + flb_kv_init(*props); + } + + flb_kv_item_create_len(*props, + (char *)propname, strlen(propname), + (char *)propval, strlen(propval)); + } + if (key != NULL) { + mk_mem_free(key); + } + if (value != NULL) { + flb_free(value); + } + } + + flb_utils_split_free(parts); + return FLB_OK; +} +#endif + int flb_main(int argc, char **argv) { int opt; @@ -762,6 +944,12 @@ int flb_main(int argc, char **argv) flb_stacktrace_init(argv[0], &flb_st); #endif +#ifdef FLB_HAVE_CHUNK_TRACE + char *trace_input = NULL; + char *trace_output = flb_strdup("stdout"); + struct mk_list *trace_props = NULL; +#endif + /* Setup long-options */ static const struct option long_opts[] = { { "storage_path", required_argument, NULL, 'b' }, @@ -804,6 +992,10 @@ int flb_main(int argc, char **argv) { "enable-hot-reload", no_argument, NULL, 'Y' }, #ifdef FLB_HAVE_CHUNK_TRACE { "enable-chunk-trace", no_argument, NULL, 'Z' }, + { "trace", required_argument, NULL, FLB_LONG_TRACE }, + { "trace-input", required_argument, NULL, FLB_LONG_TRACE_INPUT }, + { "trace-output", required_argument, NULL, FLB_LONG_TRACE_OUTPUT }, + { "trace-output-property", required_argument, NULL, FLB_LONG_TRACE_OUTPUT_PROPERTY }, #endif { "disable-thread-safety-on-hot-reload", no_argument, NULL, 'W' }, { NULL, 0, NULL, 0 } @@ -998,6 +1190,28 @@ int flb_main(int argc, char **argv) case 'Z': flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_ENABLE_CHUNK_TRACE, 0, "on", 0); break; + case FLB_LONG_TRACE: + parse_trace_pipeline(ctx, optarg, &trace_input, &trace_output, &trace_props); + break; + case FLB_LONG_TRACE_INPUT: + if (trace_input != NULL) { + flb_free(trace_input); + } + trace_input = flb_strdup(optarg); + break; + case FLB_LONG_TRACE_OUTPUT: + if (trace_output != NULL) { + flb_free(trace_output); + } + trace_output = flb_strdup(optarg); + break; + case FLB_LONG_TRACE_OUTPUT_PROPERTY: + if (trace_props == NULL) { + trace_props = flb_calloc(1, sizeof(struct mk_list)); + flb_kv_init(trace_props); + } + set_trace_property(trace_props, optarg); + break; #endif /* FLB_HAVE_CHUNK_TRACE */ default: flb_help(EXIT_FAILURE, config); @@ -1113,6 +1327,12 @@ int flb_main(int argc, char **argv) */ ctx = flb_context_get(); +#ifdef FLB_HAVE_CHUNK_TRACE + if (trace_input != NULL) { + enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props); + } +#endif + while (ctx->status == FLB_LIB_OK && exit_signal == 0) { sleep(1); @@ -1130,6 +1350,17 @@ int flb_main(int argc, char **argv) if (cf_opts != NULL) { flb_cf_destroy(cf_opts); } + +#ifdef FLB_HAVE_CHUNK_TRACE + if (trace_input != NULL) { + disable_trace_input(ctx, trace_input); + flb_free(trace_input); + } + if (trace_output) { + flb_free(trace_output); + } +#endif + flb_stop(ctx); flb_destroy(ctx); diff --git a/src/http_server/api/v1/trace.c b/src/http_server/api/v1/trace.c index 1101b06d9b4..95da1734383 100644 --- a/src/http_server/api/v1/trace.c +++ b/src/http_server/api/v1/trace.c @@ -31,7 +31,7 @@ #include -struct flb_input_instance *find_input(struct flb_hs *hs, const char *name) +static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name) { struct mk_list *head; struct flb_input_instance *in;