Skip to content

Commit 0bd4136

Browse files
authored
Merge pull request php-enqueue#809 from php-enqueue/influxdb-config-options
[InfluxDB][Monitoring] Allow passing Client as configuration option.
2 parents 8001e91 + 68c47fd commit 0bd4136

File tree

2 files changed

+109
-41
lines changed

2 files changed

+109
-41
lines changed

docs/monitoring.md

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,19 @@ Enqueue is an MIT-licensed open source project with its ongoing development made
99

1010
# Monitoring.
1111

12-
Enqueue provides a tool for monitoring message queues.
12+
Enqueue provides a tool for monitoring message queues.
1313
With it, you can control how many messages were sent, how many processed successfuly or failed.
14-
How many consumers are working, their up time, processed messages stats, memory usage and system load.
14+
How many consumers are working, their up time, processed messages stats, memory usage and system load.
1515
The tool could be integrated with virtually any analytics and monitoring platform.
16-
There are several integration:
16+
There are several integration:
1717
* [Datadog StatsD](https://datadoghq.com)
1818
* [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/)
19-
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)
19+
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)
2020
We are working on a JS\WAMP based real-time UI tool, for more information please [contact us]([email protected]).
2121

2222
![Grafana Monitoring](images/grafana_monitoring.jpg)
2323

24-
[contact us]([email protected]) if need a Grafana template such as on the picture.
24+
[contact us]([email protected]) if need a Grafana template such as on the picture.
2525

