diff --git a/Cargo.lock b/Cargo.lock index 12fa4aaafa52e..78218d80c88d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8447,7 +8447,6 @@ dependencies = [ "nom", "notify", "num-format", - "num_cpus", "number_prefix", "once_cell", "openssl", diff --git a/Cargo.toml b/Cargo.toml index 5cac267fa02c7..65bee9c31c5b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -266,7 +266,6 @@ nats = { version = "0.20.1", default-features = false, optional = true } nkeys = { version = "0.2.0", default-features = false, optional = true } nom = { version = "7.1.1", default-features = false, optional = true } notify = { version = "4.0.17", default-features = false } -num_cpus = { version = "1.13.1", default-features = false } once_cell = { version = "1.12", default-features = false } openssl = { version = "0.10.40", default-features = false, features = ["vendored"] } openssl-probe = { version = "0.1.5", default-features = false } diff --git a/src/lib.rs b/src/lib.rs index a0d8425c47051..f7a486566eec3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,3 +158,9 @@ where #[cfg(not(tokio_unstable))] tokio::spawn(task) } + +pub fn num_threads() -> usize { + std::thread::available_parallelism() + .expect("Could not determine available parallelism") + .into() +} diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 609bd53725253..51ed68435bedf 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -110,7 +110,7 @@ const fn default_true() -> bool { } fn default_client_concurrency() -> u32 { - cmp::max(1, num_cpus::get() as u32) + cmp::max(1, crate::num_threads() as u32) } #[derive(Debug, Snafu)] diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index 9f8b7ffb793b1..0bf937d39eec6 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -143,7 +143,7 @@ const fn default_poll_secs() -> u32 { } fn default_client_concurrency() -> u32 { - cmp::max(1, num_cpus::get() as u32) + cmp::max(1, crate::num_threads() as u32) } const fn default_visibility_timeout_secs() -> u32 { diff --git a/src/sources/util/tcp/mod.rs b/src/sources/util/tcp/mod.rs index 3fbafec0fe7e5..7f152789cd612 100644 --- a/src/sources/util/tcp/mod.rs +++ b/src/sources/util/tcp/mod.rs @@ -155,7 +155,8 @@ where let connection_gauge = OpenGauge::new(); let shutdown_clone = cx.shutdown.clone(); - let request_limiter = RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, num_cpus::get()); + let request_limiter = + RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads()); listener .accept_stream_limited(max_connections) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 29b6d9b2b3e96..a29283675e8ce 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -61,7 +61,7 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy = Lazy::new(|| { crate::app::WORKER_THREADS .get() .map(std::num::NonZeroUsize::get) - .unwrap_or_else(num_cpus::get) + .unwrap_or_else(crate::num_threads) }); pub(self) async fn load_enrichment_tables<'a>( diff --git a/tests/api.rs b/tests/api.rs index 9ed77dc5588e0..5d401346b5199 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -36,7 +36,7 @@ fn fork_test>(test_name: &'static str, fut: // Since we are spawning the runtime from within a forked process, use one worker less // to account for the additional process. // This adjustment mainly serves to not overload CI workers with low resources. - let rt = runtime_constrained(std::cmp::max(1, num_cpus::get() - 1)); + let rt = runtime_constrained(std::cmp::max(1, crate::num_threads() - 1)); rt.block_on(fut); }, )