From 2edda438e0b2281a0753782ba9ed7bb991f466bf Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Tue, 9 Mar 2021 14:22:27 +0800 Subject: [PATCH] Return error when unsubscribing from invalid subscription id --- pubsub/Cargo.toml | 1 + pubsub/src/subscription.rs | 107 +++++++++++++++++++++++++++++++------ 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/pubsub/Cargo.toml b/pubsub/Cargo.toml index 566990b3b..f4d0f2e26 100644 --- a/pubsub/Cargo.toml +++ b/pubsub/Cargo.toml @@ -21,6 +21,7 @@ serde = "1.0" [dev-dependencies] jsonrpc-tcp-server = { version = "17.0", path = "../tcp" } +serde_json = "1.0" [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/pubsub/src/subscription.rs b/pubsub/src/subscription.rs index 8c02512a4..14e6ddcbb 100644 --- a/pubsub/src/subscription.rs +++ b/pubsub/src/subscription.rs @@ -81,8 +81,11 @@ impl Session { } /// Removes existing subscription. - fn remove_subscription(&self, name: &str, id: &SubscriptionId) { - self.active_subscriptions.lock().remove(&(id.clone(), name.into())); + fn remove_subscription(&self, name: &str, id: &SubscriptionId) -> bool { + self.active_subscriptions + .lock() + .remove(&(id.clone(), name.into())) + .is_some() } } @@ -335,8 +338,11 @@ where }; match (meta.session(), id) { (Some(session), Some(id)) => { - session.remove_subscription(&self.notification, &id); - Box::pin(self.unsubscribe.call(id, Some(meta))) + if session.remove_subscription(&self.notification, &id) { + Box::pin(self.unsubscribe.call(id, Some(meta))) + } else { + Box::pin(future::err(core::Error::invalid_params("Invalid subscription id."))) + } } (Some(_), None) => Box::pin(future::err(core::Error::invalid_params("Expected subscription id."))), _ => Box::pin(future::err(subscriptions_unavailable())), @@ -392,13 +398,36 @@ mod tests { }); // when - session.remove_subscription("test", &id); + let removed = session.remove_subscription("test", &id); drop(session); // then + assert_eq!(removed, true); assert_eq!(called.load(Ordering::SeqCst), false); } + #[test] + fn should_not_remove_subscription_if_invalid() { + // given + let id = SubscriptionId::Number(1); + let called = Arc::new(AtomicBool::new(false)); + let called2 = called.clone(); + let other_session = session().0; + let session = session().0; + session.add_subscription("test", &id, move |id| { + assert_eq!(id, SubscriptionId::Number(1)); + called2.store(true, Ordering::SeqCst); + }); + + // when + let removed = other_session.remove_subscription("test", &id); + drop(session); + + // then + assert_eq!(removed, false); + assert_eq!(called.load(Ordering::SeqCst), true); + } + #[test] fn should_unregister_in_case_of_collision() { // given @@ -485,40 +514,86 @@ mod tests { }); } - #[derive(Clone, Default)] - struct Metadata; + #[derive(Clone)] + struct Metadata(Arc); impl core::Metadata for Metadata {} impl PubSubMetadata for Metadata { fn session(&self) -> Option> { - Some(Arc::new(session().0)) + Some(self.0.clone()) + } + } + impl Default for Metadata { + fn default() -> Self { + Self(Arc::new(session().0)) } } #[test] fn should_subscribe() { // given - let called = Arc::new(AtomicBool::new(false)); - let called2 = called.clone(); let (subscribe, _) = new_subscription( "test".into(), - move |params, _meta, _subscriber| { + move |params, _meta, subscriber: Subscriber| { assert_eq!(params, core::Params::None); - called2.store(true, Ordering::SeqCst); + let _sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap(); }, |_id, _meta| async { Ok(core::Value::Bool(true)) }, ); - let meta = Metadata; // when + let meta = Metadata::default(); let result = subscribe.call(core::Params::None, meta); // then - assert_eq!(called.load(Ordering::SeqCst), true); + assert_eq!(futures::executor::block_on(result), Ok(serde_json::json!(5))); + } + + #[test] + fn should_unsubscribe() { + // given + const SUB_ID: u64 = 5; + let (subscribe, unsubscribe) = new_subscription( + "test".into(), + move |params, _meta, subscriber: Subscriber| { + assert_eq!(params, core::Params::None); + let _sink = subscriber.assign_id(SubscriptionId::Number(SUB_ID)).unwrap(); + }, + |_id, _meta| async { Ok(core::Value::Bool(true)) }, + ); + + // when + let meta = Metadata::default(); + futures::executor::block_on(subscribe.call(core::Params::None, meta.clone())).unwrap(); + let result = unsubscribe.call(core::Params::Array(vec![serde_json::json!(SUB_ID)]), meta); + + // then + assert_eq!(futures::executor::block_on(result), Ok(serde_json::json!(true))); + } + + #[test] + fn should_not_unsubscribe_if_invalid() { + // given + const SUB_ID: u64 = 5; + let (subscribe, unsubscribe) = new_subscription( + "test".into(), + move |params, _meta, subscriber: Subscriber| { + assert_eq!(params, core::Params::None); + let _sink = subscriber.assign_id(SubscriptionId::Number(SUB_ID)).unwrap(); + }, + |_id, _meta| async { Ok(core::Value::Bool(true)) }, + ); + + // when + let meta = Metadata::default(); + futures::executor::block_on(subscribe.call(core::Params::None, meta.clone())).unwrap(); + let result = unsubscribe.call(core::Params::Array(vec![serde_json::json!(SUB_ID + 1)]), meta); + + // then assert_eq!( futures::executor::block_on(result), Err(core::Error { - code: core::ErrorCode::ServerError(-32091), - message: "Subscription rejected".into(), + code: core::ErrorCode::InvalidParams, + message: "Invalid subscription id.".into(), data: None, }) );