2626
* [Installation](#installation)
2727
* [Track sent messages](#track-sent-messages)
@@ -40,7 +40,7 @@ We are working on a JS\WAMP based real-time UI tool, for more information please
4040
composer req enqueue/monitoring:0.9.x-dev
4141
```
4242

43-
## Track sent messages
43+
## Track sent messages
4444

4545
```php
4646
<?php
@@ -50,7 +50,7 @@ use Enqueue\Monitoring\GenericStatsStorageFactory;
5050
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
5151
$statsStorage->pushSentMessageStats(new SentMessageStats(
5252
(int) (microtime(true) * 1000), // timestamp
53-
'queue_name', // queue
53+
'queue_name', // queue
5454
'aMessageId',
5555
'aCorrelationId',
5656
[], // headers
@@ -76,7 +76,7 @@ $context->createProducer()->send($queue, $message);
7676
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
7777
$statsStorage->pushSentMessageStats(new SentMessageStats(
7878
(int) (microtime(true) * 1000),
79-
$queue->getQueueName(),
79+
$queue->getQueueName(),
8080
$message->getMessageId(),
8181
$message->getCorrelationId(),
8282
$message->getHeaders()[],
@@ -99,7 +99,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1
9999
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
100100
'consumerId',
101101
(int) (microtime(true) * 1000), // now
102-
$receivedAt,
102+
$receivedAt,
103103
'aQueue',
104104
'aMessageId',
105105
'aCorrelationId',
@@ -127,16 +127,16 @@ $consumer = $context->createConsumer($queue);
127127
$consumerId = uniqid('consumer-id', true); // we suggest using UUID here
128128
if ($message = $consumer->receiveNoWait()) {
129129
$receivedAt = (int) (microtime(true) * 1000);
130-
130+
131131
// heavy processing here.
132-
132+
133133
$consumer->acknowledge($message);
134-
134+
135135
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
136136
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
137137
$consumerId,
138138
(int) (microtime(true) * 1000), // now
139-
$receivedAt,
139+
$receivedAt,
140140
$queue->getQueueName(),
141141
$message->getMessageId(),
142142
$message->getCorrelationId(),
@@ -151,7 +151,7 @@ if ($message = $consumer->receiveNoWait()) {
151151
## Track consumer metrics
152152

153153
Consumers are long running processes. It vital to know how many of them are running right now, how they perform, how much memory do they use and so.
154-
This example shows how you can send such metrics.
154+
This example shows how you can send such metrics.
155155
Call this code from time to time between processing messages.
156156

157157
```php
@@ -165,13 +165,13 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1
165165
$statsStorage->pushConsumerStats(new ConsumerStats(
166166
'consumerId',
167167
(int) (microtime(true) * 1000), // now
168-
$startedAt,
168+
$startedAt,
169169
null, // finished at
170-
true, // is started?
170+
true, // is started?
171171
false, // is finished?
172172
false, // is failed
173173
['foo'], // consume from queues
174-
123, // received messages
174+
123, // received messages
175175
120, // acknowledged messages
176176
1, // rejected messages
177177
1, // requeued messages
@@ -182,7 +182,7 @@ $statsStorage->pushConsumerStats(new ConsumerStats(
182182

183183
## Consumption extension
184184

185-
There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption).
185+
There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption).
186186
It could collect consumed messages and consumer stats for you.
187187

188188
```php
@@ -236,8 +236,16 @@ There are available options:
236236
* 'measurementSentMessages' => 'sent-messages',
237237
* 'measurementConsumedMessages' => 'consumed-messages',
238238
* 'measurementConsumers' => 'consumers',
239+
* 'client' => null,
240+
* 'retentionPolicy' => null,
239241
```
240242

243+
You can pass InfluxDB\Client instance in `client` option. Otherwise, it will be created on first use according to other
244+
options.
245+
246+
If your InfluxDB\Client uses driver that implements InfluxDB\Driver\QueryDriverInterface, then database will be
247+
automatically created for you if it doesn't exist. Default InfluxDB\Client will also do that.
248+
241249
## Datadog storage
242250

243251
Install additional packages:
@@ -256,7 +264,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('datadog://127.0.0.1:
256264
For best experience please adjust units and types in metric summary.
257265

258266
Example dashboard:
259-
267+
260268
![Datadog monitoring](images/datadog_monitoring.png)
261269

262270

@@ -311,7 +319,7 @@ There are available options:
311319

312320
## Symfony App
313321

314-
You have to register some services in order to incorporate monitoring facilities into your Symfony application.
322+
You have to register some services in order to incorporate monitoring facilities into your Symfony application.
315323

316324
```yaml
317325
# config/packages/enqueue.yaml
@@ -325,11 +333,11 @@ enqueue:
325333
transport: 'amqp://guest:guest@foo:5672/%2f'
326334
monitoring: 'wamp://127.0.0.1:9090?topic=stats'
327335
client: ~
328-
336+
329337
datadog:
330338
transport: 'amqp://guest:guest@foo:5672/%2f'
331339
monitoring: 'datadog://127.0.0.1:8125?batched=false'
332340
client: ~
333341
```
334342
335-
[back to index](index.md)
343+
[back to index](index.md)

pkg/monitoring/InfluxDbStorage.php

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use Enqueue\Dsn\Dsn;
77
use InfluxDB\Client;
88
use InfluxDB\Database;
9+
use InfluxDB\Driver\QueryDriverInterface;
10+
use InfluxDB\Exception as InfluxDBException;
911
use InfluxDB\Point;
1012

1113
class InfluxDbStorage implements StatsStorage
@@ -38,6 +40,8 @@ class InfluxDbStorage implements StatsStorage
3840
* 'measurementSentMessages' => 'sent-messages',
3941
* 'measurementConsumedMessages' => 'consumed-messages',
4042
* 'measurementConsumers' => 'consumers',
43+
* 'client' => null, # Client instance. Null by default.
44+
* 'retentionPolicy' => null,
4145
* ]
4246
*
4347
* or
@@ -55,10 +59,17 @@ public function __construct($config = 'influxdb:')
5559
if (empty($config)) {
5660
$config = [];
5761
} elseif (is_string($config)) {
58-
$config = $this->parseDsn($config);
62+
$config = self::parseDsn($config);
5963
} elseif (is_array($config)) {
60-
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
64+
$config = empty($config['dsn']) ? $config : self::parseDsn($config['dsn']);
6165
} elseif ($config instanceof Client) {
66+
// Passing Client instead of array config is deprecated because it prevents setting any configuration values
67+
// and causes library to use defaults.
68+
@trigger_error(
69+
sprintf('Passing %s as %s argument is deprecated. Pass it as "client" array property or use createWithClient instead',
70+
Client::class,
71+
__METHOD__
72+
), E_USER_DEPRECATED);
6273
$this->client = $config;
6374
$config = [];
6475
} else {
@@ -74,11 +85,41 @@ public function __construct($config = 'influxdb:')
7485
'measurementSentMessages' => 'sent-messages',
7586
'measurementConsumedMessages' => 'consumed-messages',
7687
'measurementConsumers' => 'consumers',
88+
'client' => null,
89+
'retentionPolicy' => null,
7790
], $config);
7891

92+
if (null !== $config['client']) {
93+
if (!$config['client'] instanceof Client) {
94+
throw new \InvalidArgumentException(sprintf(
95+
'%s configuration property is expected to be an instance of %s class. %s was passed instead.',
96+
'client',
97+
Client::class,
98+
gettype($config['client'])
99+
));
100+
}
101+
$this->client = $config['client'];
102+
}
103+
79104
$this->config = $config;
80105
}
81106

107+
/**
108+
* @param Client $client
109+
* @param string $config
110+
*
111+
* @return InfluxDbStorage
112+
*/
113+
public static function createWithClient(Client $client, $config = 'influxdb:'): self
114+
{
115+
if (is_string($config)) {
116+
$config = self::parseDsn($config);
117+
}
118+
$config['client'] = $client;
119+
120+
return new static($config);
121+
}
122+
82123
public function pushConsumerStats(ConsumerStats $stats): void
83124
{
84125
$points = [];
@@ -109,7 +150,7 @@ public function pushConsumerStats(ConsumerStats $stats): void
109150
$points[] = new Point($this->config['measurementConsumers'], null, $tags, $values, $stats->getTimestampMs());
110151
}
111152

112-
$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
153+
$this->doWrite($points);
113154
}
114155

115156
public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
@@ -135,7 +176,7 @@ public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
135176
new Point($this->config['measurementConsumedMessages'], $runtime, $tags, $values, $stats->getTimestampMs()),
136177
];
137178

138-
$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
179+
$this->doWrite($points);
139180
}
140181

141182
public function pushSentMessageStats(SentMessageStats $stats): void
@@ -158,29 +199,47 @@ public function pushSentMessageStats(SentMessageStats $stats): void
158199
new Point($this->config['measurementSentMessages'], 1, $tags, [], $stats->getTimestampMs()),
159200
];
160201

161-
$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
202+
$this->doWrite($points);
162203
}
163204

164-
private function getDb(): Database
205+
private function doWrite(array $points): void
165206
{
166-
if (null === $this->database) {
167-
if (null === $this->client) {
168-
$this->client = new Client(
169-
$this->config['host'],
170-
$this->config['port'],
171-
$this->config['user'],
172-
$this->config['password']
173-
);
207+
if (null === $this->client) {
208+
$this->client = new Client(
209+
$this->config['host'],
210+
$this->config['port'],
211+
$this->config['user'],
212+
$this->config['password']
213+
);
214+
}
215+
216+
if ($this->client->getDriver() instanceof QueryDriverInterface) {
217+
if (null === $this->database) {
218+
$this->database = $this->client->selectDB($this->config['db']);
219+
$this->database->create();
174220
}
175221

176-
$this->database = $this->client->selectDB($this->config['db']);
177-
$this->database->create();
222+
$this->database->writePoints($points, Database::PRECISION_MILLISECONDS, $this->config['retentionPolicy']);
223+
} else {
224+
// Code below mirrors what `writePoints` method of Database does.
225+
try {
226+
$parameters = [
227+
'url' => sprintf('write?db=%s&precision=%s', $this->config['db'], Database::PRECISION_MILLISECONDS),
228+
'database' => $this->config['db'],
229+
'method' => 'post',
230+
];
231+
if (null !== $this->config['retentionPolicy']) {
232+
$parameters['url'] .= sprintf('&rp=%s', $this->config['retentionPolicy']);
233+
}
234+
235+
$this->client->write($parameters, $points);
236+
} catch (\Exception $e) {
237+
throw new InfluxDBException($e->getMessage(), $e->getCode());
238+
}
178239
}
179-
180-
return $this->database;
181240
}
182241

183-
private function parseDsn(string $dsn): array
242+
private static function parseDsn(string $dsn): array
184243
{
185244
$dsn = Dsn::parseFirst($dsn);
186245

@@ -200,6 +259,7 @@ private function parseDsn(string $dsn): array
200259
'measurementSentMessages' => $dsn->getString('measurementSentMessages'),
201260
'measurementConsumedMessages' => $dsn->getString('measurementConsumedMessages'),
202261
'measurementConsumers' => $dsn->getString('measurementConsumers'),
262+
'retentionPolicy' => $dsn->getString('retentionPolicy'),
203263
]), function ($value) { return null !== $value; });
204264
}
205265
}

0 commit comments

Comments
 (0)