Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions plugins/inputs/procstat/procstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Procstat struct {
Log telegraf.Logger `toml:"-"`

finder pidFinder
processes map[pid]process
processes map[string]map[pid]process
cfg collectionConfig
oldMode bool

Expand Down Expand Up @@ -204,7 +204,7 @@ func (p *Procstat) Init() error {
}

// Initialize the running process cache
p.processes = make(map[pid]process)
p.processes = make(map[string]map[pid]process)

return nil
}
Expand Down Expand Up @@ -240,6 +240,13 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
return err
}

// Use empty string as filter key for old mode (single implicit filter)
const oldModeKey = ""
if p.processes[oldModeKey] == nil {
p.processes[oldModeKey] = make(map[pid]process)
}
procs := p.processes[oldModeKey]

var count int
running := make(map[pid]bool)
for _, r := range results {
Expand All @@ -259,7 +266,7 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {

// Use the cached processes as we need the existing instances
// to compute delta-metrics (e.g. cpu-usage).
if cached, found := p.processes[pid]; found {
if cached, found := procs[pid]; found {
proc = cached
} else {
// We've found a process that was not recorded before so add it
Expand All @@ -278,7 +285,7 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
if p.ProcessName != "" {
proc.setTag("process_name", p.ProcessName)
}
p.processes[pid] = proc
procs[pid] = proc
}
running[pid] = true
metrics, err := proc.metrics(p.Prefix, &p.cfg, now)
Expand All @@ -294,9 +301,9 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
}

// Cleanup processes that are not running anymore
for pid := range p.processes {
for pid := range procs {
if !running[pid] {
delete(p.processes, pid)
delete(procs, pid)
}
}

Expand Down Expand Up @@ -325,7 +332,6 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {

func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
now := time.Now()
running := make(map[pid]bool)
for _, f := range p.Filter {
groups, err := f.applyFilter()
if err != nil {
Expand All @@ -347,20 +353,29 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
continue
}

// Initialize the process cache for this filter if needed
if p.processes[f.Name] == nil {
p.processes[f.Name] = make(map[pid]process)
}
filterProcs := p.processes[f.Name]

// Track running processes for this filter to clean up stale entries
running := make(map[pid]bool)

var count int
for _, g := range groups {
count += len(g.processes)
level := strconv.Itoa(g.level)
for _, gp := range g.processes {
// Skip over non-running processes
if running, err := gp.IsRunning(); err != nil || !running {
if isRunning, err := gp.IsRunning(); err != nil || !isRunning {
continue
}

// Use the cached processes as we need the existing instances
// to compute delta-metrics (e.g. cpu-usage).
pid := pid(gp.Pid)
process, found := p.processes[pid]
process, found := filterProcs[pid]
if !found {
//nolint:errcheck // Assumption: if a process has no name, it probably does not exist
if name, _ := gp.Name(); name == "" {
Expand All @@ -369,12 +384,12 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {

// We've found a process that was not recorded before so add it
// to the list of processes
tags := make(map[string]string, len(g.tags)+1)
tags := make(map[string]string, len(g.tags)+2)
for k, v := range g.tags {
tags[k] = v
}
if p.ProcessName != "" {
process.setTag("process_name", p.ProcessName)
tags["process_name"] = p.ProcessName
}
tags["filter"] = f.Name
if p.cfg.tagging["level"] {
Expand All @@ -386,7 +401,7 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
hasCPUTimes: false,
tags: tags,
}
p.processes[pid] = process
filterProcs[pid] = process
}
running[pid] = true
metrics, err := process.metrics(p.Prefix, &p.cfg, now)
Expand Down Expand Up @@ -418,6 +433,13 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
}
}

// Cleanup processes that are not running anymore for this filter
for pid := range filterProcs {
if !running[pid] {
delete(filterProcs, pid)
}
}

// Add lookup statistics-metric
acc.AddFields(
"procstat_lookup",
Expand All @@ -434,12 +456,6 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
)
}

// Cleanup processes that are not running anymore across all filters/groups
for pid := range p.processes {
if !running[pid] {
delete(p.processes, pid)
}
}
return nil
}

Expand Down
110 changes: 110 additions & 0 deletions plugins/inputs/procstat/procstat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,113 @@ func TestGather_MoresupervisorUnitPIDs(t *testing.T) {
}
}
}

// TestGather_MultipleFiltersMatchingSameProcess checks that two filters which
// both match the same PID each emit "procstat" metrics carrying their own
// "filter" tag value.
//
// Regression test for https://github.com/influxdata/telegraf/issues/18041
func TestGather_MultipleFiltersMatchingSameProcess(t *testing.T) {
	// Resolve the name of the test binary itself so both filters have a
	// process that is guaranteed to be running.
	ownPID := pid(os.Getpid())
	self, err := gopsprocess.NewProcess(int32(ownPID))
	require.NoError(t, err)
	selfName, err := self.Name()
	require.NoError(t, err)

	// Both filters use the identical pattern and therefore hit the same process.
	pattern := "*" + selfName
	plugin := Procstat{
		Properties: []string{"cpu", "memory", "mmap"},
		Log:        testutil.Logger{},
		Filter: []filter{
			{Name: "filter_one", ProcessNames: []string{pattern}},
			{Name: "filter_two", ProcessNames: []string{pattern}},
		},
	}
	require.NoError(t, plugin.Init())

	var acc testutil.Accumulator
	require.NoError(t, plugin.Gather(&acc))

	// Tally the "procstat" metrics by the value of their filter tag.
	perFilter := make(map[string]int)
	for _, m := range acc.GetTelegrafMetrics() {
		if m.Name() != "procstat" {
			continue
		}
		tag, ok := m.GetTag("filter")
		require.True(t, ok, "procstat metric should have a filter tag")
		perFilter[tag]++
	}

	// Each filter must show up under its own tag value ...
	require.Contains(t, perFilter, "filter_one", "should have metrics with filter=filter_one")
	require.Contains(t, perFilter, "filter_two", "should have metrics with filter=filter_two")

	// ... and must have contributed at least one metric.
	require.GreaterOrEqual(t, perFilter["filter_one"], 1, "filter_one should produce at least 1 metric")
	require.GreaterOrEqual(t, perFilter["filter_two"], 1, "filter_two should produce at least 1 metric")
}

// TestGather_MultipleFiltersProcessCacheIsolation checks that the plugin keeps
// one process cache per filter name: after a gather, two filters matching the
// same PID each hold their own cached entry for it, and both filters keep
// emitting metrics on a subsequent gather.
//
// Regression test for https://github.com/influxdata/telegraf/issues/18041
func TestGather_MultipleFiltersProcessCacheIsolation(t *testing.T) {
	// Use the test binary itself as the process matched by both filters.
	ownPID := pid(os.Getpid())
	self, err := gopsprocess.NewProcess(int32(ownPID))
	require.NoError(t, err)
	selfName, err := self.Name()
	require.NoError(t, err)

	pattern := "*" + selfName
	plugin := Procstat{
		Properties: []string{"cpu", "memory", "mmap"},
		Log:        testutil.Logger{},
		Filter: []filter{
			{Name: "first", ProcessNames: []string{pattern}},
			{Name: "second", ProcessNames: []string{pattern}},
		},
	}
	require.NoError(t, plugin.Init())

	// Initial gather populates the per-filter caches.
	var firstRun testutil.Accumulator
	require.NoError(t, plugin.Gather(&firstRun))

	// A cache bucket must exist for every filter name.
	require.Contains(t, plugin.processes, "first", "process cache should have 'first' filter")
	require.Contains(t, plugin.processes, "second", "process cache should have 'second' filter")

	// The shared PID must be present in each bucket.
	cacheFirst := plugin.processes["first"]
	cacheSecond := plugin.processes["second"]
	require.Contains(t, cacheFirst, ownPID, "first filter should cache current process")
	require.Contains(t, cacheSecond, ownPID, "second filter should cache current process")

	// The two buckets must not share one cached instance for that PID.
	require.NotSame(t, cacheFirst[ownPID], cacheSecond[ownPID], "each filter should have its own process instance")

	// Follow-up gather runs against the already populated caches.
	var secondRun testutil.Accumulator
	require.NoError(t, plugin.Gather(&secondRun))

	// Tally the "procstat" metrics of the second run by filter tag; metrics
	// without that tag are ignored here.
	perFilter := make(map[string]int)
	for _, m := range secondRun.GetTelegrafMetrics() {
		if m.Name() != "procstat" {
			continue
		}
		tag, ok := m.GetTag("filter")
		if !ok {
			continue
		}
		perFilter[tag]++
	}

	// Neither filter may go silent on the second gather.
	require.GreaterOrEqual(t, perFilter["first"], 1, "first filter should produce metrics on second gather")
	require.GreaterOrEqual(t, perFilter["second"], 1, "second filter should produce metrics on second gather")
}
Loading