Skip to content

Commit 2da329a

Browse files
authored
ref: Move envelope queueing directly into the endpoint handler (#1431)
The QueueEnvelope message no longer needs to reside on the EnvelopeManager since #1416. Instead, it had become a simple dispatch function that calls ProcessMetrics and ValidateEnvelope directly on the target actors. To avoid a redundant round-trip, this PR removes the QueueEnvelope message entirely and moves its logic into the endpoint handler. The BufferGuard has moved into utilities. There is no expected change in behavior or performance, although there could be a slight decrease in latency since the endpoint no longer has to wait for the EnvelopeManager actor.
1 parent db3112c commit 2da329a

File tree

7 files changed

+178
-234
lines changed

7 files changed

+178
-234
lines changed

relay-server/src/actors/envelopes.rs

Lines changed: 4 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::time::Instant;
77
use actix::prelude::*;
88
use actix_web::http::Method;
99
use chrono::Utc;
10-
use failure::Fail;
1110
use futures01::{future, prelude::*, sync::oneshot};
1211

1312
use relay_common::ProjectKey;
@@ -19,15 +18,15 @@ use relay_quotas::Scoping;
1918
use relay_statsd::metric;
2019

2120
use crate::actors::outcome::{DiscardReason, Outcome};
22-
use crate::actors::processor::{EncodeEnvelope, EnvelopeProcessor, ProcessMetrics};
23-
use crate::actors::project_cache::{ProjectCache, UpdateRateLimits, ValidateEnvelope};
21+
use crate::actors::processor::{EncodeEnvelope, EnvelopeProcessor};
22+
use crate::actors::project_cache::{ProjectCache, UpdateRateLimits};
2423
use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError};
2524
use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
2625
use crate::extractors::{PartialDsn, RequestMeta};
2726
use crate::http::{HttpError, Request, RequestBuilder, Response};
2827
use crate::service::ServerError;
2928
use crate::statsd::RelayHistograms;
30-
use crate::utils::{EnvelopeContext, FutureExt as _, Semaphore};
29+
use crate::utils::{EnvelopeContext, FutureExt as _};
3130

