Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: process metrics
  • Loading branch information
bpetit committed Apr 11, 2023
commit 9c4f641342acd5fd4cedf0355a0e97034b0f32ab
54 changes: 27 additions & 27 deletions src/exporters/warpten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{env, thread};
/// (contributions welcome to support websockets).
pub struct Warp10Exporter {
sensor: Box<dyn Sensor>,
metric_generator: MetricGenerator
}

impl Exporter for Warp10Exporter {
Expand All @@ -34,15 +35,15 @@ impl Exporter for Warp10Exporter {
let step: u64 = *parameters.get_one("step").unwrap();
let qemu = parameters.get_flag("qemu");
let watch_containers = parameters.get_flag("containers");
self.metric_generator.watch_containers = watch_containers;
self.metric_generator.qemu = qemu;

loop {
match self.iteration(
host,
scheme,
port.parse::<u16>().unwrap(),
&write_token,
qemu,
watch_containers,
) {
Ok(res) => debug!("Result: {:?}", res),
Err(err) => error!("Failed ! {:?}", err),
Expand Down Expand Up @@ -120,8 +121,16 @@ impl Exporter for Warp10Exporter {

impl Warp10Exporter {
/// Instantiates and returns a new Warp10Exporter
pub fn new(sensor: Box<dyn Sensor>) -> Warp10Exporter {
Warp10Exporter { sensor }
pub fn new(mut sensor: Box<dyn Sensor>) -> Warp10Exporter {
let topology = match *sensor.get_topology() {
Some(topo) => topo,
None => {
panic!("Couldn't generate the Topology");
}
};
let metric_generator =
MetricGenerator::new(topology, get_hostname(), false, false);
Warp10Exporter { sensor, metric_generator }
}

/// Collects data from the Topology, creates warp10::Data objects containing the
Expand All @@ -134,47 +143,38 @@ impl Warp10Exporter {
port: u16,
write_token: &str,
//read_token: Option<&str>,
qemu: bool,
watch_containers: bool,
) -> Result<Vec<warp10::Warp10Response>, warp10::Error> {
let client = warp10::Client::new(&format!("{scheme}://{host}:{port}"))?;
let writer = client.get_writer(write_token.to_string());

let topology = match *self.sensor.get_topology() {
Some(topo) => topo,
None => {
panic!("Couldn't generate the Topology");
}
};

let mut metric_generator =
MetricGenerator::new(topology, get_hostname(), qemu, watch_containers);
metric_generator
self.metric_generator
.topology
.proc_tracker
.clean_terminated_process_records_vectors();

debug!("Refreshing topology.");
metric_generator.topology.refresh();
self.metric_generator.topology.refresh();

metric_generator.gen_all_metrics();
self.metric_generator.gen_all_metrics();

let mut process_data: Vec<warp10::Data> = vec![];
let process_data: Vec<warp10::Data> = vec![];

for metric in metric_generator.pop_metrics() {
for metric in self.metric_generator.pop_metrics() {
let mut labels = vec![];

for (k, v) in metric.attributes {
labels.push(warp10::Label::new(&k, &v));
}

process_data.push(warp10::Data::new(
time::OffsetDateTime::now_utc(),
None,
metric.name,
labels,
warp10::Value::String(metric.metric_value.to_string()),
));
//if !metric.name.starts_with("scaph_domain") && !metric.name.starts_with("scaph_socket") {
// process_data.push(warp10::Data::new(
// time::OffsetDateTime::now_utc(),
// None,
// metric.name,
// labels,
// warp10::Value::String(metric.metric_value.to_string()),
// ));
//}
}

let res = writer.post_sync(process_data)?;
Expand Down