Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use SubscriptionId as the key for active_subscriptions
  • Loading branch information
HCastano committed May 21, 2020
commit 0149fd4c44f619bf31e1194a27877ba86f9bf4dd
9 changes: 4 additions & 5 deletions pubsub/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Provides an executor for subscription Futures.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should expand on this module description a bit. The subscription manager is opinionated way to handle processing subscriptions that are based on Stream coming from the user codebase.
The manager takes care of:

  1. Assigning the ID
  2. Consuming a Stream of events (coming from user code) and transforming it into a subscription notifications to the client.
  3. Spawning a resulting future to executor
  4. Cancelling the stream when the subscription is canceled.


use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

use crate::core::futures::sync::oneshot;
Expand All @@ -19,7 +18,7 @@ pub type TaskExecutor = Arc<dyn future::Executor<Box<dyn Future<Item = (), Error
/// Trait used to provide unique subscription ids.
pub trait IdProvider {
/// A unique ID used to identify a subscription.
type Id: Copy + Clone + Default + Eq + Hash + Into<SubscriptionId> + From<SubscriptionId>;
type Id: Copy + Clone + Default + Into<SubscriptionId>;

/// Returns next id for the subscription.
fn next_id(&self) -> Self::Id;
Expand All @@ -32,7 +31,7 @@ pub trait IdProvider {
#[derive(Clone)]
pub struct SubscriptionManager<I: Default + IdProvider> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub struct SubscriptionManager<I: Default + IdProvider> {
pub struct SubscriptionManager<I: IdProvider = RandomizedStringProvider> {

We should implement two IdProviders:

  1. NumericIdProvider
  2. RandomizedStringProvider

The first one would simply use AtomicU64 and would return increasing numeric values, the second one - which should also be a default to use if SubscriptionManager does not include generic parameter - should return a randomized strings (guids).
The reason for using randomized strings is to make sure that subscription ids are not easy to guess, so that multiple clients can't cancel each-others subscriptions easily. The first one will be mostly usable for tests or cases where the aforementioned issue is not a problem.

next_id: I,
active_subscriptions: Arc<Mutex<HashMap<I::Id, oneshot::Sender<()>>>>,
active_subscriptions: Arc<Mutex<HashMap<SubscriptionId, oneshot::Sender<()>>>>,
executor: TaskExecutor, // Make generic?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO It's fine, in most workloads spawning subscription is not going to be a bottleneck so we can safely do a virtual call here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
executor: TaskExecutor, // Make generic?
executor: TaskExecutor,

}

Expand Down Expand Up @@ -65,7 +64,7 @@ impl<I: Default + IdProvider> SubscriptionManager<I> {
.select(rx.map_err(|e| warn!("Error timing out: {:?}", e)))
.then(|_| Ok(()));

self.active_subscriptions.lock().insert(id, tx);
self.active_subscriptions.lock().insert(subscription_id.clone(), tx);
if self.executor.execute(Box::new(future)).is_err() {
error!("Failed to spawn RPC subscription task");
}
Expand All @@ -78,7 +77,7 @@ impl<I: Default + IdProvider> SubscriptionManager<I> {
///
/// Returns true if subscription existed or false otherwise.
fn cancel(&self, id: SubscriptionId) -> bool {
if let Some(tx) = self.active_subscriptions.lock().remove(&id.into()) {
if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
let _ = tx.send(());
return true;
}
Expand Down