3231
#[cfg(feature = "processing")]
3332
use {
@@ -37,66 +36,6 @@ use {
3736
tokio::runtime::Runtime,
3837
};
3938

40-
#[derive(Debug, Fail)]
41-
pub enum QueueEnvelopeError {
42-
#[fail(display = "Too many envelopes (event_buffer_size reached)")]
43-
TooManyEnvelopes,
44-
}
45-
46-
/// Access control for envelope processing.
47-
///
48-
/// The buffer guard is basically a semaphore that ensures the buffer does not outgrow the maximum
49-
/// number of envelopes configured through [`Config::envelope_buffer_size`]. To enter a new envelope
50-
/// into the processing pipeline, use [`BufferGuard::enter`].
51-
#[derive(Debug)]
52-
pub struct BufferGuard {
53-
inner: Semaphore,
54-
capacity: usize,
55-
}
56-
57-
impl BufferGuard {
58-
/// Creates a new `BufferGuard` based on config values.
59-
pub fn new(capacity: usize) -> Self {
60-
let inner = Semaphore::new(capacity);
61-
Self { inner, capacity }
62-
}
63-
64-
/// Returns the unused capacity of the pipeline.
65-
pub fn available(&self) -> usize {
66-
self.inner.available()
67-
}
68-
69-
/// Returns the number of envelopes in the pipeline.
70-
pub fn used(&self) -> usize {
71-
self.capacity.saturating_sub(self.available())
72-
}
73-
74-
/// Reserves resources for processing an envelope in Relay.
75-
///
76-
/// Returns `Ok(EnvelopeContext)` on success, which internally holds a handle to the reserved
77-
/// resources. When the envelope context is dropped, the slot is automatically reclaimed and can
78-
/// be reused by a subsequent call to `enter`.
79-
///
80-
/// If the buffer is full, this function returns `Err`.
81-
pub fn enter(&self, envelope: &Envelope) -> Result<EnvelopeContext, QueueEnvelopeError> {
82-
let permit = self
83-
.inner
84-
.try_acquire()
85-
.ok_or(QueueEnvelopeError::TooManyEnvelopes)?;
86-
87-
metric!(histogram(RelayHistograms::EnvelopeQueueSize) = self.used() as u64);
88-
89-
metric!(
90-
histogram(RelayHistograms::EnvelopeQueueSizePct) = {
91-
let queue_size_pct = self.used() as f64 * 100.0 / self.capacity as f64;
92-
queue_size_pct.floor() as u64
93-
}
94-
);
95-
96-
Ok(EnvelopeContext::new(envelope, permit))
97-
}
98-
}
99-
10039
/// Error created while handling [`SendEnvelope`].
10140
#[derive(Debug, failure::Fail)]
10241
#[allow(clippy::enum_variant_names)]
@@ -207,7 +146,6 @@ impl UpstreamRequest for SendEnvelope {
207146

208147
pub struct EnvelopeManager {
209148
config: Arc<Config>,
210-
buffer_guard: Arc<BufferGuard>,
211149
captures: BTreeMap<EventId, CapturedEnvelope>,
212150
#[cfg(feature = "processing")]
213151
store_forwarder: Option<ServiceAddr<StoreForwarder>>,
@@ -216,10 +154,7 @@ pub struct EnvelopeManager {
216154
}
217155

218156
impl EnvelopeManager {
219-
pub fn create(
220-
config: Arc<Config>,
221-
buffer_guard: Arc<BufferGuard>,
222-
) -> Result<Self, ServerError> {
157+
pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
223158
// Enter the tokio runtime so we can start spawning tasks from the outside.
224159
#[cfg(feature = "processing")]
225160
let runtime = crate::utils::tokio_runtime_with_actix();
@@ -237,7 +172,6 @@ impl EnvelopeManager {
237172

238173
Ok(EnvelopeManager {
239174
config,
240-
buffer_guard,
241175
captures: BTreeMap::new(),
242176
#[cfg(feature = "processing")]
243177
store_forwarder,
@@ -350,103 +284,6 @@ impl Default for EnvelopeManager {
350284
}
351285
}
352286

353-
/// Queues an envelope for processing.
354-
///
355-
/// Depending on the items in the envelope, there are multiple outcomes:
356-
///
357-
/// - Events and event related items, such as attachments, are always queued together. See the
358-
/// [crate-level documentation](crate) for a full description of how envelopes are
359-
/// queued and processed.
360-
/// - Sessions and Session batches are always queued separately. If they occur in the same envelope
361-
/// as an event, they are split off. Their path is the same as other Envelopes.
362-
/// - Metrics are directly sent to the [`EnvelopeProcessor`], bypassing the manager's queue and
363-
/// going straight into metrics aggregation. See [`ProcessMetrics`] for a full description.
364-
///
365-
/// Queueing can fail if the queue exceeds [`Config::envelope_buffer_size`]. In this case, `Err` is
366-
/// returned and the envelope is not queued. Otherwise, this message responds with `Ok`. If it
367-
/// contained an event-related item, such as an event payload or an attachment, this contains
368-
/// `Some(EventId)`.
369-
pub struct QueueEnvelope {
370-
pub envelope: Envelope,
371-
pub envelope_context: EnvelopeContext,
372-
pub project_key: ProjectKey,
373-
pub start_time: Instant,
374-
}
375-
376-
impl Message for QueueEnvelope {
377-
type Result = Result<Option<EventId>, QueueEnvelopeError>;
378-
}
379-
380-
impl Handler<QueueEnvelope> for EnvelopeManager {
381-
type Result = Result<Option<EventId>, QueueEnvelopeError>;
382-
383-
fn handle(&mut self, message: QueueEnvelope, _: &mut Self::Context) -> Self::Result {
384-
let QueueEnvelope {
385-
mut envelope,
386-
mut envelope_context,
387-
project_key,
388-
start_time,
389-
} = message;
390-
391-
let event_id = envelope.event_id();
392-
393-
// Remove metrics from the envelope and queue them directly on the project's `Aggregator`.
394-
let mut metric_items = Vec::new();
395-
let is_metric = |i: &Item| matches!(i.ty(), ItemType::Metrics | ItemType::MetricBuckets);
396-
while let Some(item) = envelope.take_item_by(is_metric) {
397-
metric_items.push(item);
398-
}
399-
400-
if !metric_items.is_empty() {
401-
relay_log::trace!("sending metrics into processing queue");
402-
EnvelopeProcessor::from_registry().do_send(ProcessMetrics {
403-
items: metric_items,
404-
project_key,
405-
start_time,
406-
sent_at: envelope.sent_at(),
407-
});
408-
}
409-
410-
// Split the envelope into event-related items and other items. This allows to fast-track:
411-
// 1. Envelopes with only session items. They only require rate limiting.
412-
// 2. Event envelope processing can bail out if the event is filtered or rate limited,
413-
// since all items depend on this event.
414-
if let Some(event_envelope) = envelope.split_by(Item::requires_event) {
415-
relay_log::trace!("queueing separate envelope for non-event items");
416-
417-
// The envelope has been split, so we need to fork the context.
418-
let event_context = self.buffer_guard.enter(&event_envelope)?;
419-
// Update the old context after successful forking.
420-
envelope_context.update(&envelope);
421-
422-
ProjectCache::from_registry().do_send(ValidateEnvelope::new(
423-
project_key,
424-
event_envelope,
425-
event_context,
426-
));
427-
}
428-
429-
if envelope.is_empty() {
430-
// The envelope can be empty here if it contained only metrics items which were removed
431-
// above. In this case, the envelope was accepted and needs no further queueing.
432-
envelope_context.accept();
433-
} else {
434-
relay_log::trace!("queueing envelope");
435-
ProjectCache::from_registry().do_send(ValidateEnvelope::new(
436-
project_key,
437-
envelope,
438-
envelope_context,
439-
));
440-
}
441-
442-
// Actual event handling is performed asynchronously in a separate future. The lifetime of
443-
// that future will be tied to the EnvelopeManager's context. This allows to keep the Project
444-
// actor alive even if it is cleaned up in the ProjectManager.
445-
446-
Ok(event_id)
447-
}
448-
}
449-
450287
/// Sends an envelope to the upstream or Kafka.
451288
pub struct SubmitEnvelope {
452289
pub envelope: Envelope,

relay-server/src/actors/project_cache.rs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -408,19 +408,14 @@ impl Handler<GetCachedProjectState> for ProjectCache {
408408
/// - Cached rate limits
409409
#[derive(Debug)]
410410
pub struct CheckEnvelope {
411-
project_key: ProjectKey,
412411
envelope: Envelope,
413412
context: EnvelopeContext,
414413
}
415414

416415
impl CheckEnvelope {
417416
/// Uses a cached project state and checks the envelope.
418-
pub fn new(project_key: ProjectKey, envelope: Envelope, context: EnvelopeContext) -> Self {
419-
Self {
420-
project_key,
421-
envelope,
422-
context,
423-
}
417+
pub fn new(envelope: Envelope, context: EnvelopeContext) -> Self {
418+
Self { envelope, context }
424419
}
425420
}
426421

@@ -442,7 +437,7 @@ impl Handler<CheckEnvelope> for ProjectCache {
442437
type Result = Result<CheckedEnvelope, DiscardReason>;
443438

444439
fn handle(&mut self, message: CheckEnvelope, _: &mut Self::Context) -> Self::Result {
445-
let project = self.get_or_create_project(message.project_key);
440+
let project = self.get_or_create_project(message.envelope.meta().public_key());
446441

447442
// Preload the project cache so that it arrives a little earlier in processing. However,
448443
// do not pass `no_cache`. In case the project is rate limited, we do not want to force
@@ -465,22 +460,13 @@ impl Handler<CheckEnvelope> for ProjectCache {
465460
///
466461
/// [`EnvelopeProcessor`]: crate::actors::processor::EnvelopeProcessor
467462
pub struct ValidateEnvelope {
468-
project_key: ProjectKey,
469463
envelope: Envelope,
470-
envelope_context: EnvelopeContext,
464+
context: EnvelopeContext,
471465
}
472466

473467
impl ValidateEnvelope {
474-
pub fn new(
475-
project_key: ProjectKey,
476-
envelope: Envelope,
477-
envelope_context: EnvelopeContext,
478-
) -> Self {
479-
Self {
480-
project_key,
481-
envelope,
482-
envelope_context,
483-
}
468+
pub fn new(envelope: Envelope, context: EnvelopeContext) -> Self {
469+
Self { envelope, context }
484470
}
485471
}
486472

@@ -498,8 +484,8 @@ impl Handler<ValidateEnvelope> for ProjectCache {
498484
.get_or_fetch_state(message.envelope.meta().no_cache());
499485
}
500486

501-
self.get_or_create_project(message.project_key)
502-
.enqueue_validation(message.envelope, message.envelope_context);
487+
self.get_or_create_project(message.envelope.meta().public_key())
488+
.enqueue_validation(message.envelope, message.context);
503489
}
504490
}
505491

0 commit comments

Comments (0)