Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// <copyright file="ZipkinActivityConversionExtensions.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Exporter.Zipkin.Implementation
{
internal static class ZipkinActivityConversionExtensions
{
private const long TicksPerMicrosecond = TimeSpan.TicksPerMillisecond / 1000;

private static readonly Dictionary<string, int> RemoteEndpointServiceNameKeyResolutionDictionary = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase)
{
[SpanAttributeConstants.PeerServiceKey] = 0, // RemoteEndpoint.ServiceName primary.
["net.peer.name"] = 1, // RemoteEndpoint.ServiceName first alternative.
["peer.hostname"] = 2, // RemoteEndpoint.ServiceName second alternative.
["peer.address"] = 2, // RemoteEndpoint.ServiceName second alternative.
["http.host"] = 3, // RemoteEndpoint.ServiceName for Http.
["db.instance"] = 4, // RemoteEndpoint.ServiceName for Redis.
};

private static readonly string InvalidSpanId = default(ActivitySpanId).ToHexString();

private static readonly ConcurrentDictionary<string, ZipkinEndpoint> LocalEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
private static readonly ConcurrentDictionary<string, ZipkinEndpoint> RemoteEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();

private static readonly DictionaryEnumerator<string, string, AttributeEnumerationState>.ForEachDelegate ProcessTagsRef = ProcessTags;
private static readonly ListEnumerator<ActivityEvent, PooledList<ZipkinAnnotation>>.ForEachDelegate ProcessActivityEventsRef = ProcessActivityEvents;

internal static ZipkinSpan ToZipkinSpan(this Activity activity, ZipkinEndpoint defaultLocalEndpoint, bool useShortTraceIds = false)
{
var context = activity.Context;
var startTimestamp = activity.StartTimeUtc.ToEpochMicroseconds();

string parentId = EncodeSpanId(activity.ParentSpanId);
if (string.Equals(parentId, InvalidSpanId, StringComparison.Ordinal))
{
parentId = null;
}

var attributeEnumerationState = new AttributeEnumerationState
{
Tags = PooledList<KeyValuePair<string, string>>.Create(),
};

DictionaryEnumerator<string, string, AttributeEnumerationState>.AllocationFreeForEach(activity.Tags, ref attributeEnumerationState, ProcessTagsRef);

var activitySource = activity.Source;
if (!string.IsNullOrEmpty(activitySource.Name))
{
PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>("library.name", activitySource.Name));
if (!string.IsNullOrEmpty(activitySource.Version))
{
PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>("library.version", activitySource.Version));
}
}

var localEndpoint = defaultLocalEndpoint;

var serviceName = attributeEnumerationState.ServiceName;

// override default service name
if (!string.IsNullOrWhiteSpace(serviceName))
{
if (!string.IsNullOrWhiteSpace(attributeEnumerationState.ServiceNamespace))
{
serviceName = attributeEnumerationState.ServiceNamespace + "." + serviceName;
}

if (!LocalEndpointCache.TryGetValue(serviceName, out localEndpoint))
{
localEndpoint = defaultLocalEndpoint.Clone(serviceName);
LocalEndpointCache.TryAdd(serviceName, localEndpoint);
}
}

ZipkinEndpoint remoteEndpoint = null;
if ((activity.Kind == ActivityKind.Client || activity.Kind == ActivityKind.Producer) && attributeEnumerationState.RemoteEndpointServiceName != null)
{
remoteEndpoint = RemoteEndpointCache.GetOrAdd(attributeEnumerationState.RemoteEndpointServiceName, ZipkinEndpoint.Create);
}

var annotations = PooledList<ZipkinAnnotation>.Create();
ListEnumerator<ActivityEvent, PooledList<ZipkinAnnotation>>.AllocationFreeForEach(activity.Events, ref annotations, ProcessActivityEventsRef);

return new ZipkinSpan(
EncodeTraceId(context.TraceId, useShortTraceIds),
parentId,
EncodeSpanId(context.SpanId),
ToActivityKind(activity),
activity.OperationName,
activity.StartTimeUtc.ToEpochMicroseconds(),
duration: (long)activity.Duration.ToEpochMicroseconds(),
localEndpoint,
remoteEndpoint,
annotations,
attributeEnumerationState.Tags,
null,
null);
}

internal static string EncodeSpanId(ActivitySpanId spanId)
{
return spanId.ToHexString();
}

