Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ FLB_EXPORT double flb_time_now();

/* start stop the engine */
FLB_EXPORT int flb_start(flb_ctx_t *ctx);
FLB_EXPORT int flb_start_trace(flb_ctx_t *ctx);
FLB_EXPORT int flb_stop(flb_ctx_t *ctx);
FLB_EXPORT int flb_loop(flb_ctx_t *ctx);

Expand Down
2 changes: 1 addition & 1 deletion src/flb_chunk_trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,
ctx->input = (void *)input;
ctx->trace_prefix = flb_sds_create(trace_prefix);

flb_start(ctx->flb);
flb_start_trace(ctx->flb);

in->chunk_trace_ctxt = ctx;
pthread_mutex_unlock(&in->chunk_trace_lock);
Expand Down
24 changes: 21 additions & 3 deletions src/flb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,7 @@ double flb_time_now()
return flb_time_to_double(&t);
}

/* Start the engine */
int flb_start(flb_ctx_t *ctx)
int static do_start(flb_ctx_t *ctx)
{
int fd;
int bytes;
Expand All @@ -669,7 +668,6 @@ int flb_start(flb_ctx_t *ctx)
flb_debug("[lib] context set: %p", ctx);

/* set context as the last active one */
flb_context_set(ctx);

/* spawn worker thread */
config = ctx->config;
Expand Down Expand Up @@ -715,6 +713,26 @@ int flb_start(flb_ctx_t *ctx)
return 0;
}

/* Start the engine */
int flb_start(flb_ctx_t *ctx)
{
int ret;

ret = do_start(ctx);
if (ret == 0) {
/* set context as the last active one */
flb_context_set(ctx);
}

return ret;
}

/* Start the engine without setting the global context */
int flb_start_trace(flb_ctx_t *ctx)
{
return do_start(ctx);
}

