Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Set mpsc::tracing_unbounded() queue size warning thresholds
  • Loading branch information
dmitry-markin committed Dec 20, 2022
commit cacf5b9e58f7bd5e3a29f558f596bb172d596f6e
4 changes: 3 additions & 1 deletion client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ impl<Block: BlockT> StorageNotifications<Block> {
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
let receiver = self.0.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys });
let receiver = self
.0
.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);

StorageEventStream(receiver)
}
Expand Down
4 changes: 2 additions & 2 deletions client/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
) -> Result<Self, Error> {
let beefy_best_block = Arc::new(RwLock::new(None));

let stream = best_block_stream.subscribe();
let stream = best_block_stream.subscribe(100_000);
let closure_clone = beefy_best_block.clone();
let future = stream.for_each(move |best_beefy| {
let async_clone = closure_clone.clone();
Expand All @@ -141,7 +141,7 @@ where
fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self
.finality_proof_stream
.subscribe()
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block>(vfp));

let fut = async move {
Expand Down
2 changes: 1 addition & 1 deletion client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ where
// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state =
Expand Down
10 changes: 5 additions & 5 deletions client/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ pub(crate) fn get_beefy_streams(
let beefy_rpc_links = net.peer(index).data.beefy_rpc_links.lock().clone().unwrap();
let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } =
beefy_rpc_links;
best_block_streams.push(from_voter_best_beefy_stream.subscribe());
versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe());
best_block_streams.push(from_voter_best_beefy_stream.subscribe(100_000));
versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe(100_000));
});
(best_block_streams, versioned_finality_proof_streams)
}
Expand Down Expand Up @@ -729,7 +729,7 @@ async fn beefy_importing_blocks() {
let hashof1 = block.header.hash();

// Import without justifications.
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import
.import_block(params(block.clone(), None), HashMap::new())
Expand Down Expand Up @@ -772,7 +772,7 @@ async fn beefy_importing_blocks() {
let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap();
let block = builder.build().unwrap().block;
let hashof2 = block.header.hash();
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(),
ImportResult::Imported(ImportedAux {
Expand Down Expand Up @@ -816,7 +816,7 @@ async fn beefy_importing_blocks() {
let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap();
let block = builder.build().unwrap().block;
let hashof3 = block.header.hash();
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(),
ImportResult::Imported(ImportedAux {
Expand Down
8 changes: 4 additions & 4 deletions client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
spawner: &impl sp_core::traits::SpawnEssentialNamed,
prometheus_registry: Option<&Registry>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let (result_sender, result_port) = buffered_link::buffered_link(100_000);

let metrics = prometheus_registry.and_then(|r| {
Metrics::register(r)
Expand Down Expand Up @@ -276,10 +276,10 @@ impl<B: BlockT> BlockImportWorker<B> {
use worker_messages::*;

let (justification_sender, mut justification_port) =
tracing_unbounded("mpsc_import_queue_worker_justification");
tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);

let (block_import_sender, block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks");
tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);

let mut worker = BlockImportWorker { result_sender, justification_import, metrics };

Expand Down Expand Up @@ -595,7 +595,7 @@ mod tests {

#[test]
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link();
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

let (worker, mut finality_sender, mut block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
Expand Down
8 changes: 5 additions & 3 deletions client/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ use super::BlockImportResult;
/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
/// them to another link.
pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
pub fn buffered_link<B: BlockT>(
queue_size_warning: i64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need signed integer here?

Copy link
Contributor Author

@dmitry-markin dmitry-markin Dec 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's explained in the comment for a struct field: to avoid underflow if, due to the lack of ordering, the counter happens to go < 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally: yes. But public API doesn't need to be signed integer. This should have been u32 instead that should still be plenty big for all intents and purposes. Same for tracing_unbounded function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also wondering how much of a performance difference it actually makes using Relaxed ordering here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not that relaxed ordering makes sense in terms of performance, it's about not having to bother about synchronization of increments/decrements, why signed integer is used. Relaxed ordering is just a consequence of this decision, because more strong guarantees are not needed if we use the unsigned integer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked into the issue, and as far as I understand it's impossible to guarantee that the counter is never decremented before it's incremented not relying on internals of mpsc::unbounded(). Basically, we have the following events:

Thread A Thread B
increment pull
push decrement

In order for decrement to never happen before increment, push in thread A must synchronize with pull in thread B. Note that this is not a synchronization between operations with our atomic counter, but a synchronization of mpsc::unbounded() operations we are not in control of. We can try setting the strongest sequentially consistent ordering guarantee for increment and decrement, but for this to work push and pull must also be sequentially consistent operations, what is unlikely and cannot be relied on.

Please correct me if I'm missing something.

CC @bkchr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use Acquire/Release it should work: https://en.cppreference.com/w/cpp/atomic/memory_order

The compiler should add some barrier that ensures that reads/writes are not reordered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW @nazar-pc why aren't you just use a channel with a size of 0 and using try_send?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I didn't see try_send in there, also it is usually for different access patterns. I'd expect it to still produce a warning regardless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a PR implementing exact queue size warning (#13117), but I'd like it to be reviewed by somebody with good understanding of concurrency, atomics, and memory order of operations. If you know who to invite for review, please invite them.

) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
let tx = BufferedLinkSender { tx };
let rx = BufferedLinkReceiver { rx: rx.fuse() };
(tx, rx)
Expand Down Expand Up @@ -175,7 +177,7 @@ mod tests {

#[test]
fn is_closed() {
let (tx, rx) = super::buffered_link::<Block>();
let (tx, rx) = super::buffered_link::<Block>(1);
assert!(!tx.is_closed());
drop(rx);
assert!(tx.is_closed());
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ where
}

fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self.justification_stream.subscribe().map(
let stream = self.justification_stream.subscribe(100_000).map(
|x: sc_finality_grandpa::GrandpaJustification<Block>| {
JustificationNotification::from(x)
},
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ impl<Block: BlockT> GossipValidator<Block> {
None => None,
};

let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator");
let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000);
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
set_state,
Expand Down
1 change: 1 addition & 0 deletions client/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl<B: BlockT> NeighborPacketWorker<B> {
pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender<B>) {
let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
"mpsc_grandpa_neighbor_packet_worker",
100_000,
);
let delay = Delay::new(rebroadcast_period);

Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl NetworkEventStream for TestNetwork {
&self,
_name: &'static str,
) -> Pin<Box<dyn Stream<Item = NetworkEvent> + Send>> {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let _ = self.sender.unbounded_send(Event::EventStream(tx));
Box::pin(rx)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ fn voter_set_state() -> SharedVoterSetState<Block> {

// needs to run in a tokio runtime.
pub(crate) fn make_test_network() -> (impl Future<Output = Tester>, TestNetwork) {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let net = TestNetwork { sender: tx };

#[derive(Clone)]
Expand Down
3 changes: 2 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ where
}
})?;

let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command");
let (voter_commands_tx, voter_commands_rx) =
tracing_unbounded("mpsc_grandpa_voter_command", 100_000);

let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();

Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ mod tests {
aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters))
.unwrap();

let (_tx, voter_command_rx) = tracing_unbounded("");
let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000);

let observer = ObserverWork::new(
client,
Expand Down
8 changes: 4 additions & 4 deletions client/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod tests {

impl TestChainState {
fn new() -> (Self, ImportNotifications<Block>) {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let state =
TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) };

Expand Down Expand Up @@ -680,7 +680,7 @@ mod tests {
// enact all dependencies before importing the message
enact_dependencies(&chain_state);

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
Expand Down Expand Up @@ -708,7 +708,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
Expand Down Expand Up @@ -896,7 +896,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let block_sync_requester = TestBlockSyncRequester::default();

Expand Down
2 changes: 1 addition & 1 deletion client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ where
&params.network_config.transport,
)?;

let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker");
let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);

if let Some(path) = &params.network_config.net_config_path {
fs::create_dir_all(path)?;
Expand Down
2 changes: 1 addition & 1 deletion client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ where
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
) -> Result<(Self, ChainSyncInterfaceHandle<B>, NonDefaultSetConfig), ClientError> {
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync");
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
let block_announce_config = Self::get_block_announce_proto_config(
protocol_id,
fork_id,
Expand Down
2 changes: 1 addition & 1 deletion client/network/sync/src/service/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl NetworkServiceHandle {
impl NetworkServiceProvider {
/// Create new `NetworkServiceProvider`
pub fn new() -> (Self, NetworkServiceHandle) {
let (tx, rx) = tracing_unbounded("mpsc_network_service_provider");
let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);

(Self { rx }, NetworkServiceHandle::new(tx))
}
Expand Down
4 changes: 2 additions & 2 deletions client/offchain/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ impl SharedClient {

/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) {
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker");
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api");
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker", 100_000);
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api", 100_000);

let api = HttpApi {
to_worker,
Expand Down
2 changes: 1 addition & 1 deletion client/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub struct Peerset {
impl Peerset {
/// Builds a new peerset from the given configuration.
pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) {
let (tx, rx) = tracing_unbounded("mpsc_peerset_messages");
let (tx, rx) = tracing_unbounded("mpsc_peerset_messages", 10_000);

let handle = PeersetHandle { tx: tx.clone() };

Expand Down
2 changes: 1 addition & 1 deletion client/rpc/src/system/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Default for Status {
fn api<T: Into<Option<Status>>>(sync: T) -> RpcModule<System<Block>> {
let status = sync.into().unwrap_or_default();
let should_have_peers = !status.is_dev;
let (tx, rx) = tracing_unbounded("rpc_system_tests");
let (tx, rx) = tracing_unbounded("rpc_system_tests", 10_000);
thread::spawn(move || {
futures::executor::block_on(rx.for_each(move |request| {
match request {
Expand Down
2 changes: 1 addition & 1 deletion client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ where
);
spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service)));

let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);

let future = build_network_future(
config.role.clone(),
Expand Down
4 changes: 2 additions & 2 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1909,13 +1909,13 @@ where
{
/// Get block import event stream.
fn import_notification_stream(&self) -> ImportNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_import_notification_stream");
let (sink, stream) = tracing_unbounded("mpsc_import_notification_stream", 100_000);
self.import_notification_sinks.lock().push(sink);
stream
}

fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream");
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000);
self.finality_notification_sinks.lock().push(sink);
stream
}
Expand Down
3 changes: 2 additions & 1 deletion client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ impl TaskManager {
let (signal, on_exit) = exit_future::signal();

// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks");
let (essential_failed_tx, essential_failed_rx) =
tracing_unbounded("mpsc_essential_tasks", 100);

let metrics = prometheus_registry.map(Metrics::register).transpose()?;

Expand Down
2 changes: 1 addition & 1 deletion client/transaction-pool/src/graph/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<H, BH> Default for Sender<H, BH> {
impl<H: Clone, BH: Clone> Sender<H, BH> {
/// Add a new watcher to this sender object.
pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher");
let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher", 100_000);
self.receivers.push(tx);
Watcher { receiver, hash }
}
Expand Down
2 changes: 1 addition & 1 deletion client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ where
pool: Arc<Pool<Api>>,
interval: Duration,
) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue");
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000);

let worker = RevalidationWorker::new(api.clone(), pool.clone());

Expand Down
4 changes: 2 additions & 2 deletions client/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ mod inner {
/// above the warning threshold.
pub fn tracing_unbounded<T>(
name: &'static str,
//queue_size_warning: i64,
queue_size_warning: i64,
) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
let (s, r) = mpsc::unbounded();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you (or anyone else) know the reason for using unbounded channels here? afaik they are never a good idea in production.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a long going story about implementing back-pressure mechanisms in substrate (using bounded channels), but nobody knows how to implement it correctly so far. So at least we added the warning to detect if some of the channels are not being polled and leak messages (and memory).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but nobody knows how to implement it correctly so far

This is clearly not the problem :P The code base has grown and organically. Not all was build from the beginning with async being considered. Unbounded channels give you the opportunity to combine async/sync code in some easy way, but yeah there is no back pressure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Thanks 🙏

let queue_size = Arc::new(AtomicI64::new(0));
let sender = TracingUnboundedSender {
inner: s,
name,
queue_size: queue_size.clone(),
queue_size_warning: 1,
queue_size_warning,
warning_fired: Arc::new(AtomicBool::new(false)),
creation_backtrace: Arc::new(Backtrace::capture()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to resolve and we should not require RUST_BACKTRACE to be set. If we then want to print the backtrace, we should call resolve().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in #13020.

};
Expand Down
4 changes: 2 additions & 2 deletions client/utils/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ impl<Payload, TK: TracingKeyStr> NotificationStream<Payload, TK> {
}

/// Subscribe to a channel through which the generic payload can be received.
pub fn subscribe(&self) -> NotificationReceiver<Payload> {
let receiver = self.hub.subscribe(());
pub fn subscribe(&self, queue_size_warning: i64) -> NotificationReceiver<Payload> {
let receiver = self.hub.subscribe((), queue_size_warning);
NotificationReceiver { receiver }
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/utils/src/notification/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn notification_channel_simple() {

// Create a future to receive a single notification
// from the stream and verify its payload.
let future = stream.subscribe().take(1).for_each(move |payload| {
let future = stream.subscribe(100_000).take(1).for_each(move |payload| {
let test_payload = closure_payload.clone();
async move {
assert_eq!(payload, test_payload);
Expand Down
4 changes: 2 additions & 2 deletions client/utils/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<M, R> Hub<M, R> {
/// Subscribe to this Hub using the `subs_key: K`.
///
/// A subscription with a key `K` is possible if the Registry implements `Subscribe<K>`.
pub fn subscribe<K>(&self, subs_key: K) -> Receiver<M, R>
pub fn subscribe<K>(&self, subs_key: K, queue_size_warning: i64) -> Receiver<M, R>
where
R: Subscribe<K> + Unsubscribe,
{
Expand All @@ -178,7 +178,7 @@ impl<M, R> Hub<M, R> {
// have the sink disposed.
shared_borrowed.registry.subscribe(subs_key, subs_id);

let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key);
let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key, queue_size_warning);
assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique.");

Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx }
Expand Down
Loading