Skip to content

Commit 73f2071

Browse files
zhangjidi2016zhangjidi2016
andauthored
[ISSUE apache#2219] Add some asynchronous API for batch messages (apache#2315)
* [ISSUE apache#2219] Add asynchronous batch send method. * modify the ut Co-authored-by: zhangjidi2016 <[email protected]>
1 parent 742ba50 commit 73f2071

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,29 @@ public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,
916916
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
917917
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
918918
}
919+
920+
@Override
921+
public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
922+
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
923+
}
924+
925+
@Override
926+
public void send(Collection<Message> msgs, SendCallback sendCallback,
927+
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
928+
this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout);
929+
}
930+
931+
@Override
932+
public void send(Collection<Message> msgs, MessageQueue mq,
933+
SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
934+
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback);
935+
}
936+
937+
@Override
938+
public void send(Collection<Message> msgs, MessageQueue mq,
939+
SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
940+
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout);
941+
}
919942

920943
/**
921944
* Sets an Executor to be used for executing callback methods. If the Executor is not set, {@link

client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,19 @@ SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQ
9999

100100
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
101101
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
102-
102+
103+
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
104+
InterruptedException;
105+
106+
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
107+
MQBrokerException, InterruptedException;
108+
109+
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
110+
MQBrokerException, InterruptedException;
111+
112+
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
113+
RemotingException, MQBrokerException, InterruptedException;
114+
103115
//for rpc
104116
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
105117
RemotingException, MQBrokerException, InterruptedException;

client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,51 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
232232
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
233233
assertThat(cc.get()).isEqualTo(5);
234234
}
235+
236+
@Test
237+
public void testBatchSendMessageAsync()
238+
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
239+
final AtomicInteger cc = new AtomicInteger(0);
240+
final CountDownLatch countDownLatch = new CountDownLatch(4);
241+
242+
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
243+
SendCallback sendCallback = new SendCallback() {
244+
@Override
245+
public void onSuccess(SendResult sendResult) {
246+
countDownLatch.countDown();
247+
}
248+
249+
@Override
250+
public void onException(Throwable e) {
251+
e.printStackTrace();
252+
cc.incrementAndGet();
253+
countDownLatch.countDown();
254+
}
255+
};
256+
MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
257+
@Override
258+
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
259+
return null;
260+
}
261+
};
262+
263+
List<Message> msgs = new ArrayList<>();
264+
for (int i = 0; i < 5; i++) {
265+
Message message = new Message();
266+
message.setTopic("test");
267+
message.setBody(("hello world" + i).getBytes());
268+
msgs.add(message);
269+
}
270+
producer.send(msgs, sendCallback);
271+
producer.send(msgs, sendCallback, 1000);
272+
MessageQueue mq = new MessageQueue("test", "BrokerA", 1);
273+
producer.send(msgs, mq, sendCallback);
274+
// this message is send failed
275+
producer.send(msgs, new MessageQueue(), sendCallback, 1000);
276+
277+
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
278+
assertThat(cc.get()).isEqualTo(1);
279+
}
235280

236281
@Test
237282
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {

0 commit comments

Comments
 (0)