internal static long ToEpochMicroseconds(this DateTimeOffset dateTimeOffset)
{
return dateTimeOffset.Ticks / TicksPerMicrosecond;
}

internal static long ToEpochMicroseconds(this TimeSpan timeSpan)
{
return timeSpan.Ticks / TicksPerMicrosecond;
}

internal static long ToEpochMicroseconds(this DateTime utcDateTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cijothomas @CodeBlanch I think this might be a good function to move to a shared file (and use links to share across multiple projects)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reyang Sounds good to me. I think @eddynaka is adding the file on #732 so we'll just need to link & use in Zipkin.

{
const long UnixEpochTicks = 621355968000000000L; // = DateTimeOffset.FromUnixTimeMilliseconds(0).Ticks
const long UnixEpochMicroseconds = UnixEpochTicks / TicksPerMicrosecond;

// Truncate sub-microsecond precision before offsetting by the Unix Epoch to avoid
// the last digit being off by one for dates that result in negative Unix times
long microseconds = utcDateTime.Ticks / TicksPerMicrosecond;
return microseconds - UnixEpochMicroseconds;
}

private static string EncodeTraceId(ActivityTraceId traceId, bool useShortTraceIds)
{
var id = traceId.ToHexString();

if (id.Length > 16 && useShortTraceIds)
{
id = id.Substring(id.Length - 16, 16);
}

return id;
}

private static string ToActivityKind(Activity activity)
{
switch (activity.Kind)
{
case ActivityKind.Server:
return "SERVER";
case ActivityKind.Producer:
return "PRODUCER";
case ActivityKind.Consumer:
return "CONSUMER";
case ActivityKind.Client:
return "CLIENT";
}

return null;
}

private static bool ProcessActivityEvents(ref PooledList<ZipkinAnnotation> annotations, ActivityEvent @event)
{
PooledList<ZipkinAnnotation>.Add(ref annotations, new ZipkinAnnotation(@event.Timestamp.ToEpochMicroseconds(), @event.Name));
return true;
}

private static bool ProcessTags(ref AttributeEnumerationState state, KeyValuePair<string, string> attribute)
{
string key = attribute.Key;
string strVal = attribute.Value;

if (strVal != null)
{
if (RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (state.RemoteEndpointServiceName == null || priority < state.RemoteEndpointServiceNamePriority))
{
state.RemoteEndpointServiceName = strVal;
state.RemoteEndpointServiceNamePriority = priority;
}
else if (key == Resource.ServiceNameKey)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, wonder if "if-else" or a dict/hash look up would be more performant. Probably won't matter as it is not on the hot path.

{
state.ServiceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey)
{
state.ServiceNamespace = strVal;
}
else
{
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal));
}
}
else
{
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal));
}

return true;
}

private struct AttributeEnumerationState
{
public PooledList<KeyValuePair<string, string>> Tags;

public string RemoteEndpointServiceName;

public int RemoteEndpointServiceNamePriority;

public string ServiceName;

public string ServiceNamespace;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ public void Write(Utf8JsonWriter writer)

writer.WriteString("id", this.Id);

writer.WriteString("kind", this.Kind);
if (this.Kind != null)
{
writer.WriteString("kind", this.Kind);
}

if (this.Timestamp.HasValue)
{
Expand Down
28 changes: 28 additions & 0 deletions src/OpenTelemetry.Exporter.Zipkin/TracerBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,33 @@ public static TracerBuilder UseZipkin(this TracerBuilder builder, Action<ZipkinT
processorConfigure.Invoke(b);
});
}

/// <summary>
/// Registers a Zipkin exporter that will receive <see cref="System.Diagnostics.Activity"/> instances.
/// </summary>
/// <param name="builder"><see cref="OpenTelemetryBuilder"/> builder to use.</param>
/// <param name="configure">Exporter configuration options.</param>
/// <returns>The instance of <see cref="OpenTelemetryBuilder"/> to chain the calls.</returns>
public static OpenTelemetryBuilder UseZipkinActivityExporter(this OpenTelemetryBuilder builder, Action<ZipkinTraceExporterOptions> configure)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}

if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

return builder.AddProcessorPipeline(pipeline =>
{
var options = new ZipkinTraceExporterOptions();
configure(options);

var activityExporter = new ZipkinActivityExporter(options);
pipeline.SetExporter(activityExporter);
});
}
}
}
Loading