Skip to content
Next Next commit
init universal instrumentation
  • Loading branch information
DylanLovesCoffee committed Oct 5, 2022
commit adc5a9dc6eb83e7889b899f46e949458f407c1e4
106 changes: 93 additions & 13 deletions internal/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,29 @@
package extension

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"time"

"github.com/DataDog/datadog-lambda-go/internal/logger"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
)

type ddTraceContext string

const (
DdTraceId ddTraceContext = "x-datadog-trace-id"
DdParentId ddTraceContext = "x-datadog-parent-id"
DdSpanId ddTraceContext = "x-datadog-span-id"
DdSamplingPriority ddTraceContext = "x-datadog-sampling-priority"
DdInvocationError ddTraceContext = "x-datadog-invocation-error"

DdSeverlessSpan ddTraceContext = "dd-tracer-serverless-span"
DdLambdaResponse ddTraceContext = "dd-response"
)

const (
Expand All @@ -23,8 +40,10 @@ const (
// want to let it having some time for its cold start so we should not set this too low.
timeout = 3000 * time.Millisecond

helloUrl = "http://localhost:8124/lambda/hello"
flushUrl = "http://localhost:8124/lambda/flush"
helloUrl = "http://localhost:8124/lambda/hello"
flushUrl = "http://localhost:8124/lambda/flush"
startInvocationUrl = "http://localhost:8124/lambda/start-invocation"
endInvocationUrl = "http://localhost:8124/lambda/end-invocation"

extensionPath = "/opt/extensions/datadog-agent"
)
Expand All @@ -33,6 +52,8 @@ type ExtensionManager struct {
helloRoute string
flushRoute string
extensionPath string
startInvocationUrl string
endInvocationUrl string
httpClient HTTPClient
isExtensionRunning bool
}
Expand All @@ -43,10 +64,12 @@ type HTTPClient interface {

func BuildExtensionManager() *ExtensionManager {
em := &ExtensionManager{
helloRoute: helloUrl,
flushRoute: flushUrl,
extensionPath: extensionPath,
httpClient: &http.Client{Timeout: timeout},
helloRoute: helloUrl,
flushRoute: flushUrl,
startInvocationUrl: startInvocationUrl,
endInvocationUrl: endInvocationUrl,
extensionPath: extensionPath,
httpClient: &http.Client{Timeout: timeout},
}
em.checkAgentRunning()
return em
Expand All @@ -57,14 +80,71 @@ func (em *ExtensionManager) checkAgentRunning() {
logger.Debug("Will use the API")
em.isExtensionRunning = false
} else {
req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil)
if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 {
logger.Debug("Will use the Serverless Agent")
em.isExtensionRunning = true
} else {
logger.Debug("Will use the API since the Serverless Agent was detected but the hello route was unreachable")
em.isExtensionRunning = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if you think we should still make this call to /hello in the case where universal instrumentation is not enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha yup, good catch. I made a note to add this back but totally forgot 👍 side note: /hello is a confusing route name for its actual usage in the extension 😢

logger.Debug("Will use the Serverless Agent")
em.isExtensionRunning = true
}
}

func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, eventPayload json.RawMessage) context.Context {
body := bytes.NewBuffer(eventPayload)
req, _ := http.NewRequest(http.MethodPost, em.startInvocationUrl, body)

if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 {
// Propagate dd-trace context from the extension response if found in the response headers
traceId := response.Header.Values(string(DdTraceId))
if len(traceId) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(traceId) > 0 {
if traceId != "" {

Nit pick. In case you decide to use response.Header.Get, this is more performant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, I actually looked this up. It turns out that go compiles these two options to the same byte code. So, my recommendation here was in fact incorrect.

myStr := "hello world!"
// this line
if len(myStr) > 0 {
    ...
}
// compiles to the same byte code as this line
if myStr != "" {
    ...
}

ctx = context.WithValue(ctx, DdTraceId, traceId[0])
}
parentId := response.Header.Values(string(DdParentId))
if len(parentId) > 0 {
ctx = context.WithValue(ctx, DdParentId, parentId[0])
}
samplingPriority := response.Header.Values(string(DdSamplingPriority))
if len(samplingPriority) > 0 {
ctx = context.WithValue(ctx, DdSamplingPriority, samplingPriority[0])
}
}
return ctx
}

func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functionExecutionSpan ddtrace.Span, err error) {
// Handle Lambda response
lambdaResponse, ok := ctx.Value(DdLambdaResponse).([]byte)
content, _ := json.Marshal(lambdaResponse)
if !ok {
content, _ = json.Marshal("{}")
}
body := bytes.NewBuffer(content)

// Build the request
req, _ := http.NewRequest(http.MethodPost, em.endInvocationUrl, body)

// Mark the invocation as an error if any
if err != nil {
req.Header[string(DdInvocationError)] = append(req.Header[string(DdInvocationError)], "true")
}

// Extract the DD trace context and pass them to the extension via request headers
traceId, ok := ctx.Value(DdTraceId).(string)
if ok {
req.Header[string(DdTraceId)] = append(req.Header[string(DdTraceId)], traceId)
if parentId, ok := ctx.Value(DdParentId).(string); ok {
req.Header[string(DdParentId)] = append(req.Header[string(DdParentId)], parentId)
}
if spanId, ok := ctx.Value(DdSpanId).(string); ok {
req.Header[string(DdSpanId)] = append(req.Header[string(DdSpanId)], spanId)
}
if samplingPriority, ok := ctx.Value(DdSamplingPriority).(string); ok {
req.Header[string(DdSamplingPriority)] = append(req.Header[string(DdSamplingPriority)], samplingPriority)
}
} else {
req.Header[string(DdTraceId)] = append(req.Header[string(DdTraceId)], fmt.Sprint(functionExecutionSpan.Context().TraceID()))
req.Header[string(DdSpanId)] = append(req.Header[string(DdSpanId)], fmt.Sprint(functionExecutionSpan.Context().SpanID()))
}

response, err := em.httpClient.Do(req)
if response.StatusCode != 200 || err != nil {
logger.Debug("Unable to make a request to the extension's end invocation endpoint")
}
}

Expand Down
21 changes: 18 additions & 3 deletions internal/trace/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,34 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont
tracerInitialized = true
}

