Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,55 @@ function startMockAnthropicServer() {
return;
}

// Check if streaming is requested
if (req.body.stream === true) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});

// Send streaming events
const events = [
{
type: 'message_start',
message: {
id: 'msg_stream123',
type: 'message',
role: 'assistant',
model,
content: [],
usage: { input_tokens: 10 },
},
},
{ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } },
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello ' } },
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'from ' } },
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'stream!' } },
{ type: 'content_block_stop', index: 0 },
{
type: 'message_delta',
delta: { stop_reason: 'end_turn', stop_sequence: null },
usage: { output_tokens: 15 },
},
{ type: 'message_stop' },
];

events.forEach((event, index) => {
setTimeout(() => {
res.write(`event: ${event.type}\n`);
res.write(`data: ${JSON.stringify(event)}\n\n`);

if (index === events.length - 1) {
res.end();
}
}, index * 10); // Small delay between events
});

return;
}

// Non-streaming response
res.send({
id: 'msg_mock123',
type: 'message',
Expand Down Expand Up @@ -92,8 +141,32 @@ async function run() {

// Fourth test: models.retrieve
await client.models.retrieve('claude-3-haiku-20240307');

// Fifth test: streaming via messages.create
const stream = await client.messages.create({
model: 'claude-3-haiku-20240307',
messages: [{ role: 'user', content: 'What is the capital of France?' }],
stream: true,
});

for await (const _ of stream) {
void _;
}

// Sixth test: streaming via messages.stream
await client.messages
.stream({
model: 'claude-3-haiku-20240307',
messages: [{ role: 'user', content: 'What is the capital of France?' }],
})
.on('streamEvent', () => {
Sentry.captureMessage('stream event from user-added event listener captured');
});
});

// Wait for the stream event handler to finish
await Sentry.flush(2000);

server.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,30 @@ describe('Anthropic integration', () => {
origin: 'auto.ai.anthropic',
status: 'ok',
}),
// Fifth span - messages.create with stream: true
expect.objectContaining({
data: expect.objectContaining({
'gen_ai.operation.name': 'messages',
'gen_ai.request.model': 'claude-3-haiku-20240307',
'gen_ai.request.stream': true,
}),
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
origin: 'auto.ai.anthropic',
status: 'ok',
}),
// Sixth span - messages.stream
expect.objectContaining({
data: expect.objectContaining({
'gen_ai.operation.name': 'messages',
'gen_ai.request.model': 'claude-3-haiku-20240307',
'gen_ai.request.stream': true,
}),
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
origin: 'auto.ai.anthropic',
status: 'ok',
}),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Stream Attribute Mismatch in Test

The test for the messages.stream span incorrectly expects gen_ai.request.stream: true. This attribute is only set when an explicit stream parameter is passed, which messages.stream does not use as it's inherently streaming. The instrumentation correctly reflects this by omitting the attribute for messages.stream calls.

Fix in Cursor Fix in Web

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not true, we add this in both cases.

]),
};

Expand Down Expand Up @@ -189,6 +213,21 @@ describe('Anthropic integration', () => {
]),
};

const EXPECTED_MODEL_ERROR = {
exception: {
values: [
{
type: 'Error',
value: '404 Model not found',
},
],
},
};

const EXPECTED_STREAM_EVENT_HANDLER_MESSAGE = {
message: 'stream event from user-added event listener captured',
};

