Skip to content

Commit f9b06ed

Browse files
author
Emile Joubert
committed
More consistent rpc client, more wrapping
1 parent b47b345 commit f9b06ed

File tree

6 files changed

+58
-36
lines changed

6 files changed

+58
-36
lines changed

dotnet/EmitLogDirect.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ public static void Main(string[] args) {
1111
channel.ExchangeDeclare("direct_logs", "direct");
1212

1313
string severity = (args.Length > 0) ? args[0] : "info";
14-
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray())
14+
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1)
15+
.ToArray())
1516
: "Hello World!";
1617
byte[] body = System.Text.Encoding.UTF8.GetBytes(message);
1718
channel.BasicPublish("direct_logs", severity, null, body);

dotnet/EmitLogTopic.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ public static void Main(string[] args) {
1111
channel.ExchangeDeclare("topic_logs", "topic");
1212

1313
string routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
14-
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray())
14+
string message = (args.Length > 1) ? string.Join(" ", args.Skip(1)
15+
.ToArray())
1516
: "Hello World!";
1617
byte[] body = System.Text.Encoding.UTF8.GetBytes(message);
1718
channel.BasicPublish("topic_logs", routingKey, null, body);

dotnet/RPCClient.cs

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,53 @@
22
using RabbitMQ.Client;
33
using RabbitMQ.Client.Events;
44

5-
class RPCClient {
6-
private static string RpcCall(string message) {
7-
string response = null;
5+
class RPCClient : IDisposable {
6+
private IConnection connection;
7+
private IModel channel;
8+
private string replyQueueName;
9+
private QueueingBasicConsumer consumer;
10+
11+
public RPCClient() {
812
ConnectionFactory factory = new ConnectionFactory();
913
factory.HostName = "localhost";
10-
using (IConnection connection = factory.CreateConnection())
11-
using (IModel channel = connection.CreateModel()) {
12-
string replyQueueName = channel.QueueDeclare();
13-
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
14-
channel.BasicConsume(replyQueueName, false, consumer);
14+
connection = factory.CreateConnection();
15+
channel = connection.CreateModel();
16+
replyQueueName = channel.QueueDeclare();
17+
consumer = new QueueingBasicConsumer(channel);
18+
channel.BasicConsume(replyQueueName, false, consumer);
19+
}
1520

16-
string corrId = Guid.NewGuid().ToString();
17-
IBasicProperties props = channel.CreateBasicProperties();
18-
props.ReplyTo = replyQueueName;
19-
props.CorrelationId = corrId;
21+
public string Call(string message) {
22+
string response = null;
23+
string corrId = Guid.NewGuid().ToString();
24+
IBasicProperties props = channel.CreateBasicProperties();
25+
props.ReplyTo = replyQueueName;
26+
props.CorrelationId = corrId;
2027

21-
byte[] messageBytes = System.Text.Encoding.UTF8.GetBytes(message);
22-
channel.BasicPublish("", "rpc_queue", props, messageBytes);
28+
byte[] messageBytes = System.Text.Encoding.UTF8.GetBytes(message);
29+
channel.BasicPublish("", "rpc_queue", props, messageBytes);
2330

24-
while (true) {
25-
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
26-
if (ea.BasicProperties.CorrelationId == corrId) {
27-
byte[] body = ea.Body;
28-
response = System.Text.Encoding.UTF8.GetString(body);
29-
channel.BasicCancel(consumer.ConsumerTag);
30-
break;
31-
}
31+
while (true) {
32+
BasicDeliverEventArgs ea =
33+
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
34+
if (ea.BasicProperties.CorrelationId == corrId) {
35+
byte[] body = ea.Body;
36+
response = System.Text.Encoding.UTF8.GetString(body);
37+
channel.BasicCancel(consumer.ConsumerTag);
38+
break;
3239
}
33-
return response;
3440
}
41+
return response;
42+
}
43+
public void Dispose() {
44+
connection.Close();
3545
}
3646

3747
public static void Main() {
3848
Console.WriteLine(" [x] Requesting fib(30)");
39-
string response = RpcCall("30");
40-
Console.WriteLine(" [.] Got '{0}'", response);
49+
using (RPCClient rpcClient = new RPCClient()) {
50+
string response = rpcClient.Call("30");
51+
Console.WriteLine(" [.] Got '{0}'", response);
52+
}
4153
}
42-
}
54+
}

dotnet/RPCServer.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ public static void Main() {
1616

1717
while(true) {
1818
string response = null;
19-
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
19+
BasicDeliverEventArgs ea =
20+
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
2021

2122
byte[] body = ea.Body;
2223
IBasicProperties props = ea.BasicProperties;
@@ -32,8 +33,10 @@ public static void Main() {
3233
Console.WriteLine(" [.] " + e);
3334
response = "";
3435
} finally {
35-
byte[] responseBytes = System.Text.Encoding.UTF8.GetBytes(response);
36-
channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes);
36+
byte[] responseBytes =
37+
System.Text.Encoding.UTF8.GetBytes(response);
38+
channel.BasicPublish("", props.ReplyTo, replyProps,
39+
responseBytes);
3740
channel.BasicAck(ea.DeliveryTag, false);
3841
}
3942
}

dotnet/ReceiveLogsDirect.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public static void Main(string[] args) {
2323
channel.QueueBind(queue_name, "direct_logs", severity);
2424
}
2525

26-
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
26+
Console.WriteLine(" [*] Waiting for messages. " +
27+
"To exit press CTRL+C");
2728

2829
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
2930
channel.BasicConsume(queue_name, true, consumer);
@@ -35,7 +36,8 @@ public static void Main(string[] args) {
3536
byte[] body = ea.Body;
3637
string message = System.Text.Encoding.UTF8.GetString(body);
3738
string routingKey = ea.RoutingKey;
38-
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
39+
Console.WriteLine(" [x] Received '{0}':'{1}'",
40+
routingKey, message);
3941
}
4042
}
4143
}

dotnet/ReceiveLogsTopic.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ public static void Main(string[] args) {
2222
channel.QueueBind(queue_name, "topic_logs", bindingKey);
2323
}
2424

25-
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
25+
Console.WriteLine(" [*] Waiting for messages. " +
26+
"To exit press CTRL+C");
2627

27-
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
28+
QueueingBasicConsumer consumer =
29+
new QueueingBasicConsumer(channel);
2830
channel.BasicConsume(queue_name, true, consumer);
2931

3032
while(true) {
@@ -34,7 +36,8 @@ public static void Main(string[] args) {
3436
byte[] body = ea.Body;
3537
string message = System.Text.Encoding.UTF8.GetString(body);
3638
string routingKey = ea.RoutingKey;
37-
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
39+
Console.WriteLine(" [x] Received '{0}':'{1}'",
40+
routingKey, message);
3841
}
3942
}
4043
}

0 commit comments

Comments
 (0)