functionExecutionSpan = startFunctionExecutionSpan(ctx, l.mergeXrayTraces)
functionExecutionSpan = startFunctionExecutionSpan(ctx, l.mergeXrayTraces, l.extensionManager.IsExtensionRunning())

// Add the span to the context so the user can create child spans
ctx = tracer.ContextWithSpan(ctx, functionExecutionSpan)

if l.extensionManager.IsExtensionRunning() {
ctx = l.extensionManager.SendStartInvocationRequest(ctx, msg)
}

return ctx
}

// HandlerFinished ends the function execution span and stops the tracer
func (l *Listener) HandlerFinished(ctx context.Context, err error) {
if functionExecutionSpan != nil {
functionExecutionSpan.Finish(tracer.WithError(err))

if l.extensionManager.IsExtensionRunning() {
l.extensionManager.SendEndInvocationRequest(ctx, functionExecutionSpan, err)
}
}

tracer.Flush()
}

// startFunctionExecutionSpan starts a span that represents the current Lambda function execution
// and returns the span so that it can be finished when the function execution is complete
func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool) tracer.Span {
func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool, isExtensionRunning bool) tracer.Span {
// Extract information from context
lambdaCtx, _ := lambdacontext.FromContext(ctx)
rootTraceContext, ok := ctx.Value(traceContextKey).(TraceContext)
Expand All @@ -109,11 +118,17 @@ func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool) trace
parentSpanContext = convertedSpanContext
}

resourceName := lambdacontext.FunctionName
if isExtensionRunning {
// The extension will drop this span, prioritizing the execution span the extension creates
resourceName = string(extension.DdSeverlessSpan)
}

span := tracer.StartSpan(
"aws.lambda", // This operation name will be replaced with the value of the service tag by the Forwarder
tracer.SpanType("serverless"),
tracer.ChildOf(parentSpanContext),
tracer.ResourceName(lambdacontext.FunctionName),
tracer.ResourceName(resourceName),
tracer.Tag("cold_start", ctx.Value("cold_start")),
tracer.Tag("function_arn", functionArn),
tracer.Tag("function_version", functionVersion),
Expand Down
8 changes: 4 additions & 4 deletions internal/trace/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestStartFunctionExecutionSpanFromXrayWithMergeEnabled(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

span := startFunctionExecutionSpan(ctx, true)
span := startFunctionExecutionSpan(ctx, true, false)
span.Finish()
finishedSpan := mt.FinishedSpans()[0]

Expand Down Expand Up @@ -104,7 +104,7 @@ func TestStartFunctionExecutionSpanFromXrayWithMergeDisabled(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

span := startFunctionExecutionSpan(ctx, false)
span := startFunctionExecutionSpan(ctx, false, false)
span.Finish()
finishedSpan := mt.FinishedSpans()[0]

Expand All @@ -123,7 +123,7 @@ func TestStartFunctionExecutionSpanFromEventWithMergeEnabled(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

span := startFunctionExecutionSpan(ctx, true)
span := startFunctionExecutionSpan(ctx, true, false)
span.Finish()
finishedSpan := mt.FinishedSpans()[0]

Expand All @@ -142,7 +142,7 @@ func TestStartFunctionExecutionSpanFromEventWithMergeDisabled(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

span := startFunctionExecutionSpan(ctx, false)
span := startFunctionExecutionSpan(ctx, false, false)
span.Finish()
finishedSpan := mt.FinishedSpans()[0]

Expand Down
2 changes: 2 additions & 0 deletions internal/wrapper/wrap_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"errors"
"fmt"

"github.com/DataDog/datadog-lambda-go/internal/extension"
"github.com/DataDog/datadog-lambda-go/internal/logger"
"github.com/aws/aws-lambda-go/lambda"

Expand Down Expand Up @@ -83,6 +84,7 @@ func (h *DatadogHandler) Invoke(ctx context.Context, payload []byte) ([]byte, er
CurrentContext = ctx
result, err := h.handler.Invoke(ctx, payload)
for _, listener := range h.listeners {
ctx = context.WithValue(ctx, extension.DdLambdaResponse, result)
listener.HandlerFinished(ctx, err)
}
h.coldStart = false
Expand Down