Skip to content

Commit 1cdcde5

Browse files
[SLS-2330] Add support for universal instrumentation with the extension (#116)
add option to use universal instrumentation
1 parent bf8b37f commit 1cdcde5

File tree

6 files changed

+286
-43
lines changed

6 files changed

+286
-43
lines changed

ddlambda.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ const (
8989
DatadogTraceEnabledEnvVar = "DD_TRACE_ENABLED"
9090
// MergeXrayTracesEnvVar is the environment variable that enables the merging of X-Ray and Datadog traces.
9191
MergeXrayTracesEnvVar = "DD_MERGE_XRAY_TRACES"
92+
// UniversalInstrumentation is the environment variable that enables universal instrumentation with the DD Extension
93+
UniversalInstrumentation = "DD_UNIVERSAL_INSTRUMENTATION"
9294

9395
// DefaultSite to send API messages to.
9496
DefaultSite = "datadoghq.com"
@@ -183,8 +185,9 @@ func InvokeDryRun(callback func(ctx context.Context), cfg *Config) (interface{},
183185

184186
func (cfg *Config) toTraceConfig() trace.Config {
185187
traceConfig := trace.Config{
186-
DDTraceEnabled: false,
187-
MergeXrayTraces: false,
188+
DDTraceEnabled: false,
189+
MergeXrayTraces: false,
190+
UniversalInstrumentation: false,
188191
}
189192

190193
if cfg != nil {
@@ -205,6 +208,10 @@ func (cfg *Config) toTraceConfig() trace.Config {
205208
traceConfig.MergeXrayTraces, _ = strconv.ParseBool(os.Getenv(MergeXrayTracesEnvVar))
206209
}
207210

211+
if !traceConfig.UniversalInstrumentation {
212+
traceConfig.UniversalInstrumentation, _ = strconv.ParseBool(os.Getenv(UniversalInstrumentation))
213+
}
214+
208215
return traceConfig
209216
}
210217

@@ -213,12 +220,14 @@ func initializeListeners(cfg *Config) []wrapper.HandlerListener {
213220
if strings.EqualFold(logLevel, "debug") || (cfg != nil && cfg.DebugLogging) {
214221
logger.SetLogLevel(logger.LevelDebug)
215222
}
216-
extensionManager := extension.BuildExtensionManager()
223+
traceConfig := cfg.toTraceConfig()
224+
extensionManager := extension.BuildExtensionManager(traceConfig.UniversalInstrumentation)
217225
isExtensionRunning := extensionManager.IsExtensionRunning()
226+
metricsConfig := cfg.toMetricsConfig(isExtensionRunning)
218227

219228
// Wrap the handler with listeners that add instrumentation for traces and metrics.
220-
tl := trace.MakeListener(cfg.toTraceConfig(), extensionManager)
221-
ml := metrics.MakeListener(cfg.toMetricsConfig(isExtensionRunning), extensionManager)
229+
tl := trace.MakeListener(traceConfig, extensionManager)
230+
ml := metrics.MakeListener(metricsConfig, extensionManager)
222231
return []wrapper.HandlerListener{
223232
&tl, &ml,
224233
}

internal/extension/extension.go

Lines changed: 111 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,30 @@
99
package extension
1010

1111
import (
12+
"bytes"
13+
"context"
14+
"encoding/json"
1215
"fmt"
1316
"net/http"
1417
"os"
1518
"time"
1619

1720
"github.com/DataDog/datadog-lambda-go/internal/logger"
21+
22+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
23+
)
24+
25+
// ddTraceContext is the key type for the trace-context values this package
// exchanges with the extension as HTTP header names and stores in a
// context.Context. Using a dedicated type keeps these keys from colliding
// with context keys defined by other packages.
type ddTraceContext string

const (
	// DdTraceId is the header name / context key for the Datadog trace ID.
	DdTraceId ddTraceContext = "x-datadog-trace-id"
	// DdParentId is the header name / context key for the parent span ID.
	DdParentId ddTraceContext = "x-datadog-parent-id"
	// DdSpanId is the header name / context key for the span ID.
	DdSpanId ddTraceContext = "x-datadog-span-id"
	// DdSamplingPriority is the header name / context key for the sampling priority.
	DdSamplingPriority ddTraceContext = "x-datadog-sampling-priority"
	// DdInvocationError is the request header set to "true" on the
	// end-invocation call when the invocation returned an error.
	DdInvocationError ddTraceContext = "x-datadog-invocation-error"

	// DdSeverlessSpan is the original (misspelled) name of DdServerlessSpan.
	// It is kept so existing callers keep compiling; prefer DdServerlessSpan
	// in new code.
	// NOTE(review): presumably the context key under which the function
	// execution span is stored; its usage is outside this file, confirm.
	DdSeverlessSpan ddTraceContext = "dd-tracer-serverless-span"
	// DdServerlessSpan is the correctly spelled alias of DdSeverlessSpan; both
	// names denote the same key value.
	DdServerlessSpan = DdSeverlessSpan
	// DdLambdaResponse is the context key under which the Lambda response is
	// looked up when building the end-invocation request body.
	DdLambdaResponse ddTraceContext = "dd-response"
)
1937

2038
const (
@@ -23,30 +41,38 @@ const (
2341
// want to let it have some time for its cold start, so we should not set this too low.
2442
timeout = 3000 * time.Millisecond
2543

26-
helloUrl = "http://localhost:8124/lambda/hello"
27-
flushUrl = "http://localhost:8124/lambda/flush"
44+
helloUrl = "http://localhost:8124/lambda/hello"
45+
flushUrl = "http://localhost:8124/lambda/flush"
46+
startInvocationUrl = "http://localhost:8124/lambda/start-invocation"
47+
endInvocationUrl = "http://localhost:8124/lambda/end-invocation"
2848

2949
extensionPath = "/opt/extensions/datadog-agent"
3050
)
3151

3252
type ExtensionManager struct {
33-
helloRoute string
34-
flushRoute string
35-
extensionPath string
36-
httpClient HTTPClient
37-
isExtensionRunning bool
53+
helloRoute string
54+
flushRoute string
55+
extensionPath string
56+
startInvocationUrl string
57+
endInvocationUrl string
58+
httpClient HTTPClient
59+
isExtensionRunning bool
60+
isUniversalInstrumentation bool
3861
}
3962

4063
// HTTPClient is the minimal HTTP interface the ExtensionManager needs to talk
// to the extension. *http.Client satisfies it, and tests substitute mocks that
// return canned responses.
type HTTPClient interface {
4164
// Do sends req and returns the resulting response or an error.
Do(req *http.Request) (*http.Response, error)
4265
}
4366

44-
func BuildExtensionManager() *ExtensionManager {
67+
func BuildExtensionManager(isUniversalInstrumentation bool) *ExtensionManager {
4568
em := &ExtensionManager{
46-
helloRoute: helloUrl,
47-
flushRoute: flushUrl,
48-
extensionPath: extensionPath,
49-
httpClient: &http.Client{Timeout: timeout},
69+
helloRoute: helloUrl,
70+
flushRoute: flushUrl,
71+
startInvocationUrl: startInvocationUrl,
72+
endInvocationUrl: endInvocationUrl,
73+
extensionPath: extensionPath,
74+
httpClient: &http.Client{Timeout: timeout},
75+
isUniversalInstrumentation: isUniversalInstrumentation,
5076
}
5177
em.checkAgentRunning()
5278
return em
@@ -57,15 +83,81 @@ func (em *ExtensionManager) checkAgentRunning() {
5783
logger.Debug("Will use the API")
5884
em.isExtensionRunning = false
5985
} else {
60-
req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil)
61-
if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 {
62-
logger.Debug("Will use the Serverless Agent")
63-
em.isExtensionRunning = true
64-
} else {
65-
logger.Debug("Will use the API since the Serverless Agent was detected but the hello route was unreachable")
66-
em.isExtensionRunning = false
86+
logger.Debug("Will use the Serverless Agent")
87+
em.isExtensionRunning = true
88+
89+
// Tell the extension not to create an execution span if universal instrumentation is disabled
90+
if !em.isUniversalInstrumentation {
91+
req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil)
92+
if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 {
93+
logger.Debug("Hit the extension /hello route")
94+
} else {
95+
logger.Debug("Will use the API since the Serverless Agent was detected but the hello route was unreachable")
96+
em.isExtensionRunning = false
97+
}
98+
}
99+
}
100+
}
101+
102+
func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, eventPayload json.RawMessage) context.Context {
103+
body := bytes.NewBuffer(eventPayload)
104+
req, _ := http.NewRequest(http.MethodPost, em.startInvocationUrl, body)
105+
106+
if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 {
107+
// Propagate dd-trace context from the extension response if found in the response headers
108+
traceId := response.Header.Get(string(DdTraceId))
109+
if traceId != "" {
110+
ctx = context.WithValue(ctx, DdTraceId, traceId)
111+
}
112+
parentId := response.Header.Get(string(DdParentId))
113+
if parentId != "" {
114+
ctx = context.WithValue(ctx, DdParentId, parentId)
115+
}
116+
samplingPriority := response.Header.Get(string(DdSamplingPriority))
117+
if samplingPriority != "" {
118+
ctx = context.WithValue(ctx, DdSamplingPriority, samplingPriority)
67119
}
68120
}
121+
return ctx
122+
}
123+
124+
func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functionExecutionSpan ddtrace.Span, err error) {
125+
// Handle Lambda response
126+
lambdaResponse := ctx.Value(DdLambdaResponse)
127+
content, responseErr := json.Marshal(lambdaResponse)
128+
if responseErr != nil {
129+
content = []byte("{}")
130+
}
131+
body := bytes.NewBuffer(content)
132+
req, _ := http.NewRequest(http.MethodPost, em.endInvocationUrl, body)
133+
134+
// Mark the invocation as an error if any
135+
if err != nil {
136+
req.Header.Set(string(DdInvocationError), "true")
137+
}
138+
139+
// Extract the DD trace context and pass them to the extension via request headers
140+
traceId, ok := ctx.Value(DdTraceId).(string)
141+
if ok {
142+
req.Header.Set(string(DdTraceId), traceId)
143+
if parentId, ok := ctx.Value(DdParentId).(string); ok {
144+
req.Header.Set(string(DdParentId), parentId)
145+
}
146+
if spanId, ok := ctx.Value(DdSpanId).(string); ok {
147+
req.Header.Set(string(DdSpanId), spanId)
148+
}
149+
if samplingPriority, ok := ctx.Value(DdSamplingPriority).(string); ok {
150+
req.Header.Set(string(DdSamplingPriority), samplingPriority)
151+
}
152+
} else {
153+
req.Header.Set(string(DdTraceId), fmt.Sprint(functionExecutionSpan.Context().TraceID()))
154+
req.Header.Set(string(DdSpanId), fmt.Sprint(functionExecutionSpan.Context().SpanID()))
155+
}
156+
157+
resp, err := em.httpClient.Do(req)
158+
if err != nil || resp.StatusCode != 200 {
159+
logger.Error(fmt.Errorf("could not send end invocation payload to the extension: %v", err))
160+
}
69161
}
70162

71163
func (em *ExtensionManager) IsExtensionRunning() bool {

internal/extension/extension_test.go

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
package extension
1010

1111
import (
12+
"bytes"
13+
"context"
1214
"fmt"
1315
"net/http"
1416
"os"
1517
"testing"
1618

19+
"github.com/DataDog/datadog-lambda-go/internal/logger"
1720
"github.com/stretchr/testify/assert"
21+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
1822
)
1923

2024
type ClientErrorMock struct {
@@ -26,6 +30,19 @@ type ClientSuccessMock struct {
2630
// ClientSuccess202Mock is a test client whose Do method returns a response
// with status code 202 and no error.
type ClientSuccess202Mock struct {
2731
}
2832

33+
// ClientSuccessStartInvoke is a test client that answers every request with a
// 200 response carrying the configured headers.
type ClientSuccessStartInvoke struct {
34+
// headers is returned verbatim as the response headers; nil means no headers.
headers http.Header
35+
}
36+
37+
// ClientSuccessEndInvoke is a test client that answers every request with a
// 200 response and no error.
type ClientSuccessEndInvoke struct {
38+
}
39+
40+
// Header values the mocked start-invocation response returns for the
// trace-context headers in TestExtensionStartInvokeWithTraceContext.
const (
41+
mockTraceId = "1"
42+
mockParentId = "2"
43+
mockSamplingPriority = "3"
44+
)
45+
2946
// Do always fails: it returns no response and a "KO" error, regardless of req.
func (c *ClientErrorMock) Do(req *http.Request) (*http.Response, error) {
	failure := fmt.Errorf("KO")
	return nil, failure
}
@@ -38,11 +55,30 @@ func (c *ClientSuccess202Mock) Do(req *http.Request) (*http.Response, error) {
3855
return &http.Response{StatusCode: 202, Status: "KO"}, nil
3956
}
4057

58+
func (c *ClientSuccessStartInvoke) Do(req *http.Request) (*http.Response, error) {
59+
return &http.Response{StatusCode: 200, Status: "KO", Header: c.headers}, nil
60+
}
61+
62+
func (c *ClientSuccessEndInvoke) Do(req *http.Request) (*http.Response, error) {
63+
return &http.Response{StatusCode: 200, Status: "KO"}, nil
64+
}
65+
66+
func captureLog(f func()) string {
67+
var buf bytes.Buffer
68+
logger.SetOutput(&buf)
69+
f()
70+
logger.SetOutput(os.Stdout)
71+
return buf.String()
72+
}
73+
4174
func TestBuildExtensionManager(t *testing.T) {
42-
em := BuildExtensionManager()
75+
em := BuildExtensionManager(false)
4376
assert.Equal(t, "http://localhost:8124/lambda/hello", em.helloRoute)
4477
assert.Equal(t, "http://localhost:8124/lambda/flush", em.flushRoute)
78+
assert.Equal(t, "http://localhost:8124/lambda/start-invocation", em.startInvocationUrl)
79+
assert.Equal(t, "http://localhost:8124/lambda/end-invocation", em.endInvocationUrl)
4580
assert.Equal(t, "/opt/extensions/datadog-agent", em.extensionPath)
81+
assert.Equal(t, false, em.isUniversalInstrumentation)
4682
assert.NotNil(t, em.httpClient)
4783
}
4884

@@ -96,3 +132,68 @@ func TestFlushSuccess(t *testing.T) {
96132
err := em.Flush()
97133
assert.Nil(t, err)
98134
}
135+
136+
func TestExtensionStartInvoke(t *testing.T) {
137+
em := &ExtensionManager{
138+
startInvocationUrl: startInvocationUrl,
139+
httpClient: &ClientSuccessStartInvoke{},
140+
}
141+
ctx := em.SendStartInvocationRequest(context.TODO(), []byte{})
142+
traceId := ctx.Value(DdTraceId)
143+
parentId := ctx.Value(DdParentId)
144+
samplingPriority := ctx.Value(DdSamplingPriority)
145+
err := em.Flush()
146+
147+
assert.Nil(t, err)
148+
assert.Nil(t, traceId)
149+
assert.Nil(t, parentId)
150+
assert.Nil(t, samplingPriority)
151+
}
152+
153+
func TestExtensionStartInvokeWithTraceContext(t *testing.T) {
154+
headers := http.Header{}
155+
headers.Set(string(DdTraceId), mockTraceId)
156+
headers.Set(string(DdParentId), mockParentId)
157+
headers.Set(string(DdSamplingPriority), mockSamplingPriority)
158+
159+
em := &ExtensionManager{
160+
startInvocationUrl: startInvocationUrl,
161+
httpClient: &ClientSuccessStartInvoke{
162+
headers: headers,
163+
},
164+
}
165+
ctx := em.SendStartInvocationRequest(context.TODO(), []byte{})
166+
traceId := ctx.Value(DdTraceId)
167+
parentId := ctx.Value(DdParentId)
168+
samplingPriority := ctx.Value(DdSamplingPriority)
169+
err := em.Flush()
170+
171+
assert.Nil(t, err)
172+
assert.Equal(t, mockTraceId, traceId)
173+
assert.Equal(t, mockParentId, parentId)
174+
assert.Equal(t, mockSamplingPriority, samplingPriority)
175+
}
176+
177+
func TestExtensionEndInvocation(t *testing.T) {
178+
em := &ExtensionManager{
179+
endInvocationUrl: endInvocationUrl,
180+
httpClient: &ClientSuccessEndInvoke{},
181+
}
182+
span := tracer.StartSpan("aws.lambda")
183+
logOutput := captureLog(func() { em.SendEndInvocationRequest(context.TODO(), span, nil) })
184+
span.Finish()
185+
186+
assert.Equal(t, "", logOutput)
187+
}
188+
189+
func TestExtensionEndInvocationError(t *testing.T) {
190+
em := &ExtensionManager{
191+
endInvocationUrl: endInvocationUrl,
192+
httpClient: &ClientErrorMock{},
193+
}
194+
span := tracer.StartSpan("aws.lambda")
195+
logOutput := captureLog(func() { em.SendEndInvocationRequest(context.TODO(), span, nil) })
196+
span.Finish()
197+
198+
assert.Contains(t, logOutput, "could not send end invocation payload to the extension")
199+
}

0 commit comments

Comments
 (0)