Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove duplicate code into base class
remove unused code
  • Loading branch information
bnayae committed Mar 7, 2018
commit fe041d744b39766475d2f3cf6300d76dfbe10b30
33 changes: 12 additions & 21 deletions src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@

namespace InfluxDB.LineProtocol.Client
{
public class LineProtocolClient : ILineProtocolClient
public class LineProtocolClient : LineProtocolClientBase
{
private readonly HttpClient _httpClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the codebase doesn't use explicit internal or private

private readonly string _database, _username, _password;

public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null)
: this(new HttpClientHandler(), serverBaseAddress, database, username, password)
{
}

protected LineProtocolClient(HttpMessageHandler handler, Uri serverBaseAddress, string database, string username, string password)
protected LineProtocolClient(
HttpMessageHandler handler,
Uri serverBaseAddress,
string database,
string username,
string password)
:base(serverBaseAddress, database, username, password)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand All @@ -29,26 +34,12 @@ protected LineProtocolClient(HttpMessageHandler handler, Uri serverBaseAddress,

// Overload that allows injecting handler is protected to avoid HttpMessageHandler being part of our public api which would force clients to reference System.Net.Http when using the lib.
_httpClient = new HttpClient(handler) { BaseAddress = serverBaseAddress };
_database = database;
_username = username;
_password = password;
}

public Task<LineProtocolWriteResult> WriteAsync(LineProtocolPayload payload, CancellationToken cancellationToken = default(CancellationToken))
{
var stringWriter = new StringWriter();

payload.Format(stringWriter);

return SendAsync(stringWriter.ToString(), Precision.Nanoseconds, cancellationToken);
}

public Task<LineProtocolWriteResult> SendAsync(LineProtocolWriter lineProtocolWriter, CancellationToken cancellationToken = default(CancellationToken))
{
return SendAsync(lineProtocolWriter.ToString(), lineProtocolWriter.Precision, cancellationToken);
}

private async Task<LineProtocolWriteResult> SendAsync(string payload, Precision precision, CancellationToken cancellationToken = default(CancellationToken))
protected override async Task<LineProtocolWriteResult> OnSendAsync(
string payload,
Precision precision,
CancellationToken cancellationToken = default(CancellationToken))
{
var endpoint = $"write?db={Uri.EscapeDataString(_database)}";
if (!string.IsNullOrEmpty(_username))
Expand Down
49 changes: 49 additions & 0 deletions src/InfluxDB.LineProtocol/Client/LineProtocolClientBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using InfluxDB.LineProtocol.Payload;
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace InfluxDB.LineProtocol.Client
{
public abstract class LineProtocolClientBase : ILineProtocolClient
{
protected readonly string _database, _username, _password;

protected LineProtocolClientBase(Uri serverBaseAddress, string database, string username, string password)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
if (string.IsNullOrEmpty(database))
throw new ArgumentException("A database must be specified");

// Overload that allows injecting handler is protected to avoid HttpMessageHandler being part of our public api which would force clients to reference System.Net.Http when using the lib.
_database = database;
_username = username;
_password = password;
}

public Task<LineProtocolWriteResult> WriteAsync(LineProtocolPayload payload, CancellationToken cancellationToken = default(CancellationToken))
{
var stringWriter = new StringWriter();

payload.Format(stringWriter);

return OnSendAsync(stringWriter.ToString(), Precision.Nanoseconds, cancellationToken);
}

public Task<LineProtocolWriteResult> SendAsync(LineProtocolWriter lineProtocolWriter, CancellationToken cancellationToken = default(CancellationToken))
{
return OnSendAsync(lineProtocolWriter.ToString(), lineProtocolWriter.Precision, cancellationToken);
}

protected abstract Task<LineProtocolWriteResult> OnSendAsync(
string payload,
Precision precision,
CancellationToken cancellationToken = default(CancellationToken));
}
}
59 changes: 10 additions & 49 deletions src/InfluxDB.LineProtocol/Client/LineProtocolUdpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@

namespace InfluxDB.LineProtocol.Client
{
public class LineProtocolUdpClient : ILineProtocolClient
public class LineProtocolUdpClient : LineProtocolClientBase
{
private readonly UdpClient _udpClient;
private readonly string _udpHostName;
private readonly int _udpPort;
private readonly string _database, _username, _password;

public LineProtocolUdpClient(
Uri serverBaseAddress,
string database,
string username = null,
string password = null)
Uri serverBaseAddress,
string database,
string username = null,
string password = null)
:base(serverBaseAddress, database, username, password)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand All @@ -31,52 +31,13 @@ public LineProtocolUdpClient(
_udpHostName = serverBaseAddress.Host;
_udpPort = serverBaseAddress.Port;
_udpClient = new UdpClient();
_database = database;
_username = username;
_password = password;
}

public Task<LineProtocolWriteResult> WriteAsync(LineProtocolPayload payload, CancellationToken cancellationToken = default(CancellationToken))
protected override async Task<LineProtocolWriteResult> OnSendAsync(
string payload,
Precision precision,
CancellationToken cancellationToken = default(CancellationToken))
{
var stringWriter = new StringWriter();

payload.Format(stringWriter);

return SendAsync(stringWriter.ToString(), Precision.Nanoseconds, cancellationToken);
}

public Task<LineProtocolWriteResult> SendAsync(LineProtocolWriter lineProtocolWriter, CancellationToken cancellationToken = default(CancellationToken))
{
return SendAsync(lineProtocolWriter.ToString(), lineProtocolWriter.Precision, cancellationToken);
}

private async Task<LineProtocolWriteResult> SendAsync(string payload, Precision precision, CancellationToken cancellationToken = default(CancellationToken))
{
var endpoint = $"write?db={Uri.EscapeDataString(_database)}";
if (!string.IsNullOrEmpty(_username))
endpoint += $"&u={Uri.EscapeDataString(_username)}&p={Uri.EscapeDataString(_password)}";

switch (precision)
{
case Precision.Microseconds:
endpoint += "&precision=u";
break;
case Precision.Milliseconds:
endpoint += "&precision=ms";
break;
case Precision.Seconds:
endpoint += "&precision=s";
break;
case Precision.Minutes:
endpoint += "&precision=m";
break;
case Precision.Hours:
endpoint += "&precision=h";
break;
}

var content = new StringContent(payload, Encoding.UTF8);

var buffer = Encoding.UTF8.GetBytes(payload);
int len = await _udpClient.SendAsync(buffer, buffer.Length, _udpHostName, _udpPort);
return new LineProtocolWriteResult(len == buffer.Length, null);
Expand Down
27 changes: 26 additions & 1 deletion test/Consoles/InfluxDb.UdpSupport.ConsoleTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,31 @@
using System.Threading;
using System.Threading.Tasks;

// influx-db command line:
// start with
// # influx
// # show databases
// # CREATE DATABASE {name}
// # DROP DATABASE {name}
// # precision rfc3339
// # use <database>
// # SHOW MEASUREMENTS
// # SHOW MEASUREMENTS WITH MEASUREMENT =~ /v1\..*/ -- all fields from measurements that start with 'v1.'
// # SHOW SERIES
// # SHOW SERIES [FROM <measurement_name> [WHERE <tag_key>='<tag_value>']]
// # DROP SERIES FROM /v1.*\.end/
// # SHOW TAG KEYS
// # SHOW TAG KEYS FROM "v1.cos"
// # SHOW FIELD KEYS
// # SHOW FIELD KEYS FROM /v1\..*\.sin/ -- all fields from series that start with 'v1.' and end with '.sin'

/*
# influx
docker run --name influx -p 8086:8086 -p 8089:8089/udp -p 8088:8088 -v C:\Docker\Volumes\influxdb\db:/var/lib/influxdb -v C:\Docker\Volumes\influxdb\config\influxdb.conf:/etc/influxdb/influxdb.conf:ro influxdb -config /etc/influxdb/influxdb.conf
docker run -d -p 8083:8083 -p 8086:8086 -p 8089:4444/udp --expose 8083 --expose 8086 --expose 4444 -e UDP_DB="playground" tutum/influxdb

*/

namespace InfluxDb.UdpSupport.ConsoleTest
{
class Program
Expand All @@ -22,7 +47,7 @@ static void Main(string[] args)
Metrics.Collector = new CollectorConfiguration()
.Tag.With("process", Path.GetFileName(process.Id.ToString()))
.Batch.AtInterval(TimeSpan.FromSeconds(2))
.WriteTo.InfluxDB("udp://localhost:8999", "data")
.WriteTo.InfluxDB("udp://localhost:8089", "data")
.CreateCollector();

int i = 0;
Expand Down