Skip to content
Merged
Next Next commit
draft
  • Loading branch information
Vishwesh Bankwar committed Oct 10, 2023
commit 9db7130b6e371f1a058cb7a13edb12be82b39495
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.
// </copyright>

using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using Google.Protobuf;
using OpenTelemetry.Internal;
Expand All @@ -28,13 +29,33 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;

internal sealed class OtlpLogRecordTransformer
{
private static readonly ConcurrentBag<OtlpLogs.ScopeLogs> LogListPool = new();

private readonly SdkLimitOptions sdkLimitOptions;
private readonly ExperimentalOptions experimentalOptions;
private readonly Dictionary<string, OtlpLogs.ScopeLogs> logsByCategory;

public OtlpLogRecordTransformer(SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions)
{
this.sdkLimitOptions = sdkLimitOptions;
this.experimentalOptions = experimentalOptions;
this.logsByCategory = new Dictionary<string, OtlpLogs.ScopeLogs>();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Return(OtlpCollector.ExportLogsServiceRequest request)
{
var resourceLogs = request.ResourceLogs.FirstOrDefault();
if (resourceLogs == null)
{
return;
}

foreach (var scope in resourceLogs.ScopeLogs)
{
scope.LogRecords.Clear();
LogListPool.Add(scope);
}
}

internal OtlpCollector.ExportLogsServiceRequest BuildExportRequest(
Expand All @@ -49,21 +70,53 @@ internal OtlpCollector.ExportLogsServiceRequest BuildExportRequest(
};
request.ResourceLogs.Add(resourceLogs);

var scopeLogs = new OtlpLogs.ScopeLogs();
resourceLogs.ScopeLogs.Add(scopeLogs);
this.logsByCategory.Clear();

foreach (var logRecord in logRecordBatch)
{
var otlpLogRecord = this.ToOtlpLog(logRecord);
if (otlpLogRecord != null)
{
scopeLogs.LogRecords.Add(otlpLogRecord);
if (!this.logsByCategory.TryGetValue(logRecord.CategoryName, out var scopeLogs))
{
scopeLogs = this.GetLogListFromPool(logRecord.CategoryName);
scopeLogs.LogRecords.Add(otlpLogRecord);
this.logsByCategory.Add(logRecord.CategoryName, scopeLogs);
resourceLogs.ScopeLogs.Add(scopeLogs);
}
else
{
scopeLogs.LogRecords.Add(otlpLogRecord);
}
}
}

return request;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal OtlpLogs.ScopeLogs GetLogListFromPool(string name)
{
if (!LogListPool.TryTake(out var logs))
{
logs = new OtlpLogs.ScopeLogs
{
Scope = new OtlpCommon.InstrumentationScope
{
Name = name, // Name is enforced to not be null, but it can be empty.
Version = string.Empty, // NRE throw by proto
},
};
}
else
{
logs.Scope.Name = name;
logs.Scope.Version = string.Empty;
}

return logs;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal OtlpLogs.LogRecord ToOtlpLog(LogRecord logRecord)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();

var request = this.otlpLogRecordTransformer.BuildExportRequest(this.ProcessResource, logRecordBatch);

try
{
var request = this.otlpLogRecordTransformer.BuildExportRequest(this.ProcessResource, logRecordBatch);

if (!this.exportClient.SendExportRequest(request))
{
return ExportResult.Failure;
Expand All @@ -104,6 +104,10 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}
finally
{
this.otlpLogRecordTransformer.Return(request);
}

return ExportResult.Success;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;
using OpenTelemetry.Resources;
using OpenTelemetry.Tests;
using OpenTelemetry.Trace;
using Xunit;
Expand Down Expand Up @@ -1242,6 +1243,45 @@ public void AddOtlpLogExporterLogRecordProcessorOptionsTest(ExportProcessorType
}
}

[Fact]
public void ValidateInstrumentationScope()
{
var logRecords = new List<LogRecord>();
using var loggerFactory = LoggerFactory.Create(builder =>
{
builder
.AddOpenTelemetry(options => options
.AddInMemoryExporter(logRecords));
});

var logger1 = loggerFactory.CreateLogger("OtlpLogExporterTests-A");
logger1.LogInformation("Hello from {name} {price}.", "red-tomato", 2.99);

var logger2 = loggerFactory.CreateLogger("OtlpLogExporterTests-B");
logger2.LogInformation("Hello from {name} {price}.", "green-tomato", 2.99);

Assert.Equal(2, logRecords.Count);

var batch = new Batch<LogRecord>(logRecords.ToArray(), logRecords.Count);
var logRecordTransformer = new OtlpLogRecordTransformer(new(), new());

var resourceBuilder = ResourceBuilder.CreateEmpty();
var processResource = resourceBuilder.Build().ToOtlpResource();

var request = logRecordTransformer.BuildExportRequest(processResource, batch);

Assert.Single(request.ResourceLogs);

Assert.Equal("OtlpLogExporterTests-A", request.ResourceLogs[0].ScopeLogs.First().Scope.Name);
Assert.Equal("OtlpLogExporterTests-B", request.ResourceLogs[0].ScopeLogs.Last().Scope.Name);

//Assert.Single(resourceMetric.ScopeMetrics);
//var instrumentationLibraryMetrics = resourceMetric.ScopeMetrics.First();
//Assert.Equal(string.Empty, instrumentationLibraryMetrics.SchemaUrl);
//Assert.Equal(meter.Name, instrumentationLibraryMetrics.Scope.Name);
//Assert.Equal("0.0.1", instrumentationLibraryMetrics.Scope.Version);
}

private static OtlpCommon.KeyValue TryGetAttribute(OtlpLogs.LogRecord record, string key)
{
return record.Attributes.FirstOrDefault(att => att.Key == key);
Expand Down