Skip to content

Commit 709f7bc

Browse files
Identify reacquire in traces and metrics.
1 parent 5f7ea36 commit 709f7bc

File tree

4 files changed

+11
-9
lines changed

4 files changed

+11
-9
lines changed

lib/async/limiter/queued.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def expand(count, value = true)
5454
protected
5555

5656
# Acquire a resource from the queue with optional deadline.
57-
def acquire_resource(deadline, **options)
57+
def acquire_resource(deadline, reacquire: false, **options)
5858
@mutex.unlock
5959
return @queue.pop(timeout: deadline&.remaining, **options)
6060
ensure

lib/async/limiter/token.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def release
7373
def acquire(**options, &block)
7474
raise "Token already acquired!" if @resource
7575

76-
@resource = @limiter.acquire(**options)
76+
@resource = @limiter.acquire(reacquire: true, **options)
7777

7878
return @resource unless block_given?
7979

lib/metrics/provider/async/limiter/generic.rb

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
def acquire_synchronized(timeout, cost, **options)
2020
# Build base tags and extend with instance tags if present
21-
tags = ["limiter_class:#{self.class.name}", "cost:#{cost}"]
21+
is_reacquire = options[:reacquire] || false
22+
tags = ["limiter:#{self.class.name}", "cost:#{cost}", "reacquire:#{is_reacquire}"]
2223
tags = Metrics::Tags.normalize(@tags, tags)
2324

2425
clock = Async::Clock.start
@@ -31,15 +32,15 @@ def acquire_synchronized(timeout, cost, **options)
3132
ACQUIRE_COUNTER.emit(1, tags: success_tags)
3233
ACQUIRE_DURATION.emit(clock.total, tags: success_tags)
3334
else
34-
# Emit failure metrics
35-
error_tags = Metrics::Tags.normalize(["result:error", "error:#{error.class.name}"], tags)
36-
ACQUIRE_COUNTER.emit(1, tags: error_tags)
37-
ACQUIRE_DURATION.emit(clock.total, tags: error_tags)
35+
# Emit failure metrics (timeout/contention)
36+
failure_tags = Metrics::Tags.normalize(["result:timeout"], tags)
37+
ACQUIRE_COUNTER.emit(1, tags: failure_tags)
38+
ACQUIRE_DURATION.emit(clock.total, tags: failure_tags)
3839
end
3940

4041
return result
4142
rescue => error
42-
# Emit failure metrics
43+
# Emit error metrics
4344
error_tags = Metrics::Tags.normalize(["result:error", "error:#{error.class.name}"], tags)
4445
ACQUIRE_COUNTER.emit(1, tags: error_tags)
4546
ACQUIRE_DURATION.emit(clock.total, tags: error_tags)
@@ -50,7 +51,7 @@ def acquire_synchronized(timeout, cost, **options)
5051

5152
def release(resource = true)
5253
# Build base tags and extend with instance tags if present
53-
tags = ["limiter_class:#{self.class.name}"]
54+
tags = ["limiter:#{self.class.name}"]
5455
tags = Metrics::Tags.normalize(@tags, tags)
5556

5657
begin

lib/traces/provider/async/limiter/generic.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ def acquire_synchronized(timeout, cost, **options)
1616
"cost" => cost,
1717
"timeout" => timeout,
1818
"priority" => options[:priority],
19+
"reacquire" => options[:reacquire] || false,
1920
}
2021

2122
if @tags

0 commit comments

Comments
 (0)