int flb_loop(flb_ctx_t *ctx)
{
while (ctx->status == FLB_LIB_OK) {
Expand Down
231 changes: 231 additions & 0 deletions src/fluent-bit.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ volatile sig_atomic_t flb_bin_restarting = FLB_RELOAD_IDLE;
struct flb_stacktrace flb_st;
#endif

#ifdef FLB_HAVE_CHUNK_TRACE

#include <fluent-bit/flb_chunk_trace.h>

#define FLB_LONG_TRACE (1024 + 1)
#define FLB_LONG_TRACE_INPUT (1024 + 2)
#define FLB_LONG_TRACE_OUTPUT (1024 + 3)
#define FLB_LONG_TRACE_OUTPUT_PROPERTY (1024 + 4)

#endif

#define FLB_HELP_TEXT 0
#define FLB_HELP_JSON 1

Expand Down Expand Up @@ -138,6 +149,10 @@ static void flb_help(int rc, struct flb_config *config)
#endif
#ifdef FLB_HAVE_CHUNK_TRACE
print_opt("-Z, --enable-chunk-trace", "enable chunk tracing. activating it requires using the HTTP Server API.");
print_opt("--trace-input", "input to start tracing on startup.");
print_opt("--trace-output", "output to use for tracing on startup.");
print_opt("--trace-output-property", "set a property for output tracing on startup.");
print_opt("--trace", "setup a trace pipeline on startup. Uses a single line, ie: \"input=dummy.0 output=stdout output.format='json'\"");
#endif
print_opt("-w, --workdir", "set the working directory");
#ifdef FLB_HAVE_HTTP_SERVER
Expand Down Expand Up @@ -726,6 +741,173 @@ static struct flb_cf *service_configure(struct flb_cf *cf,
return cf;
}

#ifdef FLB_HAVE_CHUNK_TRACE
static struct flb_input_instance *find_input(flb_ctx_t *ctx, const char *name)
{
struct mk_list *head;
struct flb_input_instance *in;


mk_list_foreach(head, &ctx->config->inputs) {
in = mk_list_entry(head, struct flb_input_instance, _head);
if (strcmp(name, in->name) == 0) {
return in;
}
if (in->alias) {
if (strcmp(name, in->alias) == 0) {
return in;
}
}
}
return NULL;
}

static int enable_trace_input(flb_ctx_t *ctx, const char *name, const char *prefix, const char *output_name, struct mk_list *props)
{
struct flb_input_instance *in;


in = find_input(ctx, name);
if (in == NULL) {
return FLB_ERROR;
}

flb_chunk_trace_context_new(in, output_name, prefix, NULL, props);
return (in->chunk_trace_ctxt == NULL ? FLB_ERROR : FLB_OK);
}

static int disable_trace_input(flb_ctx_t *ctx, const char *name)
{
struct flb_input_instance *in;


in = find_input(ctx, name);
if (in == NULL) {
return FLB_ERROR;
}

if (in->chunk_trace_ctxt != NULL) {
flb_chunk_trace_context_destroy(in);
}
return FLB_OK;
}

static int set_trace_property(struct mk_list *props, char *kv)
{
int len;
int sep;
char *key;
char *value;

len = strlen(kv);
sep = mk_string_char_search(kv, '=', len);
if (sep == -1) {
return -1;
}

key = mk_string_copy_substr(kv, 0, sep);
value = kv + sep + 1;

if (!key) {
return -1;
}

flb_kv_item_create_len(props,
(char *)key, strlen(key),
(char *)value, strlen(value));

mk_mem_free(key);
return 0;
}

static int parse_trace_pipeline_prop(flb_ctx_t *ctx, const char *kv, char **key, char **value)
{
int len;
int sep;

len = strlen(kv);
sep = mk_string_char_search(kv, '=', len);
if (sep == -1) {
return FLB_ERROR;
}

*key = mk_string_copy_substr(kv, 0, sep);
if (!key) {
return FLB_ERROR;
}

*value = flb_strdup(kv + sep + 1);
return FLB_OK;
}

static int parse_trace_pipeline(flb_ctx_t *ctx, const char *pipeline, char **trace_input, char **trace_output, struct mk_list **props)
{
struct mk_list *parts = NULL;
struct mk_list *cur;
struct flb_split_entry *part;
char *key;
char *value;
const char *propname;
const char *propval;


parts = flb_utils_split(pipeline, (int)' ', 0);
if (parts == NULL) {
return FLB_ERROR;
}

mk_list_foreach(cur, parts) {
key = NULL;
value = NULL;
part = mk_list_entry(cur, struct flb_split_entry, _head);
if (parse_trace_pipeline_prop(ctx, part->value, &key, &value) == FLB_ERROR) {
return FLB_ERROR;
}
if (strcmp(key, "input") == 0) {
if (*trace_input != NULL) {
flb_free(*trace_input);
}
*trace_input = flb_strdup(value);
}
else if (strcmp(key, "output") == 0) {
if (*trace_output != NULL) {
flb_free(*trace_output);
}
*trace_output = flb_strdup(value);
}
else if (strncmp(key, "output.", strlen("output.")) == 0) {
propname = mk_string_copy_substr(key, strlen("output."), strlen(key));
if (propname == NULL) {
return FLB_ERROR;
}

propval = flb_strdup(value);
if (propval == NULL) {
return FLB_ERROR;
}

if (*props == NULL) {
*props = flb_calloc(1, sizeof(struct mk_list));
flb_kv_init(*props);
}

flb_kv_item_create_len(*props,
(char *)propname, strlen(propname),
(char *)propval, strlen(propval));
}
if (key != NULL) {
mk_mem_free(key);
}
if (value != NULL) {
flb_free(value);
}
}

flb_utils_split_free(parts);
return FLB_OK;
}
#endif

int flb_main(int argc, char **argv)
{
int opt;
Expand Down Expand Up @@ -762,6 +944,12 @@ int flb_main(int argc, char **argv)
flb_stacktrace_init(argv[0], &flb_st);
#endif

#ifdef FLB_HAVE_CHUNK_TRACE
char *trace_input = NULL;
char *trace_output = flb_strdup("stdout");
struct mk_list *trace_props = NULL;
#endif

/* Setup long-options */
static const struct option long_opts[] = {
{ "storage_path", required_argument, NULL, 'b' },
Expand Down Expand Up @@ -804,6 +992,10 @@ int flb_main(int argc, char **argv)
{ "enable-hot-reload", no_argument, NULL, 'Y' },
#ifdef FLB_HAVE_CHUNK_TRACE
{ "enable-chunk-trace", no_argument, NULL, 'Z' },
{ "trace", required_argument, NULL, FLB_LONG_TRACE },
{ "trace-input", required_argument, NULL, FLB_LONG_TRACE_INPUT },
{ "trace-output", required_argument, NULL, FLB_LONG_TRACE_OUTPUT },
{ "trace-output-property", required_argument, NULL, FLB_LONG_TRACE_OUTPUT_PROPERTY },
#endif
{ "disable-thread-safety-on-hot-reload", no_argument, NULL, 'W' },
{ NULL, 0, NULL, 0 }
Expand Down Expand Up @@ -998,6 +1190,28 @@ int flb_main(int argc, char **argv)
case 'Z':
flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_ENABLE_CHUNK_TRACE, 0, "on", 0);
break;
case FLB_LONG_TRACE:
parse_trace_pipeline(ctx, optarg, &trace_input, &trace_output, &trace_props);
break;
case FLB_LONG_TRACE_INPUT:
if (trace_input != NULL) {
flb_free(trace_input);
}
trace_input = flb_strdup(optarg);
break;
case FLB_LONG_TRACE_OUTPUT:
if (trace_output != NULL) {
flb_free(trace_output);
}
trace_output = flb_strdup(optarg);
break;
case FLB_LONG_TRACE_OUTPUT_PROPERTY:
if (trace_props == NULL) {
trace_props = flb_calloc(1, sizeof(struct mk_list));
flb_kv_init(trace_props);
}
set_trace_property(trace_props, optarg);
break;
#endif /* FLB_HAVE_CHUNK_TRACE */
default:
flb_help(EXIT_FAILURE, config);
Expand Down Expand Up @@ -1113,6 +1327,12 @@ int flb_main(int argc, char **argv)
*/
ctx = flb_context_get();

#ifdef FLB_HAVE_CHUNK_TRACE
if (trace_input != NULL) {
enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props);
}
#endif

while (ctx->status == FLB_LIB_OK && exit_signal == 0) {
sleep(1);

Expand All @@ -1130,6 +1350,17 @@ int flb_main(int argc, char **argv)
if (cf_opts != NULL) {
flb_cf_destroy(cf_opts);
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (trace_input != NULL) {
disable_trace_input(ctx, trace_input);
flb_free(trace_input);
}
if (trace_output) {
flb_free(trace_output);
}
#endif

flb_stop(ctx);
flb_destroy(ctx);

Expand Down
2 changes: 1 addition & 1 deletion src/http_server/api/v1/trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include <msgpack.h>


struct flb_input_instance *find_input(struct flb_hs *hs, const char *name)
static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name)
{
struct mk_list *head;
struct flb_input_instance *in;
Expand Down