Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions src/sentry/buffer/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,12 @@ def get_redis_connection(self, key: str, transaction: bool = True) -> Pipeline:
pipe = conn.pipeline(transaction=transaction)
return pipe

def _execute_redis_operation(
def _execute_redis_operation_no_txn(
self, key: str, operation: RedisOperation, *args: Any, **kwargs: Any
) -> Any:
metrics_str = f"redis_buffer.{operation.value}"
metrics.incr(metrics_str)
pipe = self.get_redis_connection(self.pending_key)
pipe = self.get_redis_connection(self.pending_key, transaction=False)
getattr(pipe, operation.value)(key, *args, **kwargs)
if args:
pipe.expire(key, self.key_expire)
Expand All @@ -356,7 +356,7 @@ def _execute_sharded_redis_operation(

metrics_str = f"redis_buffer.{operation.value}"
metrics.incr(metrics_str, amount=len(keys))
pipe = self.get_redis_connection(self.pending_key)
pipe = self.get_redis_connection(self.pending_key, transaction=False)
for key in keys:
getattr(pipe, operation.value)(key, *args, **kwargs)
if args:
Expand All @@ -369,10 +369,10 @@ def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
value_dict = {v: now for v in value}
else:
value_dict = {value: now}
self._execute_redis_operation(key, RedisOperation.SORTED_SET_ADD, value_dict)
self._execute_redis_operation_no_txn(key, RedisOperation.SORTED_SET_ADD, value_dict)

def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, float]]:
redis_set = self._execute_redis_operation(
redis_set = self._execute_redis_operation_no_txn(
key,
RedisOperation.SORTED_SET_GET_RANGE,
min=min,
Expand Down Expand Up @@ -410,7 +410,9 @@ def bulk_get_sorted_set(
return data_to_timestamps

def delete_key(self, key: str, min: float, max: float) -> None:
self._execute_redis_operation(key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max)
self._execute_redis_operation_no_txn(
key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max
)

def delete_keys(self, keys: list[str], min: float, max: float) -> None:
self._execute_sharded_redis_operation(
Expand All @@ -427,7 +429,7 @@ def delete_hash(
fields: list[str],
) -> None:
key = self._make_key(model, filters)
pipe = self.get_redis_connection(self.pending_key)
pipe = self.get_redis_connection(self.pending_key, transaction=False)
for field in fields:
getattr(pipe, RedisOperation.HASH_DELETE.value)(key, field)
pipe.expire(key, self.key_expire)
Expand All @@ -441,7 +443,7 @@ def push_to_hash(
value: str,
) -> None:
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value)
self._execute_redis_operation_no_txn(key, RedisOperation.HASH_ADD, field, value)

def push_to_hash_bulk(
self,
Expand All @@ -450,11 +452,11 @@ def push_to_hash_bulk(
data: dict[str, str],
) -> None:
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data)
self._execute_redis_operation_no_txn(key, RedisOperation.HASH_ADD_BULK, data)

def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> dict[str, str]:
key = self._make_key(model, field)
redis_hash = self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL)
redis_hash = self._execute_redis_operation_no_txn(key, RedisOperation.HASH_GET_ALL)
decoded_hash = {}
for k, v in redis_hash.items():
if isinstance(k, bytes):
Expand All @@ -467,7 +469,7 @@ def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) ->

def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int:
key = self._make_key(model, field)
return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)
return self._execute_redis_operation_no_txn(key, RedisOperation.HASH_LENGTH)

def incr(
self,
Expand Down
Loading