Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/send_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import struct

import click
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_producer_configuration
from arroyo.types import Topic

from sentry.sentry_metrics.use_case_id_registry import UseCaseID
Expand Down Expand Up @@ -172,7 +172,7 @@ def make_csql(rand_str, is_generic):
def produce_msgs(messages, is_generic, host, dryrun, quiet):
conf = {"bootstrap.servers": host}

producer = KafkaProducer(conf)
producer = KafkaProducer(build_kafka_producer_configuration(default_config=conf))
for i, message in enumerate(messages):
print(f"{i + 1} / {len(messages)}")
if not quiet:
Expand Down
7 changes: 5 additions & 2 deletions src/sentry/consumers/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from enum import Enum

from arroyo.backends.abstract import ProducerFuture
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_producer_configuration
from arroyo.dlq import InvalidMessage, KafkaDlqProducer
from arroyo.processing.strategies.abstract import (
MessageRejected,
Expand Down Expand Up @@ -68,7 +68,10 @@ def _get_dlq_producer(topic: Topic | None) -> KafkaDlqProducer | None:
config = get_kafka_producer_cluster_options(topic_defn["cluster"])
config["client.id"] = f"sentry.consumers.dlq.{topic.value}"
real_topic = topic_defn["real_topic_name"]
return KafkaDlqProducer(KafkaProducer(config), ArroyoTopic(real_topic))
return KafkaDlqProducer(
KafkaProducer(build_kafka_producer_configuration(default_config=config)),
ArroyoTopic(real_topic),
)


def maybe_build_dlq_producer(
Expand Down
6 changes: 5 additions & 1 deletion src/sentry/eventstream/kafka/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any

from arroyo.backends.kafka import build_kafka_producer_configuration
from confluent_kafka import KafkaError
from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
Expand Down Expand Up @@ -42,7 +43,10 @@ def get_producer(self, topic: Topic) -> Producer:
cluster_name = get_topic_definition(topic)["cluster"]
cluster_options = get_kafka_producer_cluster_options(cluster_name)
cluster_options["client.id"] = "sentry.eventstream.kafka"
self.__producers[topic] = Producer(cluster_options)
# XXX(markus): We should use `sentry.utils.arroyo_producer.get_arroyo_producer`.
self.__producers[topic] = Producer(
build_kafka_producer_configuration(default_config=cluster_options)
)

return self.__producers[topic]

Expand Down
6 changes: 5 additions & 1 deletion src/sentry/utils/pubsub.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from typing import Any

from arroyo.backends.kafka import build_kafka_producer_configuration
from confluent_kafka import Producer


class KafkaPublisher:
# XXX(markus): Deprecated. Please use `sentry.utils.arroyo_producer.get_arroyo_producer`.
def __init__(self, connection: dict[str, Any], asynchronous: bool = True) -> None:
self.producer = Producer(connection or {})
self.producer = Producer(
build_kafka_producer_configuration(default_config=connection or {})
)
self.asynchronous = asynchronous

def publish(self, channel: str, value: str, key: str | None = None) -> None:
Expand Down
Loading