feat: add audit logging for chat completions #3062
Walkthrough
Adds a new auditing subsystem: configuration, a broadcast bus, record/handle types, sinks with worker tasks, and a streaming passthrough that aggregates chunks into a final response. Wires initialization into the input entrypoint, conditionally starting the bus and sinks based on env-driven policy and capacity.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant EP as Entrypoint
participant C as audit::config
participant B as audit::bus
participant S as audit::sink
participant H as audit::handle
participant App as App Logic
EP->>C: policy()
alt policy.enabled
EP->>B: init(capacity from DYN_AUDIT_CAPACITY)
EP->>S: spawn_workers_from_env()
S->>B: subscribe()
Note right of S: Workers ready to receive AuditRecord
else
Note over EP: Auditing disabled
end
App->>H: create_handle(req, request_id)
alt Some(handle)
App->>H: set_request(req) / add_usage(...)
App->>H: set_response(resp)
App->>H: emit()
H->>B: publish(AuditRecord)
B-->>S: broadcast Arc<AuditRecord>
S->>S: emit(record) via each configured sink
else None
Note over App: Skip auditing
end
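The fan-out in the diagram above can be sketched with the standard library alone. This is a minimal illustration, not the PR's implementation: the `AuditRecord` fields, the `Bus` type, and the drop-when-full policy are all assumptions made for the example, and the real bus may well be built on an async broadcast channel instead.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};

/// Hypothetical record shape; the PR's actual `AuditRecord` fields are not shown in this thread.
#[derive(Debug)]
pub struct AuditRecord {
    pub request_id: String,
    pub request: String,
    pub response: String,
}

/// Std-only stand-in for the broadcast bus: one bounded queue per subscriber.
pub struct Bus {
    capacity: usize,
    subscribers: Mutex<Vec<SyncSender<Arc<AuditRecord>>>>,
}

impl Bus {
    pub fn new(capacity: usize) -> Self {
        Bus { capacity, subscribers: Mutex::new(Vec::new()) }
    }

    /// Each sink worker subscribes once and then drains its own receiver.
    pub fn subscribe(&self) -> Receiver<Arc<AuditRecord>> {
        let (tx, rx) = sync_channel(self.capacity);
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Fan out without blocking the caller. A full queue drops the record for
    /// that subscriber; a disconnected subscriber is removed from the list.
    /// Returns the number of subscribers that received the record.
    pub fn publish(&self, record: AuditRecord) -> usize {
        let record = Arc::new(record);
        let mut delivered = 0;
        self.subscribers.lock().unwrap().retain(|tx| {
            match tx.try_send(Arc::clone(&record)) {
                Ok(()) => {
                    delivered += 1;
                    true
                }
                Err(TrySendError::Full(_)) => true,
                Err(TrySendError::Disconnected(_)) => false,
            }
        });
        delivered
    }
}
```

Sharing the record through an `Arc` means each additional sink costs a reference-count increment rather than a copy of the request and response bodies.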
sequenceDiagram
autonumber
participant Client as Client
participant Stream as Upstream Stream
participant PTA as PassThroughWithAgg
participant Agg as Aggregator Task
participant Fut as Aggregation Future
Client->>PTA: poll_next()
PTA->>Stream: poll_next()
alt Next chunk
Stream-->>PTA: Annotated<Chunk>
PTA->>PTA: buffer clone
PTA-->>Client: forward chunk
else End of stream
Stream-->>PTA: None
PTA->>Agg: spawn aggregate(buffer)
Agg-->>Fut: send final Response
PTA-->>Client: None
end
Note over Fut: Future resolves to NvCreateChatCompletionResponse (fallback on failure)
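As a rough synchronous analogue of the pass-through shown above, the sketch below uses an `Iterator` in place of an async `Stream` and plain `String` chunks in place of `Annotated<Chunk>`. The type name mirrors the diagram, but the fields, the channel-based hand-off, and aggregation by concatenation are assumptions for illustration only.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Forwards every chunk unchanged while buffering a copy, then sends the
/// aggregated result exactly once when the upstream is exhausted.
pub struct PassThroughWithAgg<I: Iterator<Item = String>> {
    upstream: I,
    buffer: Vec<String>,
    done: Option<Sender<String>>,
}

impl<I: Iterator<Item = String>> PassThroughWithAgg<I> {
    /// Returns the adapter plus a receiver that yields the final aggregate.
    pub fn new(upstream: I) -> (Self, Receiver<String>) {
        let (tx, rx) = channel();
        let adapter = PassThroughWithAgg {
            upstream,
            buffer: Vec::new(),
            done: Some(tx),
        };
        (adapter, rx)
    }
}

impl<I: Iterator<Item = String>> Iterator for PassThroughWithAgg<I> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        match self.upstream.next() {
            Some(chunk) => {
                // Keep a copy for aggregation; the client sees the original.
                self.buffer.push(chunk.clone());
                Some(chunk)
            }
            None => {
                // `take()` guarantees the aggregate is sent at most once,
                // even if the caller polls again after the end.
                if let Some(tx) = self.done.take() {
                    let _ = tx.send(self.buffer.concat());
                }
                None
            }
        }
    }
}
```

The client-facing side never waits on aggregation: chunks are forwarded as they arrive, and the audit side reads the aggregate from the receiver afterwards.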
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 0
🧹 Nitpick comments (7)
lib/llm/src/http/service.rs (1)
31-31: Confirm need for public exposure of audit module
If external consumers don’t need to call auditing APIs, prefer restricting scope to avoid expanding the public surface.
Apply if internal-only:
-pub mod audit;
+pub(crate) mod audit;
lib/llm/src/http/service/openai.rs (2)
492-501: Audit gating and conditional clone: OK; consider deferring clone to success path
Current approach avoids clone unless needed. You could further defer the clone until after the response is folded to skip cloning on error paths (minor perf), but it would reflect stream=true in the logged request. If preserving the original stream flag matters, keep as-is.
597-601: Emit request/response as structured fields and add an explicit tracing target
With the current % formatter, request/response become stringified JSON. If your JSONL pipeline expects nested objects, serialize via a serde-aware field or attach a dedicated target for easier filtering.
Proposed minimal change (adds a target; keeps current field formatting). For fully structured fields, see follow-up note.
-    if let Some(req_copy) = request_for_audit {
-        let resp_json = serde_json::to_value(&response).unwrap_or(serde_json::Value::Null);
-        audit::log_stored_completion(&request_id, &req_copy, resp_json);
-    }
+    if let Some(req_copy) = request_for_audit {
+        let resp_json = serde_json::to_value(&response).unwrap_or(serde_json::Value::Null);
+        audit::log_stored_completion(&request_id, &req_copy, resp_json);
+    }
Follow-up (optional, requires tracing-serde and logger support): log as nested objects using AsSerde and a target (see audit.rs comment).
lib/llm/src/http/service/audit.rs (4)
26-31: Avoid potential panic and normalize timestamp type
duration_since(UNIX_EPOCH).unwrap() can theoretically panic; also prefer i64 for downstream JSON consumers.
Apply:
-    let ts_ms = SystemTime::now()
-        .duration_since(UNIX_EPOCH)
-        .unwrap()
-        .as_millis();
+    let ts_ms: i64 = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map(|d| d.as_millis() as i64)
+        .unwrap_or(0);
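The suggestion above can be factored into a small helper that is easy to test in isolation. This is a sketch; the function names are hypothetical, and clamping to `i64::MAX` is an addition here rather than part of the suggested diff (which uses a plain `as i64` cast).

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Milliseconds since the Unix epoch as an i64.
/// Returns 0 if `t` is earlier than the epoch (instead of panicking) and
/// clamps to i64::MAX rather than wrapping on overflow.
pub fn millis_since_epoch(t: SystemTime) -> i64 {
    t.duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0)
}

/// Convenience wrapper for the current wall-clock time.
pub fn now_ms() -> i64 {
    millis_since_epoch(SystemTime::now())
}
```

Taking the `SystemTime` as a parameter keeps the conversion deterministic under test, while `now_ms` remains the one-line call site for logging code.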
32-41: Add a dedicated tracing target and (optionally) emit structured JSON fields
A target simplifies log routing. Today %request_val/%response_json serialize to strings; if your JSONL stack expects nested objects, emit serde-backed values.
Minimal (target only):
-    tracing::info!(
+    tracing::info!(target: "dynamo_audit",
         log_type = "audit",
         schema_version = "1.0",
         ts_ms = ts_ms,
         store_id = %store_id,
         request_id = request_id,
         request = %request_val,
         response = %response_json,
         "Audit log for stored completion"
     );
Optional (structured fields; requires adding tracing-serde and configuring the JSON formatter to honor it):
-    tracing::info!(target: "dynamo_audit",
-        request = %request_val,
-        response = %response_json,
-        ...
-    );
+    use tracing_serde::AsSerde;
+    tracing::info!(target: "dynamo_audit",
+        request = ?AsSerde(&request_val),
+        response = ?AsSerde(&response_json),
+        ...
+    );
Please confirm what your DYN_LOGGING_JSONL layer expects.
49-64: Tests cover flag matrix; consider one negative-path clone test (optional)
You might add a test asserting no request clone occurs when stream=true or store=false (using counters or a lightweight wrapper), but this is optional.
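One way to approach the counter-based check mentioned above is a wrapper whose `Clone` impl increments a shared counter. Everything in this sketch (`CountingRequest`, `request_for_audit` as a free function) is hypothetical scaffolding for a test, not code from the PR.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Test-only request wrapper that counts how many times it is cloned.
pub struct CountingRequest {
    pub body: String,
    clones: Arc<AtomicUsize>,
}

impl CountingRequest {
    /// Returns the request plus a handle to its clone counter.
    pub fn new(body: &str) -> (Self, Arc<AtomicUsize>) {
        let clones = Arc::new(AtomicUsize::new(0));
        let req = CountingRequest {
            body: body.to_string(),
            clones: Arc::clone(&clones),
        };
        (req, clones)
    }
}

impl Clone for CountingRequest {
    fn clone(&self) -> Self {
        // Record the clone before producing the copy.
        self.clones.fetch_add(1, Ordering::SeqCst);
        CountingRequest {
            body: self.body.clone(),
            clones: Arc::clone(&self.clones),
        }
    }
}

/// Clones the request only when the audit gate is open.
pub fn request_for_audit(req: &CountingRequest, should_audit: bool) -> Option<CountingRequest> {
    if should_audit {
        Some(req.clone())
    } else {
        None
    }
}
```

A test can then assert that the counter stays at zero whenever the gate is closed and reaches one when it is open.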
81-113: Smoke test is fine; consider asserting log shape via a test subscriber (optional)
If feasible, attach a test tracing subscriber to capture the event and assert presence of log_type="audit" and request_id.
I can draft a minimal test subscriber if you’d like.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
lib/llm/src/http/service.rs (1 hunks)
lib/llm/src/http/service/audit.rs (1 hunks)
lib/llm/src/http/service/openai.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
lib/llm/src/http/service/audit.rs (1)
lib/llm/src/http/service/openai.rs (2)
chat_completions (455-605), s (61-61)
lib/llm/src/http/service/openai.rs (1)
lib/llm/src/http/service/audit.rs (2)
should_audit_flags (15-17), log_stored_completion (19-42)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build and Test - dynamo
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
🔇 Additional comments (3)
lib/llm/src/http/service/openai.rs (1)
27-27: Import looks good
lib/llm/src/http/service/audit.rs (2)
9-13: Env flag parsing: LGTM
Covers "1"/"true" (case-insensitive) and defaults off.
15-17: Audit gating logic is correct
Non-streaming + enabled + store=true is enforced.
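The two behaviours approved above (flag parsing and gating) are small enough to sketch as pure functions. This is an illustration under assumptions: `parse_flag` and `should_audit` are hypothetical names and signatures, and the actual `should_audit_flags` in the reviewed file may take different arguments.

```rust
/// Interprets an env-style flag value: only "1" or "true" (any letter case)
/// enable the feature; any other value, or an unset variable, leaves it off.
pub fn parse_flag(raw: Option<&str>) -> bool {
    match raw {
        Some(v) => v == "1" || v.eq_ignore_ascii_case("true"),
        None => false,
    }
}

/// Gate as described in the review: auditing applies only when it is enabled,
/// the request asked for `store=true`, and the request is not streaming.
pub fn should_audit(enabled: bool, store: bool, streaming: bool) -> bool {
    enabled && store && !streaming
}

/// Reads the flag from the process environment.
pub fn audit_enabled_from_env() -> bool {
    parse_flag(std::env::var("DYN_AUDIT_ENABLED").ok().as_deref())
}
```

Keeping the parsing separate from the environment lookup lets the flag matrix be tested without mutating process-wide state.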
ryanolson
left a comment
Is HTTP the right place for this, or is it preprocess/processor?
HTTP is just a shim/transport. If we put the auditing here, we need to do the same in the gRPC frontend that @GuanLuo is working on or has finished.
The responses are finalized in the post part of the "processor".
That feels like a better place to audit, since all frontend code paths (regardless of public API or transport) will flow through there.
@ryanolson thanks for the feedback - makes sense. I’ll work through the comments later this afternoon and move the logic into the processor.
4eabe60 to 6bc8a07
6bc8a07 to cdab20e
@ryanolson ready for re-review. Let me know what you think of using |
548e5d4 to 6bcc3f3
6bcc3f3 to 9cc0140
Signed-off-by: Ryan Lempka <[email protected]>
…o generate function Signed-off-by: Ryan Lempka <[email protected]>
9cc0140 to 048d6ef
/ok to test 048d6ef
Signed-off-by: Ryan Lempka <[email protected]>
Overview:
Captures the request/response of a chat completion and writes it to stderr. To use this feature, set the env var DYN_AUDIT_ENABLED=1 and pass store=true in the request.
Enables compliance, distillation, evaluation, and analytics use cases.
This PR begins with stderr as the target but is designed to support additional targets such as a persistent stream. The stream can be monitored by an independent process that consumes the audit data and persists it in a database.
Details:
DYN_AUDIT_ENABLED=1 enables auditing, DYN_AUDIT_CAPACITY sets buffer size
Architecture: Audit happens post-transform for consistency with client output. Broadcast bus enables fan-out without blocking requests.
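Reading the buffer size from DYN_AUDIT_CAPACITY might look roughly like the sketch below. The default of 1024, the rejection of zero, and the function names are assumptions made for illustration; the PR description does not state the actual default or validation rules.

```rust
/// Hypothetical fallback; the PR text does not state the real default.
pub const DEFAULT_CAPACITY: usize = 1024;

/// Parses a capacity value, falling back to the default when the variable is
/// unset, not a number, or zero.
pub fn parse_capacity(raw: Option<&str>) -> usize {
    raw.and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_CAPACITY)
}

/// Reads the capacity from the process environment.
pub fn capacity_from_env() -> usize {
    parse_capacity(std::env::var("DYN_AUDIT_CAPACITY").ok().as_deref())
}
```

Falling back rather than failing on a malformed value keeps a configuration typo from preventing startup, at the cost of silently using the default.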
Example outputs:
Where to start:
lib/llm/src/audit/ - Core system (bus, handle, sink, stream)
lib/llm/src/preprocessor.rs - Integration point
lib/llm/src/entrypoint/input.rs - Initialization