createEsmAndCjsTests(__dirname, 'scenario-manual-client.mjs', 'instrument.mjs', (createRunner, test) => {
test('creates anthropic related spans when manually insturmenting client', async () => {
await createRunner()
Expand All @@ -202,8 +241,9 @@ describe('Anthropic integration', () => {
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => {
test('creates anthropic related spans with sendDefaultPii: false', async () => {
await createRunner()
.ignore('event')
.expect({ event: EXPECTED_MODEL_ERROR })
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE })
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
.start()
.completed();
});
Expand All @@ -212,8 +252,9 @@ describe('Anthropic integration', () => {
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-pii.mjs', (createRunner, test) => {
test('creates anthropic related spans with sendDefaultPii: true', async () => {
await createRunner()
.ignore('event')
.expect({ event: EXPECTED_MODEL_ERROR })
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE })
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
.start()
.completed();
});
Expand All @@ -222,8 +263,9 @@ describe('Anthropic integration', () => {
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-options.mjs', (createRunner, test) => {
test('creates anthropic related spans with custom options', async () => {
await createRunner()
.ignore('event')
.expect({ event: EXPECTED_MODEL_ERROR })
.expect({ transaction: EXPECTED_TRANSACTION_WITH_OPTIONS })
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
.start()
.completed();
});
Expand Down
184 changes: 116 additions & 68 deletions packages/core/src/utils/anthropic-ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
} from '../ai/gen-ai-attributes';
import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils';
import { handleCallbackErrors } from '../handleCallbackErrors';
import { instrumentStream } from './streaming';
import { instrumentAsyncIterableStream, instrumentMessageStream } from './streaming';
import type {
AnthropicAiInstrumentedMethod,
AnthropicAiOptions,
Expand Down Expand Up @@ -194,6 +194,74 @@ function addResponseAttributes(span: Span, response: AnthropicAiResponse, record
addMetadataAttributes(span, response);
}

/**
 * Shared error path for streaming requests: report the error to Sentry,
 * mark the span as failed (if it is still open), and rethrow so the
 * caller's promise rejects exactly as it would without instrumentation.
 */
function handleStreamingError(error: unknown, span: Span, methodPath: string): never {
  const mechanism = {
    handled: false,
    type: 'auto.ai.anthropic',
    data: { function: methodPath },
  };
  captureException(error, { mechanism });

  // Only touch the span while it is still recording; ending twice is a no-op
  // we want to avoid signalling.
  const spanStillOpen = span.isRecording();
  if (spanStillOpen) {
    span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
    span.end();
  }

  throw error;
}

/**
* Handle streaming cases with common logic
*/
function handleStreamingRequest<T extends unknown[], R>(
originalMethod: (...args: T) => Promise<R>,
target: (...args: T) => Promise<R>,
context: unknown,
args: T,
requestAttributes: Record<string, unknown>,
operationName: string,
methodPath: string,
params: Record<string, unknown> | undefined,
options: AnthropicAiOptions,
isStreamRequested: boolean,
): Promise<R> {
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
const spanConfig = {
name: `${operationName} ${model} stream-response`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
};

if (isStreamRequested) {
return startSpanManual(spanConfig, async span => {
try {
if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params);
}
const result = await originalMethod.apply(context, args);
return instrumentAsyncIterableStream(
result as AsyncIterable<AnthropicAiStreamingEvent>,
span,
options.recordOutputs ?? false,
) as unknown as R;
} catch (error) {
return handleStreamingError(error, span, methodPath);
}
});
} else {
return startSpanManual(spanConfig, span => {
try {
if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params);
}
const messageStream = target.apply(context, args);
return instrumentMessageStream(messageStream, span, options.recordOutputs ?? false);
} catch (error) {
return handleStreamingError(error, span, methodPath);
}
});
}

This comment was marked as outdated.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Promise Handling and Context Propagation Issues

For messages.stream calls (when isStreamRequested is false), handleStreamingRequest passes a Promise to instrumentMessageStream instead of the resolved MessageStream object, which will cause a runtime error. Additionally, the this context (thisArg) isn't correctly propagated to the original method calls within the proxy.

Fix in Cursor Fix in Web

}

/**
* Instrument a method with Sentry spans
* Following Sentry AI Agents Manual Instrumentation conventions
Expand All @@ -205,82 +273,62 @@ function instrumentMethod<T extends unknown[], R>(
context: unknown,
options: AnthropicAiOptions,
): (...args: T) => Promise<R> {
return async function instrumentedMethod(...args: T): Promise<R> {
const requestAttributes = extractRequestAttributes(args, methodPath);
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
const operationName = getFinalOperationName(methodPath);
return new Proxy(originalMethod, {
apply(target, thisArg, args: T): Promise<R> {
const requestAttributes = extractRequestAttributes(args, methodPath);
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
const operationName = getFinalOperationName(methodPath);

const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
const isStreamRequested = Boolean(params?.stream);
const isStreamingMethod = methodPath === 'messages.stream';
const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
const isStreamRequested = Boolean(params?.stream);
const isStreamingMethod = methodPath === 'messages.stream';

if (isStreamRequested || isStreamingMethod) {
return startSpanManual(
if (isStreamRequested || isStreamingMethod) {
return handleStreamingRequest(
originalMethod,
target,
context,
args,
requestAttributes,
operationName,
methodPath,
params,
options,
isStreamRequested,
);
}

return startSpan(
{
name: `${operationName} ${model} stream-response`,
name: `${operationName} ${model}`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async span => {
try {
if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params);
}

const result = await originalMethod.apply(context, args);
return instrumentStream(
result as AsyncIterable<AnthropicAiStreamingEvent>,
span,
options.recordOutputs ?? false,
) as unknown as R;
} catch (error) {
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
captureException(error, {
mechanism: {
handled: false,
type: 'auto.ai.anthropic',
data: {
function: methodPath,
},
},
});
span.end();
throw error;
span => {
if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params);
}
},
);
}

return startSpan(
{
name: `${operationName} ${model}`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
span => {
if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params);
}

return handleCallbackErrors(
() => originalMethod.apply(context, args),
error => {
captureException(error, {
mechanism: {
handled: false,
type: 'auto.ai.anthropic',
data: {
function: methodPath,
return handleCallbackErrors(
() => target.apply(context, args),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Proxy Method Calls Misbind this

Instrumented methods incorrectly bind this when called through the proxy. The apply calls use a captured context variable instead of the thisArg provided by the proxy, which can break this binding for methods invoked with a specific context.

Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds reasonable I guess, or do we bind this to context on purpose here? 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure tbh, Rola did this initially and tests seem to pass. I can try changing it but didn't want to introduce another potential problem in this PR.

error => {
captureException(error, {
mechanism: {
handled: false,
type: 'auto.ai.anthropic',
data: {
function: methodPath,
},
},
},
});
},
() => {},
result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs),
);
},
);
};
});
},
() => {},
result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs),
);
},
);
},
}) as (...args: T) => Promise<R>;
}

/**
Expand Down
Loading