feat: surface circuit-breaker state and per-domain cooldown counts as Prometheus metrics

Two metrics that were previously only observable via /ready introspection or
direct Redis access are now first-class gauges:

  path_circuit_breaker_state{service_id, domain}
    1 = domain currently locked out, 0 = healthy/recovered.
    Set on MarkBroken; dropped to 0 on ClearService and on TTL-expiry refresh.
    Both refreshLocal and refreshFromRedis now drop the gauge for expired
    entries; refreshFromRedis additionally re-asserts gauge=1 for currently-
    broken domains so a fresh pod that lazily picks up Redis state stays
    consistent without going through MarkBroken locally.

  path_endpoints_in_cooldown{domain, rpc_type, service_id}
    Per-domain count of endpoints currently in strike cooldown
    (Score.IsInCooldown() == true). Cooldown is a transient state imposed
    by accumulated critical strikes — independent from "score below
    threshold" which is already covered by path_reputation_endpoint_leaderboard
    with tier_threshold="0".

  Published every 10s via the leaderboard publisher; the gauge is Reset
  between snapshots so a domain that drops to zero cooldown'd endpoints
  actually reports zero instead of exporting its last nonzero value
  indefinitely.

New LeaderboardDataProvider method GetCooldownCountData implemented on
Shannon's Protocol.
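
The Shannon implementation itself is not shown in this diff. A minimal
sketch of what such a provider method could look like, assuming an
iterable reputation snapshot whose per-endpoint Score exposes
IsInCooldown(); the accessor and field names below are hypothetical,
not the real Shannon API:

  // Illustrative sketch only: p.reputationSnapshot() and the entry
  // fields are assumptions standing in for the real Shannon code.
  func (p *Protocol) GetCooldownCountData(ctx context.Context) ([]metrics.CooldownCountEntry, error) {
      type key struct{ domain, rpcType, serviceID string }
      counts := make(map[key]int)
      for _, ep := range p.reputationSnapshot() { // hypothetical accessor over tracked endpoints
          if ep.Score.IsInCooldown() {
              counts[key{ep.Domain, ep.RPCType, ep.ServiceID}]++
          }
      }
      out := make([]metrics.CooldownCountEntry, 0, len(counts))
      for k, n := range counts {
          out = append(out, metrics.CooldownCountEntry{
              Domain:    k.domain,
              RPCType:   k.rpcType,
              ServiceID: k.serviceID,
              Count:     n,
          })
      }
      return out, nil
  }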

Test coverage in gateway/domain_circuit_breaker_test.go:
  - MarkBroken → gauge=1
  - ClearService → gauge=0
  - TTL expiry + refresh → gauge=0

Closes the metric gap operators have been hitting when asking "which
domains are circuit-broken right now?" — answer used to require Redis
access; now it's a Prometheus query.
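
For example (standard PromQL, using the metric names above):
path_circuit_breaker_state == 1 lists the (service_id, domain) pairs
that are broken right now, and sum by (domain) (path_endpoints_in_cooldown)
totals cooldown'd endpoints per domain.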

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
oten91 and claude committed May 1, 2026
commit bfe247752a0fc6b2d0e7b46c15f5ce1d53670f95
49 changes: 46 additions & 3 deletions gateway/domain_circuit_breaker.go
@@ -11,6 +11,7 @@ import (
    "github.com/pokt-network/poktroll/pkg/polylog"
    "github.com/redis/go-redis/v9"

    "github.com/pokt-network/path/metrics"
    "github.com/pokt-network/path/protocol"
)

@@ -112,6 +113,10 @@ func (cb *DomainCircuitBreaker) MarkBroken(ctx context.Context, serviceID, domai
    entry.domains[domain] = brokenDomainState{expiry: expiry, hitCount: hitCount, reason: reason}
    cb.mu.Unlock()

    // Surface state via Prometheus gauge so dashboards can show "currently broken
    // domains" without needing Redis access. Idempotent — re-setting to 1 is fine.
    metrics.SetCircuitBreakerState(serviceID, domain, true)

    // Always log circuit break events at error level for production visibility.
    // This is critical for diagnosing why domains get locked out.
    cb.logger.Error().
@@ -172,17 +177,20 @@ func filterExpiredDomains(domains map[string]brokenDomainState) map[string]bool
// Used when Redis is not available (local-only mode).
func (cb *DomainCircuitBreaker) refreshLocal(serviceID string) map[string]bool {
    cb.mu.Lock()

    entry, ok := cb.cache[serviceID]
    if !ok {
        cb.mu.Unlock()
        return nil
    }

    // Remove expired entries; collect their names so we can drop the gauge to 0
    // outside the lock (avoid taking metrics lock under cb.mu).
    now := time.Now()
    var expired []string
    for domain, state := range entry.domains {
        if !state.expiry.After(now) {
            expired = append(expired, domain)
            delete(entry.domains, domain)
        }
    }
@@ -192,6 +200,11 @@ func (cb *DomainCircuitBreaker) refreshLocal(serviceID string) map[string]bool {
    for d := range entry.domains {
        result[d] = true
    }
    cb.mu.Unlock()

    for _, d := range expired {
        metrics.SetCircuitBreakerState(serviceID, d, false)
    }
    return result
}

@@ -264,15 +277,20 @@ func (cb *DomainCircuitBreaker) refreshFromRedis(ctx context.Context, serviceID
        cb.redisClient.HDel(ctx, key, expiredFields...)
    }

    // Merge with local cache (local entries might be newer than Redis).
    // Capture pre-merge cache contents so we can drop the gauge to 0 for any
    // domain that was previously broken but is no longer present after merge.
    cb.mu.Lock()
    var previouslyBroken []string
    existingEntry, ok := cb.cache[serviceID]
    if ok {
        for domain, state := range existingEntry.domains {
            if state.expiry.After(now) {
                if redisState, exists := domains[domain]; !exists || state.expiry.After(redisState.expiry) {
                    domains[domain] = state
                }
            } else {
                previouslyBroken = append(previouslyBroken, domain)
            }
        }
    }
@@ -282,6 +300,21 @@ func (cb *DomainCircuitBreaker) refreshFromRedis(ctx context.Context, serviceID
    }
    cb.mu.Unlock()

    // Drop gauge for previously-broken-but-now-expired domains, plus the explicit
    // expiredFields list we already collected from Redis. Done outside the lock.
    for _, d := range expiredFields {
        metrics.SetCircuitBreakerState(serviceID, d, false)
    }
    for _, d := range previouslyBroken {
        metrics.SetCircuitBreakerState(serviceID, d, false)
    }
    // Re-assert gauge=1 for currently-broken domains. Idempotent and ensures the
    // metric is correct on a fresh pod that lazily picks up Redis state without
    // going through MarkBroken locally.
    for d := range domains {
        metrics.SetCircuitBreakerState(serviceID, d, true)
    }

    // Build result
    result := make(map[string]bool, len(domains))
    for d := range domains {
@@ -297,8 +330,13 @@ func (cb *DomainCircuitBreaker) ClearService(ctx context.Context, serviceID stri
    cb.mu.Lock()
    entry, ok := cb.cache[serviceID]
    count := 0
    var clearedDomains []string
    if ok {
        count = len(entry.domains)
        clearedDomains = make([]string, 0, count)
        for d := range entry.domains {
            clearedDomains = append(clearedDomains, d)
        }
        delete(cb.cache, serviceID)
    }
    cb.mu.Unlock()
@@ -307,6 +345,11 @@ func (cb *DomainCircuitBreaker) ClearService(ctx context.Context, serviceID stri
        cb.redisClient.Del(ctx, cb.keyPrefix+serviceID)
    }

    // Drop the gauge for every domain we cleared so the dashboard reflects reality.
    for _, d := range clearedDomains {
        metrics.SetCircuitBreakerState(serviceID, d, false)
    }

    cb.logger.Info().
        Str("service_id", serviceID).
        Int("cleared_domains", count).
58 changes: 58 additions & 0 deletions gateway/domain_circuit_breaker_test.go
@@ -8,14 +8,22 @@ import (

    "github.com/pokt-network/poktroll/pkg/polylog"
    "github.com/pokt-network/poktroll/pkg/polylog/polyzero"
    "github.com/prometheus/client_golang/prometheus/testutil"

    "github.com/pokt-network/path/metrics"
    "github.com/pokt-network/path/protocol"
)

func testCircuitBreakerLogger() polylog.Logger {
    return polyzero.NewLogger(polyzero.WithOutput(os.Stderr))
}

// readCircuitBreakerGauge returns the current value of the per-(serviceID, domain)
// circuit-breaker state gauge. Used by metric-transition tests.
func readCircuitBreakerGauge(serviceID, domain string) float64 {
    return testutil.ToFloat64(metrics.DomainCircuitBreakerState.WithLabelValues(serviceID, domain))
}

func TestDomainCircuitBreaker_MarkAndGet(t *testing.T) {
    cb := NewDomainCircuitBreaker(nil, testCircuitBreakerLogger())
    ctx := context.Background()
@@ -427,3 +435,53 @@ func TestParseRedisValue_InvalidFormat(t *testing.T) {
        t.Fatal("expected parse to fail for empty string")
    }
}

// TestDomainCircuitBreaker_MetricGaugeTransitions verifies that the circuit-breaker
// state gauge moves between 0 and 1 correctly: set to 1 on MarkBroken, dropped to 0
// on ClearService, and dropped to 0 by refreshLocal once a TTL expires. We test
// directly against the Prometheus client metric rather than scraping /metrics so the
// test stays hermetic.
func TestDomainCircuitBreaker_MetricGaugeTransitions(t *testing.T) {
    cb := NewDomainCircuitBreaker(nil, testCircuitBreakerLogger())
    cb.defaultTTL = 50 * time.Millisecond
    ctx := context.Background()
    const serviceID = "metric-test-svc"
    const domain = "metric-test-domain.example.com"

    // Initial state: gauge should be 0 (or absent — the Set on first MarkBroken creates it).
    if v := readCircuitBreakerGauge(serviceID, domain); v != 0 {
        t.Fatalf("expected gauge=0 initially, got %v", v)
    }

    // MarkBroken → gauge should flip to 1
    cb.MarkBroken(ctx, serviceID, domain, "test_reason")
    if v := readCircuitBreakerGauge(serviceID, domain); v != 1 {
        t.Fatalf("expected gauge=1 after MarkBroken, got %v", v)
    }

    // ClearService → gauge drops to 0 for cleared domains
    cb.ClearService(ctx, serviceID)
    if v := readCircuitBreakerGauge(serviceID, domain); v != 0 {
        t.Fatalf("expected gauge=0 after ClearService, got %v", v)
    }

    // MarkBroken again, then wait for TTL expiry, then refresh — gauge should drop to 0
    cb.MarkBroken(ctx, serviceID, domain, "test_reason_2")
    if v := readCircuitBreakerGauge(serviceID, domain); v != 1 {
        t.Fatalf("expected gauge=1 after second MarkBroken, got %v", v)
    }

    time.Sleep(60 * time.Millisecond)
    // Force the cache entry to look stale so GetBrokenDomains takes the refresh path.
    // In production, the same effect happens automatically once cacheTTL elapses
    // (default 5s) after the entry was last refreshed.
    cb.mu.Lock()
    if entry, ok := cb.cache[serviceID]; ok {
        entry.refreshAt = time.Now().Add(-time.Second)
    }
    cb.mu.Unlock()
    cb.GetBrokenDomains(ctx, serviceID)
    if v := readCircuitBreakerGauge(serviceID, domain); v != 0 {
        t.Fatalf("expected gauge=0 after TTL expiry + refresh, got %v", v)
    }
}
1 change: 1 addition & 0 deletions go.mod
@@ -192,6 +192,7 @@ require (
    github.com/klauspost/cpuid/v2 v2.2.10 // indirect
    github.com/kr/pretty v0.3.1 // indirect
    github.com/kr/text v0.2.0 // indirect
    github.com/kylelemons/godebug v1.1.0 // indirect
    github.com/lib/pq v1.10.9 // indirect
    github.com/linxGnu/grocksdb v1.8.14 // indirect
    github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
31 changes: 31 additions & 0 deletions metrics/leaderboard.go
@@ -40,6 +40,15 @@ type SupplierScoreEntry struct {
    Score float64
}

// CooldownCountEntry represents per-domain count of endpoints currently in strike
// cooldown for a given service / rpc_type.
type CooldownCountEntry struct {
    Domain    string
    RPCType   string
    ServiceID string
    Count     int
}

// LeaderboardDataProvider is an interface for getting endpoint distribution data
type LeaderboardDataProvider interface {
    // GetEndpointLeaderboardData returns all endpoint entries grouped by the required dimensions
@@ -49,6 +58,10 @@ type LeaderboardDataProvider interface {
    // GetSupplierScoreData returns per-(supplier, service_id) reputation scores.
    // Optional: implementations may return nil if per-supplier scoring is not supported.
    GetSupplierScoreData(ctx context.Context) ([]SupplierScoreEntry, error)
    // GetCooldownCountData returns per-(domain, service_id, rpc_type) counts of
    // endpoints currently in strike cooldown. Optional: implementations may return
    // nil if cooldown tracking is not supported.
    GetCooldownCountData(ctx context.Context) ([]CooldownCountEntry, error)
}

// LeaderboardPublisher publishes endpoint leaderboard metrics every 10 seconds
@@ -183,6 +196,24 @@ func (lp *LeaderboardPublisher) publishLeaderboard(ctx context.Context) {
        }
        lp.logger.Debug().Int("entries", len(supplierScores)).Msg("Published supplier scores")
    }

    // Publish per-domain endpoint cooldown counts. Reset between snapshots so a
    // domain that drops to zero cooldown'd endpoints actually shows zero (instead
    // of sticking at its last value via Prometheus' 5-min staleness window).
    cooldownCounts, err := lp.provider.GetCooldownCountData(ctx)
    if err != nil {
        lp.logger.Warn().Err(err).Msg("Failed to get cooldown count data")
        return
    }

    EndpointsInCooldown.Reset()

    if len(cooldownCounts) > 0 {
        for _, entry := range cooldownCounts {
            EndpointsInCooldown.WithLabelValues(entry.Domain, entry.RPCType, entry.ServiceID).Set(float64(entry.Count))
        }
        lp.logger.Debug().Int("entries", len(cooldownCounts)).Msg("Published cooldown counts")
    }
}

// PublishOnce can be called to manually trigger a leaderboard publish (for testing)
54 changes: 54 additions & 0 deletions metrics/metrics.go
@@ -303,6 +303,60 @@ var ProbationEventsTotal = promauto.NewCounterVec(
    []string{LabelDomain, LabelRPCType, LabelServiceID, LabelProbationEvent},
)

// =============================================================================
// Domain Circuit Breaker State (Gauge)
// Labels: service_id, domain
// Value: 1 if domain is currently broken (circuit-breaker locked out), 0 otherwise
// Purpose: Surface circuit-breaker state to dashboards. Without this, broken-domain
// state lives only in Redis (`path:gw:circuit:{serviceID}`) and is invisible to
// operators inspecting Grafana.
// =============================================================================

var DomainCircuitBreakerState = promauto.NewGaugeVec(
    prometheus.GaugeOpts{
        Name: MetricPrefix + "circuit_breaker_state",
        Help: "1 if the domain is currently circuit-broken for this service (locked out from selection), 0 otherwise. Set to 1 on MarkBroken; dropped to 0 on ClearService and on TTL-expiry refresh (local or Redis-backed).",
    },
    []string{LabelServiceID, LabelDomain},
)

// SetCircuitBreakerState sets the per-domain circuit-breaker gauge.
//   - broken=true  → 1 (domain is currently locked out)
//   - broken=false → 0 (domain is healthy / has been recovered)
//
// Silently skips empty domains, which can occur when an endpoint's URL fails
// to parse; that happens only in error paths anyway.
func SetCircuitBreakerState(serviceID, domain string, broken bool) {
    if domain == "" {
        return
    }
    v := 0.0
    if broken {
        v = 1.0
    }
    DomainCircuitBreakerState.WithLabelValues(serviceID, domain).Set(v)
}

// =============================================================================
// Endpoints In Cooldown (Gauge, published every 10s via leaderboard publisher)
// Labels: domain, rpc_type, service_id
// Value: count of endpoints currently in a strike cooldown period (Score.IsInCooldown
// returns true — i.e. CooldownUntil is in the future).
// Purpose: Cooldown is a TEMPORARY state imposed by accumulated critical strikes,
// independent from "score below minThreshold". Before this metric existed, cooldown
// state was only visible via /ready/<svc>?detailed=true on a single pod. Now
// operators can see at a glance: "how many endpoints does this domain have in
// cooldown right now?"
// (For "below minThreshold / excluded" use path_reputation_endpoint_leaderboard
// with tier_threshold="0" — that's already covered by the existing leaderboard.)
// =============================================================================

var EndpointsInCooldown = promauto.NewGaugeVec(
    prometheus.GaugeOpts{
        Name: MetricPrefix + "endpoints_in_cooldown",
        Help: "Number of endpoints currently in strike cooldown (Score.CooldownUntil in the future). Published every 10s. Cooldown is independent from score-below-threshold — an endpoint can be cooldown'd even with a high score after critical strikes.",
    },
    []string{LabelDomain, LabelRPCType, LabelServiceID},
)

// =============================================================================
// Supplier Blacklist Events (Counter)
// Labels: domain, supplier, service_id, reason