feat: Add support for Kafka clusters sharding #1454
Changes from all commits: bf7ecda, 771de1b, 1e6ee4b, baf73cd, 7e49aa8, 3934c69, 9809809, a1775a7, 0589017, c1fb97e, 191c768, a0d0987, f967933, d4a05c2, f0e792c, 0db194e, 6f499da, 1c3d5e0
@@ -141,6 +141,9 @@ pub enum ConfigErrorKind {
     /// The user referenced a kafka config name that does not exist.
     #[fail(display = "unknown kafka config name")]
     UnknownKafkaConfigName,
+    /// The user did not configure a shard with index 0.
+    #[fail(display = "invalid kafka shard configuration: must have shard with index 0")]
+    InvalidKafkaShard,
 }

 enum ConfigFormat {
@@ -882,42 +885,136 @@ pub enum TopicAssignment {
     /// in `kafka_config` will be used.
     Primary(String),
     /// Object containing topic name and string identifier of one of the clusters configured in
-    /// `secondary_kafka_configs`. In this case that custom kafkaconfig will be used to produce
+    /// `secondary_kafka_configs`. In this case that custom kafka config will be used to produce
     /// data to the given topic name.
-    Secondary {
-        /// The topic name to use.
-        #[serde(rename = "name")]
-        topic_name: String,
-        /// An identifier referencing one of the kafka configurations in `secondary_kafka_configs`.
-        #[serde(rename = "config")]
-        kafka_config_name: String,
-    },
+    Secondary(KafkaTopicConfig),
+    /// A mapping from ranges of logical shards to Kafka configurations, for use when
+    /// producing to multiple Kafka clusters.
+    Sharded(Sharded),
 }

+/// Configuration for a topic.
+#[derive(Serialize, Deserialize, Debug)]
+pub struct KafkaTopicConfig {
+    /// The topic name to use.
+    #[serde(rename = "name")]
+    topic_name: String,
+    /// The name of the Kafka config that will be used to produce data to the given topic.
+    #[serde(rename = "config")]
+    kafka_config_name: String,
+}

+/// Configuration for the mapping from logical shards to Kafka configurations.
+/// The configuration should look like this:
+/// ```
+/// metrics:
+///   shards: 65000
+///   mapping:
+///     0:
+///       name: "ingest-metrics-1"
+///       config: "metrics_1"
+///     25000:
+///       name: "ingest-metrics-2"
+///       config: "metrics_2"
+///     45000:
+///       name: "ingest-metrics-3"
+///       config: "metrics_3"
+/// ```
+///
+/// Here `shards` defines how many logical shards are created, and `mapping` describes the
+/// per-range configuration. Each key in `mapping` is the inclusive start index of a shard
+/// range; the range extends until the next key, or up to the maximum shard defined by the
+/// `shards` option. The first key must always be 0.
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Sharded {
+    /// The number of shards used for this topic.
+    shards: u64,
+    /// The Kafka configuration assigned to the specific shard range.
+    mapping: BTreeMap<u64, KafkaTopicConfig>,
+}
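To make the range semantics concrete: resolving a shard index to its config amounts to taking the `mapping` entry with the greatest start index that is less than or equal to the shard, which `BTreeMap::range` supports directly. Below is a minimal standalone sketch of that lookup; the `lookup` helper and the string stand-ins for configs are illustrative and not part of this diff.

```rust
use std::collections::BTreeMap;

/// Resolve a logical shard to its config: take the mapping entry with the
/// greatest start index that is <= `shard`. Start indices are inclusive;
/// each range ends where the next one begins, or at `shards`.
fn lookup<'a>(mapping: &BTreeMap<u64, &'a str>, shard: u64) -> Option<&'a str> {
    mapping.range(..=shard).next_back().map(|(_, name)| *name)
}

fn main() {
    let mapping = BTreeMap::from([
        (0u64, "metrics_1"),
        (25_000, "metrics_2"),
        (45_000, "metrics_3"),
    ]);

    assert_eq!(lookup(&mapping, 0), Some("metrics_1"));
    assert_eq!(lookup(&mapping, 24_999), Some("metrics_1")); // end of a range is exclusive
    assert_eq!(lookup(&mapping, 25_000), Some("metrics_2")); // start index is inclusive
    assert_eq!(lookup(&mapping, 64_999), Some("metrics_3")); // last range runs up to `shards`
}
```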
+/// Describes a Kafka config, with all the parameters extracted, which will be used for
+/// creating the Kafka producer.
+#[derive(Debug)]
+pub enum KafkaConfig<'a> {
+    /// A single config with Kafka parameters.
+    Single(KafkaParams<'a>),
+
+    /// The list of Kafka configs with their related shard configs.
+    Sharded {
+        /// The maximum number of logical shards for this set of configs.
+        shards: u64,
+        /// The list of the sharded Kafka configs.
+        configs: BTreeMap<u64, KafkaParams<'a>>,
+    },
+}

Review comment: We have this mapping in three different places now, I wonder if it would be possible to have it only once.

Reply: Yes, I think it makes sense this way.
+/// Sharded Kafka config.
+#[derive(Debug)]
+pub struct KafkaParams<'a> {
+    /// The topic name to use.
+    pub topic_name: &'a str,
+    /// The name of the Kafka config used to produce data.
+    pub config_name: Option<&'a str>,
+    /// Parameters for the Kafka producer configuration.
+    pub params: &'a [KafkaConfigParam],
+}
 impl From<String> for TopicAssignment {
-    fn from(topic_name: String) -> TopicAssignment {
-        TopicAssignment::Primary(topic_name)
+    fn from(topic_name: String) -> Self {
+        Self::Primary(topic_name)
     }
 }
 impl TopicAssignment {
-    /// Get the topic name from this topic assignment.
-    fn topic_name(&self) -> &str {
-        match *self {
-            TopicAssignment::Primary(ref s) => s.as_str(),
-            TopicAssignment::Secondary { ref topic_name, .. } => topic_name.as_str(),
-        }
-    }
+    /// Get the Kafka config for the current topic assignment.
+    fn kafka_config<'a>(&'a self, config: &'a Config) -> Result<KafkaConfig<'_>, ConfigErrorKind> {
+        let kafka_config = match self {
+            Self::Primary(topic_name) => KafkaConfig::Single(KafkaParams {
+                topic_name,
+                config_name: None,
+                params: config.values.processing.kafka_config.as_slice(),
+            }),
+            Self::Secondary(KafkaTopicConfig {
+                topic_name,
+                kafka_config_name,
+            }) => KafkaConfig::Single(KafkaParams {
+                config_name: Some(kafka_config_name),
+                topic_name,
+                params: config
+                    .values
+                    .processing
+                    .secondary_kafka_configs
+                    .get(kafka_config_name)
+                    .ok_or(ConfigErrorKind::UnknownKafkaConfigName)?,
+            }),
+            Self::Sharded(Sharded { shards, mapping }) => {
+                // Fail fast if the config does not contain shard 0.
+                if !mapping.contains_key(&0) {
+                    return Err(ConfigErrorKind::InvalidKafkaShard);
+                }
+                let mut kafka_params = BTreeMap::new();
+                for (shard, kafka_config) in mapping {
+                    let config = KafkaParams {
+                        topic_name: kafka_config.topic_name.as_str(),
+                        config_name: Some(kafka_config.kafka_config_name.as_str()),
+                        params: config
+                            .values
+                            .processing
+                            .secondary_kafka_configs
+                            .get(kafka_config.kafka_config_name.as_str())
+                            .ok_or(ConfigErrorKind::UnknownKafkaConfigName)?,
+                    };
+                    kafka_params.insert(*shard, config);
+                }
+                KafkaConfig::Sharded {
+                    shards: *shards,
+                    configs: kafka_params,
+                }
+            }
+        };

-    /// Get the name of the kafka config to use. `None` means default configuration.
-    fn kafka_config_name(&self) -> Option<&str> {
-        match *self {
-            TopicAssignment::Primary(_) => None,
-            TopicAssignment::Secondary {
-                ref kafka_config_name,
-                ..
-            } => Some(kafka_config_name.as_str()),
-        }
+        Ok(kafka_config)
     }
 }
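For context on how the returned value might be consumed: a producer-side caller would match on the two `KafkaConfig` variants and, in the sharded case, map its shard key onto the logical shard space before picking a config. The sketch below assumes the types from this diff plus `use std::collections::BTreeMap`; the `producer_params` helper and the modulo shard-key scheme are assumptions for illustration, since this PR does not show the producing side.

```rust
// Sketch only: consuming `KafkaConfig` on the producing side. The helper
// name and the modulo shard-key scheme are assumptions, not part of this PR.
fn producer_params<'a>(
    config: &'a KafkaConfig<'a>,
    shard_key: u64,
) -> Option<&'a KafkaParams<'a>> {
    match config {
        // A single cluster: one set of parameters for every message.
        KafkaConfig::Single(params) => Some(params),
        // Sharded: map the key into the logical shard space, then take the
        // config whose inclusive start index is the greatest one <= shard.
        KafkaConfig::Sharded { shards, configs } => {
            let shard = shard_key % shards;
            configs.range(..=shard).next_back().map(|(_, params)| params)
        }
    }
}
```

The fail-fast check for shard 0 in `kafka_config` above guarantees that this kind of lookup always finds an entry for any shard index below `shards`.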
@@ -1952,29 +2049,9 @@ impl Config {
         self.values.processing.max_session_secs_in_past.into()
     }

-    /// Topic name and list of Kafka configuration parameters for a given topic.
-    pub fn kafka_topic_name(&self, topic: KafkaTopic) -> &str {
-        self.values.processing.topics.get(topic).topic_name()
-    }
-
     /// Configuration name and list of Kafka configuration parameters for a given topic.
-    pub fn kafka_config(
-        &self,
-        topic: KafkaTopic,
-    ) -> Result<(Option<&str>, &[KafkaConfigParam]), ConfigErrorKind> {
-        if let Some(config_name) = self.values.processing.topics.get(topic).kafka_config_name() {
-            Ok((
-                Some(config_name),
-                self.values
-                    .processing
-                    .secondary_kafka_configs
-                    .get(config_name)
-                    .ok_or(ConfigErrorKind::UnknownKafkaConfigName)?
-                    .as_slice(),
-            ))
-        } else {
-            Ok((None, self.values.processing.kafka_config.as_slice()))
-        }
+    pub fn kafka_config(&self, topic: KafkaTopic) -> Result<KafkaConfig, ConfigErrorKind> {
+        self.values.processing.topics.get(topic).kafka_config(self)
     }

     /// Redis servers to connect to, for rate limiting.
@@ -2068,4 +2145,66 @@ cache:
             Err(_)
         ));
     }

+    /// Make sure we can parse the processing config properly.
+    #[test]
+    fn test_kafka_config_parsing() {
+        let yaml = r###"
+processing:
+  enabled: true
+  kafka_config:
+    - {name: "bootstrap.servers", value: "localhost:1111"}
+  secondary_kafka_configs:
+    metrics_1:
+      - {name: "bootstrap.servers", value: "localhost:1111"}
+    metrics_2:
+      - {name: "bootstrap.servers", value: "localhost:2222"}
+    metrics_3:
+      - {name: "bootstrap.servers", value: "localhost:3333"}
+    profiles:
+      - {name: "bootstrap.servers", value: "localhost:3333"}
+  topics:
+    events: "ingest-events"
+    profiles:
+      name: "ingest-profiles"
+      config: "profiles"
+    metrics:
+      shards: 65000
+      mapping:
+        0:
+          name: "ingest-metrics-1"
+          config: "metrics_1"
+        25000:
+          name: "ingest-metrics-2"
+          config: "metrics_2"
+        45000:
+          name: "ingest-metrics-3"
+          config: "metrics_3"
+"###;

+        let values: ConfigValues = serde_yaml::from_str(yaml).unwrap();
+        let c = Config {
+            values,
+            ..Default::default()
+        };
+        let metrics_def = &c.values.processing.topics.metrics;
+        assert!(matches!(metrics_def, TopicAssignment::Sharded { .. }));

+        let (shards, mapping) =
+            if let TopicAssignment::Sharded(Sharded { shards, mapping }) = metrics_def {
+                (shards, mapping)
+            } else {
+                unreachable!()
+            };
+        assert_eq!(shards, &65000);
+        assert_eq!(3, mapping.len());

+        let events = &c.values.processing.topics.events;
+        assert!(matches!(events, TopicAssignment::Primary(_)));

+        let events_config = events
+            .kafka_config(&c)
+            .expect("Kafka config for events topic");
+        assert!(matches!(events_config, KafkaConfig::Single(_)));
+    }
 }
Review comment: Could you describe that the u64 is the start index and the next u64 describes the range, explicitly calling out what's inclusive? (I'm assuming the start u64 is inclusive, and the end u64 is excluded and part of the next range.) Maybe even write out an example in the struct doc comment like you did in the PR description.
Reply: @flub, please have a look at f967933. Is this something you had in mind?
Reply: yep, 💯