Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: avoid golangci-lint errors
  • Loading branch information
aldy505 committed Oct 23, 2025
commit 6ff23fbb2cb9afd06cb2d84af50f14c0b9bca78f
48 changes: 24 additions & 24 deletions batch_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ func NewBatchMeter(client *Client) *BatchMeter {
}
}

func (l *BatchMeter) Start() {
l.startOnce.Do(func() {
func (m *BatchMeter) Start() {
m.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
l.cancel = cancel
l.wg.Add(1)
go l.run(ctx)
m.cancel = cancel
m.wg.Add(1)
go m.run(ctx)
})
}

func (l *BatchMeter) Flush(timeout <-chan struct{}) {
func (m *BatchMeter) Flush(timeout <-chan struct{}) {
done := make(chan struct{})
select {
case l.flushCh <- done:
case m.flushCh <- done:
select {
case <-done:
case <-timeout:
Expand All @@ -45,27 +45,27 @@ func (l *BatchMeter) Flush(timeout <-chan struct{}) {
}
}

func (l *BatchMeter) Shutdown() {
l.shutdownOnce.Do(func() {
if l.cancel != nil {
l.cancel()
l.wg.Wait()
func (m *BatchMeter) Shutdown() {
m.shutdownOnce.Do(func() {
if m.cancel != nil {
m.cancel()
m.wg.Wait()
}
})
}

func (l *BatchMeter) run(ctx context.Context) {
defer l.wg.Done()
func (m *BatchMeter) run(ctx context.Context) {
defer m.wg.Done()
var metrics []Metric
timer := time.NewTimer(batchTimeout)
defer timer.Stop()

for {
select {
case metric := <-l.metricsCh:
case metric := <-m.metricsCh:
metrics = append(metrics, metric)
if len(metrics) >= batchSize {
l.processEvent(metrics)
m.processEvent(metrics)
metrics = nil
if !timer.Stop() {
<-timer.C
Expand All @@ -74,23 +74,23 @@ func (l *BatchMeter) run(ctx context.Context) {
}
case <-timer.C:
if len(metrics) > 0 {
l.processEvent(metrics)
m.processEvent(metrics)
metrics = nil
}
timer.Reset(batchTimeout)
case done := <-l.flushCh:
case done := <-m.flushCh:
flushDrain:
for {
select {
case metric := <-l.metricsCh:
case metric := <-m.metricsCh:
metrics = append(metrics, metric)
default:
break flushDrain
}
}

if len(metrics) > 0 {
l.processEvent(metrics)
m.processEvent(metrics)
metrics = nil
}
if !timer.Stop() {
Expand All @@ -102,26 +102,26 @@ func (l *BatchMeter) run(ctx context.Context) {
drain:
for {
select {
case metric := <-l.metricsCh:
case metric := <-m.metricsCh:
metrics = append(metrics, metric)
default:
break drain
}
}

if len(metrics) > 0 {
l.processEvent(metrics)
m.processEvent(metrics)
}
return
}
}
}

func (l *BatchMeter) processEvent(metrics []Metric) {
func (m *BatchMeter) processEvent(metrics []Metric) {
event := NewEvent()
event.Timestamp = time.Now()
event.EventID = EventID(uuid())
event.Type = traceMetricEvent.Type
event.Metrics = metrics
l.client.Transport.SendEvent(event)
m.client.Transport.SendEvent(event)
}
Loading