Skip to content

Commit f7dcf76

Browse files
author
Long Miao
committed
fix publish2 bug and improve deal logic for mqtt expand message
1 parent 84e69a1 commit f7dcf76

File tree

6 files changed

+139
-151
lines changed

6 files changed

+139
-151
lines changed

src/YunBaDemo.java

Lines changed: 58 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.codehaus.jettison.json.JSONObject;
44
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
55
import org.eclipse.paho.client.mqttv3.IMqttToken;
6+
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
67
import org.eclipse.paho.client.mqttv3.MqttException;
78

89
import com.google.common.eventbus.Subscribe;
@@ -11,11 +12,11 @@
1112
import io.yunba.java.manager.YunBaManager;
1213

1314
public class YunBaDemo {
14-
public static String ALIAS = "51595de01b2943499227e62713fb49ee_9cc8b78f9bf847e0aa1512f347f03bbc";
15-
15+
public static String ALIAS = "self_alias";
16+
1617
public static void main(String[] args) {
1718
// 初始化 YunBa SDK
18-
YunBaManager.start("56a0a88c4407a3cd028ac2fe");
19+
YunBaManager.start("58d49dad15a25f8920eaccda");
1920

2021
YunBaManager.getEventBus().register(new Object() {
2122

@@ -42,27 +43,6 @@ public void listen(MessageArrivedEvent event) {
4243
}
4344
});
4445

45-
// 订阅一个频道(Topic),以接收来自频道的消息
46-
YunBaManager.subscribe("bullet", new IMqttActionListener() {
47-
48-
@Override
49-
public void onSuccess(IMqttToken asyncActionToken) {
50-
System.out.println("mqtt succeed subscribe: "
51-
+ join(asyncActionToken.getTopics(), ","));
52-
}
53-
54-
@Override
55-
public void onFailure(IMqttToken asyncActionToken,
56-
Throwable exception) {
57-
if (exception instanceof MqttException) {
58-
MqttException ex = (MqttException) exception;
59-
System.err
60-
.println("subscribe failed with the error code = "
61-
+ ex.getReasonCode());
62-
}
63-
}
64-
});
65-
6646
// 订阅一个频道(Topic),以接收来自频道的消息
6747
YunBaManager.subscribe("like", new IMqttActionListener() {
6848

@@ -83,29 +63,9 @@ public void onFailure(IMqttToken asyncActionToken,
8363
}
8464
}
8565
});
86-
87-
// 订阅一个频道(Topic),以接收来自频道的消息
88-
YunBaManager.subscribe("stat", new IMqttActionListener() {
89-
90-
@Override
91-
public void onSuccess(IMqttToken asyncActionToken) {
92-
System.out.println("mqtt succeed subscribe: "
93-
+ join(asyncActionToken.getTopics(), ","));
94-
}
95-
96-
@Override
97-
public void onFailure(IMqttToken asyncActionToken,
98-
Throwable exception) {
99-
if (exception instanceof MqttException) {
100-
MqttException ex = (MqttException) exception;
101-
System.err
102-
.println("subscribe failed with the error code = "
103-
+ ex.getReasonCode());
104-
}
105-
}
106-
});
66+
10767
// 向 Topic 发送消息
108-
YunBaManager.publish("bullet", "like", new IMqttActionListener() {
68+
YunBaManager.publish("like", "publis message", new IMqttActionListener() {
10969

11070
@Override
11171
public void onSuccess(IMqttToken asyncActionToken) {
@@ -122,7 +82,7 @@ public void onFailure(IMqttToken asyncActionToken,
12282
}
12383
}
12484
});
125-
//
85+
12686
// YunBaManager.unsubscribe("test_topic", new IMqttActionListener() {
12787
//
12888
// @Override
@@ -149,6 +109,23 @@ public void onFailure(IMqttToken asyncActionToken,
149109
@Override
150110
public void onSuccess(IMqttToken asyncActionToken) {
151111
System.out.println("mqtt setAlias success");
112+
113+
// 向用户别名对象发送消息,用于实现点对点的消息发送
114+
YunBaManager.publishToAlias(ALIAS, "msg to java_alaias", new IMqttActionListener() {
115+
116+
@Override
117+
public void onSuccess(IMqttToken asyncActionToken) {
118+
// TODO Auto-generated method stub
119+
System.out.println("publishToAlias success");
120+
121+
}
122+
123+
@Override
124+
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
125+
// TODO Auto-generated method stub
126+
System.err.println(exception.toString());
127+
}
128+
});
152129
}
153130

154131
@Override
@@ -167,7 +144,7 @@ public void onFailure(IMqttToken asyncActionToken,
167144

168145
@Override
169146
public void onSuccess(IMqttToken asyncActionToken) {
170-
System.out.println("mqtt get alias = "
147+
System.err.println("mqtt get alias = "
171148
+ asyncActionToken.getAlias());
172149
}
173150

@@ -179,11 +156,32 @@ public void onFailure(IMqttToken asyncActionToken,
179156
+ mqtt.getReasonCode());
180157
}
181158
});
159+
160+
//
161+
// 使用publish2发布消息
162+
JSONObject optsJson = new JSONObject();
163+
try {
164+
optsJson.put("qos", 2);
165+
} catch (JSONException e) {
166+
e.printStackTrace();
167+
}
168+
169+
YunBaManager.publish2("like", "publish2 message", new JSONObject(), new IMqttActionListener() {
170+
171+
@Override
172+
public void onSuccess(IMqttToken asyncActionToken) {
173+
// TODO Auto-generated method stub
174+
System.out.println("publish2 success");
175+
}
176+
177+
@Override
178+
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
179+
// TODO Auto-generated method stub
180+
System.out.println(exception.toString());
181+
}
182+
});
182183

183-
// // 向用户别名对象发送消息,用于实现点对点的消息发送
184-
// YunBaManager.publishToAlias("java_alias", "msg to java_alaias", null);
185-
//
186-
// // 查询当前用户订阅的频道列表
184+
//// // 查询当前用户订阅的频道列表
187185
// YunBaManager.getTopicList("java_alias", new IMqttActionListener() {
188186
//
189187
// @Override
@@ -201,9 +199,9 @@ public void onFailure(IMqttToken asyncActionToken,
201199
// }
202200
// }
203201
// });
204-
//
205-
// // 根据 别名 来获取用户的状态
206-
// YunBaManager.getState("java_alias", new IMqttActionListener() {
202+
////
203+
//// // 根据 别名 来获取用户的状态
204+
// YunBaManager.getState(ALIAS, new IMqttActionListener() {
207205
//
208206
// @Override
209207
// public void onSuccess(IMqttToken asyncActionToken) {
@@ -220,12 +218,12 @@ public void onFailure(IMqttToken asyncActionToken,
220218
// @Override
221219
// public void onFailure(IMqttToken asyncActionToken,
222220
// Throwable exception) {
223-
// System.err.println("mqtt get state failed");
221+
// System.err.println("mqtt get state failed: " + exception.toString());
224222
// }
225223
// });
226224
//
227-
// // 获取输入 Topic 下面所有订阅用户的 别名
228-
// YunBaManager.getAliasList("test_topic", new IMqttActionListener() {
225+
//// // 获取输入 Topic 下面所有订阅用户的 别名
226+
// YunBaManager.getAliasList("bullet", new IMqttActionListener() {
229227
//
230228
// @Override
231229
// public void onSuccess(IMqttToken asyncActionToken) {
@@ -235,7 +233,7 @@ public void onFailure(IMqttToken asyncActionToken,
235233
// System.out.println("mqtt getAliasList: "
236234
// + topics.toString());
237235
// } catch (JSONException e) {
238-
//
236+
// e.printStackTrace();
239237
// }
240238
// }
241239
//
@@ -251,7 +249,7 @@ public void onFailure(IMqttToken asyncActionToken,
251249
// }
252250
// });
253251
//
254-
// // 订阅某个频道上的用户的上、下线 及 订阅(或取消订阅)该频道的事件通知
252+
//// // 订阅某个频道上的用户的上、下线 及 订阅(或取消订阅)该频道的事件通知
255253
// YunBaManager.subscribePresence("test_topic", new IMqttActionListener() {
256254
//
257255
// @Override

src/io/yunba/java/core/MQTTStack.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -529,10 +529,7 @@ protected TimerTask getExpandTimeOutTask(final MQTTMessage msg) {
529529

530530
@Override
531531
public void run() {
532-
if (msg.callbackId != -1
533-
&& YunBaManager.getTagAliasCallback(msg.callbackId) != null) {
534-
handleTimeOut(HANDLER_EXPAND_TIMEOUT, msg);
535-
}
532+
handleTimeOut(HANDLER_EXPAND_TIMEOUT, msg);
536533
}
537534
};
538535
}
@@ -669,9 +666,29 @@ public void run() {
669666
@Override
670667
public void expand(MQTTMessage msg) {
671668
final TimerTask timeoutTask = getExpandTimeOutTask(msg);
669+
final IMqttActionListener originCallback = msg.callback;
672670
mTimer.schedule(timeoutTask, CALLBACK_TIMEOUT);
673671
try {
674-
mMqttClient.expand(msg);
672+
mMqttClient.expand(msg, new IMqttActionListener() {
673+
674+
@Override
675+
public void onSuccess(IMqttToken asyncActionToken) {
676+
// TODO Auto-generated method stub
677+
timeoutTask.cancel();
678+
if (originCallback != null) {
679+
originCallback.onSuccess(asyncActionToken);
680+
}
681+
}
682+
683+
@Override
684+
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
685+
// TODO Auto-generated method stub
686+
timeoutTask.cancel();
687+
if (originCallback != null) {
688+
originCallback.onFailure(asyncActionToken, exception);
689+
}
690+
}
691+
});
675692
} catch (Exception e) {
676693
}
677694
}
@@ -785,7 +802,6 @@ public void run() {
785802
}
786803

787804
public void ping(boolean isForcePing) {
788-
System.out.println("####################### ping");
789805
try {
790806
// no need to ping, Because it have succeed recently
791807
if (!isForcePing

src/io/yunba/java/manager/YunBaManager.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ public class YunBaManager {
3232
public static final String LAST_SUB = "last_sub";
3333
public static final String HISTORY_TOPICS = "history_topics";
3434

35-
private static int callBackId = 0;
36-
public static ConcurrentHashMap<Integer, IMqttActionListener> callbacks = new ConcurrentHashMap<Integer, IMqttActionListener>();
37-
3835
private static ExecutorService executor = Executors.newSingleThreadExecutor();
3936
private static MQTTStack mMqttStack;
4037
public static String AppKey = null;
@@ -58,18 +55,6 @@ public static void start() {
5855
mMqttStack.start();
5956
}
6057

61-
public static IMqttActionListener getTagAliasCallback(int seqId) {
62-
try {
63-
return callbacks.remove(seqId);
64-
} catch (Exception e) {
65-
return null;
66-
}
67-
}
68-
69-
private synchronized static int getCallbackID() {
70-
return callBackId++;
71-
}
72-
7358
public static EventBus getEventBus() {
7459
if (null == mEventBus) {
7560
synchronized (YunBaManager.class) {
@@ -184,6 +169,7 @@ public void run() {
184169
}
185170
MQTTMessage cache = new MQTTMessage(MQTTMessage.TYPE_EXPAND, topic, message, 1,
186171
opts, -1, mqttAction);
172+
cache.EXPAND_COMMNAD = MqttExpand.CMD_PUBLISH;
187173
mMqttStack.addMQTTMessage(cache);
188174
}
189175

@@ -409,9 +395,7 @@ private static void getStatusByAliasV2(String topic,
409395
private static class MQTTMessageFactory {
410396
public static MQTTMessage getExpandMessgae(byte expandCommand, String topic, String msg, int qos, Object userContent,
411397
IMqttActionListener listener) {
412-
int serialId = getCallbackID();
413-
callbacks.put(serialId, listener);
414-
MQTTMessage res = new MQTTMessage(MQTTMessage.TYPE_EXPAND, topic, msg, qos, userContent, serialId, listener);
398+
MQTTMessage res = new MQTTMessage(MQTTMessage.TYPE_EXPAND, topic, msg, qos, userContent, -1, listener);
415399
res.EXPAND_COMMNAD = expandCommand;
416400
return res;
417401
}

src/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1018,8 +1018,10 @@ public MqttToken ping(IMqttActionListener callback) throws MqttException {
10181018

10191019
}
10201020

1021-
public MqttToken expand(MQTTMessage cache) throws Exception {
1021+
public MqttToken expand(MQTTMessage cache, IMqttActionListener callback) throws Exception {
10221022
MqttToken token = new MqttToken(getClientId());
1023+
token.setActionCallback(callback);
1024+
10231025
MqttExpand expand ;
10241026
switch (cache.EXPAND_COMMNAD) {
10251027
case MqttExpand.CMD_PUBLISH:
@@ -1078,6 +1080,7 @@ public void getAliassByTopic(String topic, IMqttActionListener callback)
10781080
throws MqttException {
10791081

10801082
MqttToken token = new MqttToken(getClientId());
1083+
token.setActionCallback(callback);
10811084
MqttExpand expand = new MqttExpand(topic, MqttExpand.CMD_GET_ALIASLIST);
10821085
if (null != callback) {
10831086
int id = getCallbackID();
@@ -1092,6 +1095,7 @@ public void publish2(String topic, String msg, JSONObject opts,
10921095
final IMqttActionListener callback) throws Exception {
10931096

10941097
MqttToken token = new MqttToken(getClientId());
1098+
token.setActionCallback(callback);
10951099
MqttExpand expand = new MqttExpandPublish(topic, msg,
10961100
MqttExpand.CMD_PUBLISH, opts);
10971101
if (null != callback) {
@@ -1106,6 +1110,7 @@ public void publish2(String topic, String msg, JSONObject opts,
11061110
public void getStatusByAlias(String topic, IMqttActionListener callback)
11071111
throws MqttException {
11081112
MqttToken token = new MqttToken(getClientId());
1113+
token.setActionCallback(callback);
11091114
MqttExpand expand = new MqttExpand(topic, MqttExpand.CMD_GET_STATUS);
11101115
if (null != callback) {
11111116
int id = getCallbackID();
@@ -1119,6 +1124,7 @@ public void getStatusByAlias(String topic, IMqttActionListener callback)
11191124
public void getTopics(String alias, IMqttActionListener callback)
11201125
throws MqttException {
11211126
MqttToken token = new MqttToken(getClientId());
1127+
token.setActionCallback(callback);
11221128
MqttExpand expand = new MqttExpand(alias, MqttExpand.CMD_GET_TOPIC);
11231129
if (null != callback) {
11241130
int id = getCallbackID();
@@ -1131,6 +1137,7 @@ public void getTopics(String alias, IMqttActionListener callback)
11311137

11321138
public void getAlias(IMqttActionListener callback) throws MqttException {
11331139
MqttToken token = new MqttToken(getClientId());
1140+
token.setActionCallback(callback);
11341141
MqttExpand expand = new MqttExpand(null, MqttExpand.CMD_GET_ALIAS);
11351142
if (null != callback) {
11361143
int id = getCallbackID();

0 commit comments

Comments
 (0)