Skip to content

Commit ce7ecb9

Browse files
authored
Allows override of content-type in pubsub publish. (dapr#492)
1 parent 99c3794 commit ce7ecb9

File tree

11 files changed

+264
-31
lines changed

11 files changed

+264
-31
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation and Dapr Contributors.
3+
* Licensed under the MIT License.
4+
*/
5+
6+
package io.dapr.examples.pubsub.http;
7+
8+
import io.dapr.client.DaprClient;
9+
import io.dapr.client.DaprClientBuilder;
10+
import io.dapr.client.domain.CloudEvent;
11+
import io.dapr.client.domain.Metadata;
12+
import io.dapr.client.domain.PublishEventRequestBuilder;
13+
14+
import java.util.UUID;
15+
16+
import static java.util.Collections.singletonMap;
17+
18+
/**
19+
* Message publisher.
20+
* 1. Build and install jars:
21+
* mvn clean install
22+
* 2. cd [repo root]/examples
23+
* 3. Run the program:
24+
* dapr run --components-path ./components/pubsub --app-id publisher -- \
25+
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.CloudEventPublisher
26+
*/
27+
public class CloudEventPublisher {
28+
29+
//Number of messages to be sent.
30+
private static final int NUM_MESSAGES = 10;
31+
32+
//Time-to-live for messages published.
33+
private static final String MESSAGE_TTL_IN_SECONDS = "1000";
34+
35+
//The title of the topic to be used for publishing
36+
private static final String TOPIC_NAME = "testingtopic";
37+
38+
//The name of the pubsub
39+
private static final String PUBSUB_NAME = "messagebus";
40+
41+
/**
42+
* This is the entry point of the publisher app example.
43+
* @param args Args, unused.
44+
* @throws Exception A startup Exception.
45+
*/
46+
public static void main(String[] args) throws Exception {
47+
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
48+
try (DaprClient client = new DaprClientBuilder().build()) {
49+
for (int i = 0; i < NUM_MESSAGES; i++) {
50+
CloudEvent cloudEvent = new CloudEvent();
51+
cloudEvent.setId(UUID.randomUUID().toString());
52+
cloudEvent.setType("example");
53+
cloudEvent.setSpecversion("1");
54+
cloudEvent.setDatacontenttype("text/plain");
55+
cloudEvent.setData(String.format("This is message #%d", i));
56+
57+
//Publishing messages
58+
client.publishEvent(
59+
new PublishEventRequestBuilder(PUBSUB_NAME, TOPIC_NAME, cloudEvent)
60+
.withContentType(CloudEvent.CONTENT_TYPE)
61+
.withMetadata(singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS))
62+
.build()).block();
63+
System.out.println("Published cloud event with message: " + cloudEvent.getData());
64+
65+
try {
66+
Thread.sleep((long) (1000 * Math.random()));
67+
} catch (InterruptedException e) {
68+
e.printStackTrace();
69+
Thread.currentThread().interrupt();
70+
return;
71+
}
72+
}
73+
74+
// This is an example, so for simplicity we are just exiting here.
75+
// Normally a dapr app would be a web service and not exit main.
76+
System.out.println("Done.");
77+
}
78+
}
79+
}

examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public class Publisher {
4040
* @throws Exception A startup Exception.
4141
*/
4242
public static void main(String[] args) throws Exception {
43-
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
4443
try (DaprClient client = new DaprClientBuilder().build()) {
4544
for (int i = 0; i < NUM_MESSAGES; i++) {
4645
String message = String.format("This is message #%d", i);

examples/src/main/java/io/dapr/examples/pubsub/http/README.md

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,15 @@ dapr run --components-path ./components/pubsub --app-id subscriber --app-port 30
9696

9797
The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic.
9898

99-
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
99+
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
100+
Dapr sidecar will automatically wrap the payload received into a CloudEvent object, which will later on parsed by the subscriber.
100101
```java
101102
public class Publisher {
102-
private static final int NUM_MESSAGES = 10;
103-
private static final String TOPIC_NAME = "testingtopic";
104-
private static final String PUBSUB_NAME = "messagebus";
103+
private static final int NUM_MESSAGES = 10;
104+
private static final String TOPIC_NAME = "testingtopic";
105+
private static final String PUBSUB_NAME = "messagebus";
105106

106-
///...
107+
///...
107108
public static void main(String[] args) throws Exception {
108109
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
109110
try (DaprClient client = new DaprClientBuilder().build()) {
@@ -114,28 +115,41 @@ public class Publisher {
114115
System.out.println("Published message: " + message);
115116
//...
116117
}
118+
///...
117119
}
118120
}
119-
///...
120121
}
121122
```
122123

123-
This example also pushes a non-string content event, the follow code in same `Publisher` main method publishes a bite:
124+
The `CloudEventPublisher.java` file shows how the same can be accomplished if the application must send a CloudEvent object instead of relying on Dapr's automatic CloudEvent "wrapping".
125+
In this case, the app MUST override the content-type parameter via `withContentType()`, so Dapr sidecar knows that the payload is already a CloudEvent object.
124126

125127
```java
126128
public class Publisher {
127-
///...
128-
public static void main(String[] args) throws Exception {
129-
///...
130-
//Publishing a single bite: Example of non-string based content published
131-
client.publishEvent(
132-
TOPIC_NAME,
133-
new byte[] { 1 },
134-
Collections.singletonMap("content-type", "application/octet-stream")).block();
135-
System.out.println("Published one byte.");
136-
System.out.println("Done.");
129+
///...
130+
public static void main(String[] args) throws Exception {
131+
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
132+
try (DaprClient client = new DaprClientBuilder().build()) {
133+
for (int i = 0; i < NUM_MESSAGES; i++) {
134+
CloudEvent cloudEvent = new CloudEvent();
135+
cloudEvent.setId(UUID.randomUUID().toString());
136+
cloudEvent.setType("example");
137+
cloudEvent.setSpecversion("1");
138+
cloudEvent.setDatacontenttype("text/plain");
139+
cloudEvent.setData(String.format("This is message #%d", i));
140+
141+
//Publishing messages
142+
client.publishEvent(
143+
new PublishEventRequestBuilder(PUBSUB_NAME, TOPIC_NAME, cloudEvent)
144+
.withContentType(CloudEvent.CONTENT_TYPE)
145+
.withMetadata(singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS))
146+
.build()).block();
147+
System.out.println("Published cloud event with message: " + cloudEvent.getData());
148+
//...
149+
}
150+
//...
151+
}
137152
}
138-
///...
139153
}
140154
```
141155

sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.dapr.client.domain.CloudEvent;
1313
import io.dapr.client.domain.HttpExtension;
1414
import io.dapr.client.domain.Metadata;
15+
import io.dapr.client.domain.PublishEventRequestBuilder;
1516
import io.dapr.it.BaseIT;
1617
import io.dapr.it.DaprRun;
1718
import io.dapr.serializer.DaprObjectSerializer;
@@ -26,6 +27,7 @@
2627
import java.util.Arrays;
2728
import java.util.Collection;
2829
import java.util.Collections;
30+
import java.util.LinkedHashMap;
2931
import java.util.List;
3032

3133
import static io.dapr.it.Retry.callWithRetry;
@@ -158,24 +160,45 @@ public String getContentType() {
158160
System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, ANOTHER_TOPIC_NAME, PUBSUB_NAME));
159161
}
160162

163+
//Publishing an object.
164+
MyObject object = new MyObject();
165+
object.setId("123");
166+
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block();
167+
System.out.println("Published one object.");
168+
161169
//Publishing a single byte: Example of non-string based content published
162170
client.publishEvent(
163171
PUBSUB_NAME,
164172
TOPIC_NAME,
165173
new byte[]{1}).block();
166174
System.out.println("Published one byte.");
167175

176+
CloudEvent cloudEvent = new CloudEvent();
177+
cloudEvent.setId("1234");
178+
cloudEvent.setData("message from cloudevent");
179+
cloudEvent.setSource("test");
180+
cloudEvent.setSpecversion("1");
181+
cloudEvent.setType("myevent");
182+
cloudEvent.setDatacontenttype("text/plain");
183+
184+
//Publishing a cloud event.
185+
client.publishEvent(new PublishEventRequestBuilder(PUBSUB_NAME, TOPIC_NAME, cloudEvent)
186+
.withContentType("application/cloudevents+json")
187+
.build()).block();
188+
System.out.println("Published one cloud event.");
189+
168190
Thread.sleep(3000);
169191

170192
callWithRetry(() -> {
171193
System.out.println("Checking results for topic " + TOPIC_NAME);
194+
// Validate text payload.
172195
final List<CloudEvent> messages = client.invokeMethod(
173196
daprRun.getAppName(),
174197
"messages/testingtopic",
175198
null,
176199
HttpExtension.GET,
177200
CLOUD_EVENT_LIST_TYPE_REF).block();
178-
assertEquals(11, messages.size());
201+
assertEquals(13, messages.size());
179202
for (int i = 0; i < NUM_MESSAGES; i++) {
180203
final int messageId = i;
181204
assertTrue(messages
@@ -186,13 +209,30 @@ public String getContentType() {
186209
.count() == 1);
187210
}
188211

212+
// Validate object payload.
213+
assertTrue(messages
214+
.stream()
215+
.filter(m -> m.getData() != null)
216+
.filter(m -> m.getData() instanceof LinkedHashMap)
217+
.map(m -> (LinkedHashMap)m.getData())
218+
.filter(m -> "123".equals(m.get("id")))
219+
.count() == 1);
220+
221+
// Validate byte payload.
189222
assertTrue(messages
190223
.stream()
191224
.filter(m -> m.getData() != null)
192225
.map(m -> m.getData())
193226
.filter(m -> "AQ==".equals(m))
194227
.count() == 1);
195228

229+
// Validate cloudevent payload.
230+
assertTrue(messages
231+
.stream()
232+
.filter(m -> m.getData() != null)
233+
.map(m -> m.getData())
234+
.filter(m -> "message from cloudevent".equals(m))
235+
.count() == 1);
196236
}, 2000);
197237

198238
callWithRetry(() -> {
@@ -330,4 +370,16 @@ public void testPubSubTTLMetadata() throws Exception {
330370

331371
daprRun.stop();
332372
}
373+
374+
public static class MyObject {
375+
private String id;
376+
377+
public String getId() {
378+
return this.id;
379+
}
380+
381+
public void setId(String id) {
382+
this.id = id;
383+
}
384+
}
333385
}

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,15 @@
4040
import io.grpc.Metadata;
4141
import io.grpc.MethodDescriptor;
4242
import io.grpc.stub.StreamObserver;
43-
import okhttp3.HttpUrl;
4443
import reactor.core.publisher.Mono;
4544
import reactor.core.publisher.MonoSink;
4645
import reactor.util.context.Context;
4746

4847
import java.io.Closeable;
4948
import java.io.IOException;
5049
import java.util.Collections;
51-
import java.util.HashMap;
5250
import java.util.List;
5351
import java.util.Map;
54-
import java.util.Optional;
5552
import java.util.concurrent.ExecutionException;
5653
import java.util.function.Consumer;
5754
import java.util.stream.Collectors;
@@ -143,7 +140,15 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
143140
.setTopic(topic)
144141
.setPubsubName(pubsubName)
145142
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
146-
envelopeBuilder.setDataContentType(objectSerializer.getContentType());
143+
144+
// Content-type can be overwritten on a per-request basis.
145+
// It allows CloudEvents to be handled differently, for example.
146+
String contentType = request.getContentType();
147+
if (contentType == null || contentType.isEmpty()) {
148+
contentType = objectSerializer.getContentType();
149+
}
150+
envelopeBuilder.setDataContentType(contentType);
151+
147152
Map<String, String> metadata = request.getMetadata();
148153
if (metadata != null) {
149154
envelopeBuilder.putAllMetadata(metadata);

sdk/src/main/java/io/dapr/client/DaprClientHttp.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,13 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
137137
}
138138

139139
byte[] serializedEvent = objectSerializer.serialize(data);
140-
Map<String, String> headers = Collections.singletonMap("content-type", objectSerializer.getContentType());
140+
// Content-type can be overwritten on a per-request basis.
141+
// It allows CloudEvents to be handled differently, for example.
142+
String contentType = request.getContentType();
143+
if (contentType == null || contentType.isEmpty()) {
144+
contentType = objectSerializer.getContentType();
145+
}
146+
Map<String, String> headers = Collections.singletonMap("content-type", contentType);
141147

142148
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };
143149

sdk/src/main/java/io/dapr/client/domain/CloudEvent.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.fasterxml.jackson.databind.DeserializationFeature;
1111
import com.fasterxml.jackson.databind.ObjectMapper;
1212

13-
import javax.annotation.PropertyKey;
1413
import java.io.IOException;
1514
import java.util.Arrays;
1615
import java.util.Objects;
@@ -20,6 +19,11 @@
2019
*/
2120
public final class CloudEvent {
2221

22+
/**
23+
* Mime type used for CloudEvent.
24+
*/
25+
public static final String CONTENT_TYPE = "application/cloudevents+json";
26+
2327
/**
2428
* Shared Json serializer/deserializer as per Jackson's documentation.
2529
*/

sdk/src/main/java/io/dapr/client/domain/PublishEventRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public class PublishEventRequest {
1818

1919
private Object data;
2020

21+
private String contentType;
22+
2123
private Map<String, String> metadata;
2224

2325
public String getPubsubName() {
@@ -44,6 +46,14 @@ void setData(Object data) {
4446
this.data = data;
4547
}
4648

49+
public String getContentType() {
50+
return this.contentType;
51+
}
52+
53+
void setContentType(String contentType) {
54+
this.contentType = contentType;
55+
}
56+
4757
public Map<String, String> getMetadata() {
4858
return metadata;
4959
}

sdk/src/main/java/io/dapr/client/domain/PublishEventRequestBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class PublishEventRequestBuilder {
2020

2121
private final Object data;
2222

23+
private String contentType;
24+
2325
private Map<String, String> metadata = new HashMap<>();
2426

2527
/**
@@ -34,6 +36,11 @@ public PublishEventRequestBuilder(String pubsubName, String topic, Object data)
3436
this.data = data;
3537
}
3638

39+
public PublishEventRequestBuilder withContentType(String contentType) {
40+
this.contentType = contentType;
41+
return this;
42+
}
43+
3744
public PublishEventRequestBuilder withMetadata(Map<String, String> metadata) {
3845
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
3946
return this;
@@ -48,6 +55,7 @@ public PublishEventRequest build() {
4855
request.setPubsubName(this.pubsubName);
4956
request.setTopic(this.topic);
5057
request.setData(this.data);
58+
request.setContentType(this.contentType);
5159
request.setMetadata(this.metadata);
5260
return request;
5361
}

0 commit comments

Comments
 (0)