Skip to content

Commit b02ee11

Browse files
RobinGGnormanmaurer
authored andcommitted
feat(example-mqtt): new MQTT heartBeat broker and client examples (netty#9336)
Motivation: Recently I'm going to build MQTT broker and client based on Netty. I had MQTT encoder and decoder founded, while no basic examples. So I'm going to share my simple heartBeat MQTT broker and client as an example. Modification: New MQTT heartBeat example under io.netty.example/mqtt/heartBeat/. Result: Client would send CONNECT and PINGREQ(heartBeat message). - CONNECT: once channel active - PINGREQ: once IdleStateEvent triggered, which is 20 seconds in this example Client would discard all messages it received. MQTT broker could handle CONNECT, PINGREQ and DISCONNECT messages. - CONNECT: send CONNACK back - PINGREQ: send PINGRESP back - DISCONNECT: close the channel Broker would close the channel if 2 heartBeat lost, which set to 45 seconds in this example.
1 parent 7fc355a commit b02ee11

File tree

5 files changed

+298
-0
lines changed

5 files changed

+298
-0
lines changed

example/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@
9898
<artifactId>netty-codec-stomp</artifactId>
9999
<version>${project.version}</version>
100100
</dependency>
101+
<dependency>
102+
<groupId>${project.groupId}</groupId>
103+
<artifactId>netty-codec-mqtt</artifactId>
104+
<version>${project.version}</version>
105+
</dependency>
101106

102107
<dependency>
103108
<groupId>com.google.protobuf</groupId>
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2019 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.example.mqtt.heartBeat;
17+
18+
import io.netty.bootstrap.ServerBootstrap;
19+
import io.netty.channel.ChannelFuture;
20+
import io.netty.channel.ChannelInitializer;
21+
import io.netty.channel.ChannelOption;
22+
import io.netty.channel.EventLoopGroup;
23+
import io.netty.channel.nio.NioEventLoopGroup;
24+
import io.netty.channel.socket.SocketChannel;
25+
import io.netty.channel.socket.nio.NioServerSocketChannel;
26+
import io.netty.handler.codec.mqtt.MqttDecoder;
27+
import io.netty.handler.codec.mqtt.MqttEncoder;
28+
import io.netty.handler.timeout.IdleStateHandler;
29+
30+
import java.util.concurrent.TimeUnit;
31+
32+
public final class MqttHeartBeatBroker {
33+
34+
private MqttHeartBeatBroker() {
35+
}
36+
37+
public static void main(String[] args) throws Exception {
38+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
39+
EventLoopGroup workerGroup = new NioEventLoopGroup();
40+
41+
try {
42+
ServerBootstrap b = new ServerBootstrap();
43+
b.group(bossGroup, workerGroup);
44+
b.option(ChannelOption.SO_BACKLOG, 1024);
45+
b.channel(NioServerSocketChannel.class);
46+
b.childHandler(new ChannelInitializer<SocketChannel>() {
47+
protected void initChannel(SocketChannel ch) throws Exception {
48+
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
49+
ch.pipeline().addLast("decoder", new MqttDecoder());
50+
ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(45, 0, 0, TimeUnit.SECONDS));
51+
ch.pipeline().addLast("handler", MqttHeartBeatBrokerHandler.INSTANCE);
52+
}
53+
});
54+
55+
ChannelFuture f = b.bind(1883).sync();
56+
System.out.println("Broker initiated...");
57+
58+
f.channel().closeFuture().sync();
59+
} finally {
60+
workerGroup.shutdownGracefully();
61+
bossGroup.shutdownGracefully();
62+
}
63+
}
64+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2019 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.example.mqtt.heartBeat;
17+
18+
import io.netty.channel.ChannelHandler.Sharable;
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelInboundHandlerAdapter;
21+
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
22+
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
23+
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
24+
import io.netty.handler.codec.mqtt.MqttFixedHeader;
25+
import io.netty.handler.codec.mqtt.MqttMessage;
26+
import io.netty.handler.codec.mqtt.MqttMessageType;
27+
import io.netty.handler.codec.mqtt.MqttQoS;
28+
import io.netty.handler.timeout.IdleState;
29+
import io.netty.handler.timeout.IdleStateEvent;
30+
import io.netty.util.ReferenceCountUtil;
31+
32+
@Sharable
33+
public final class MqttHeartBeatBrokerHandler extends ChannelInboundHandlerAdapter {
34+
35+
public static final MqttHeartBeatBrokerHandler INSTANCE = new MqttHeartBeatBrokerHandler();
36+
37+
private MqttHeartBeatBrokerHandler() {
38+
}
39+
40+
@Override
41+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
42+
MqttMessage mqttMessage = (MqttMessage) msg;
43+
System.out.println("Received MQTT message: " + mqttMessage);
44+
switch (mqttMessage.fixedHeader().messageType()) {
45+
case CONNECT:
46+
MqttFixedHeader connackFixedHeader =
47+
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
48+
MqttConnAckVariableHeader mqttConnAckVariableHeader =
49+
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
50+
MqttConnAckMessage connack = new MqttConnAckMessage(connackFixedHeader, mqttConnAckVariableHeader);
51+
ctx.writeAndFlush(connack);
52+
break;
53+
case PINGREQ:
54+
MqttFixedHeader pingreqFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false,
55+
MqttQoS.AT_MOST_ONCE, false, 0);
56+
MqttMessage pingResp = new MqttMessage(pingreqFixedHeader);
57+
ctx.writeAndFlush(pingResp);
58+
break;
59+
case DISCONNECT:
60+
ctx.close();
61+
break;
62+
default:
63+
System.out.println("Unexpected message type: " + mqttMessage.fixedHeader().messageType());
64+
ReferenceCountUtil.release(msg);
65+
ctx.close();
66+
}
67+
}
68+
69+
@Override
70+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
71+
System.out.println("Channel heartBeat lost");
72+
if (evt instanceof IdleStateEvent && IdleState.READER_IDLE == ((IdleStateEvent) evt).state()) {
73+
ctx.close();
74+
}
75+
}
76+
77+
@Override
78+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
79+
cause.printStackTrace();
80+
ctx.close();
81+
}
82+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2019 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.example.mqtt.heartBeat;
17+
18+
import io.netty.bootstrap.Bootstrap;
19+
import io.netty.channel.ChannelFuture;
20+
import io.netty.channel.ChannelInitializer;
21+
import io.netty.channel.EventLoopGroup;
22+
import io.netty.channel.nio.NioEventLoopGroup;
23+
import io.netty.channel.socket.SocketChannel;
24+
import io.netty.channel.socket.nio.NioSocketChannel;
25+
import io.netty.handler.codec.mqtt.MqttDecoder;
26+
import io.netty.handler.codec.mqtt.MqttEncoder;
27+
import io.netty.handler.timeout.IdleStateHandler;
28+
29+
import java.util.concurrent.TimeUnit;
30+
31+
public final class MqttHeartBeatClient {
32+
private MqttHeartBeatClient() {
33+
}
34+
35+
private static final String HOST = System.getProperty("host", "127.0.0.1");
36+
private static final int PORT = Integer.parseInt(System.getProperty("port", "1883"));
37+
private static final String CLIENT_ID = System.getProperty("clientId", "guestClient");
38+
private static final String USER_NAME = System.getProperty("userName", "guest");
39+
private static final String PASSWORD = System.getProperty("password", "guest");
40+
41+
public static void main(String[] args) throws Exception {
42+
EventLoopGroup workerGroup = new NioEventLoopGroup();
43+
44+
try {
45+
Bootstrap b = new Bootstrap();
46+
b.group(workerGroup);
47+
b.channel(NioSocketChannel.class);
48+
b.handler(new ChannelInitializer<SocketChannel>() {
49+
protected void initChannel(SocketChannel ch) throws Exception {
50+
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
51+
ch.pipeline().addLast("decoder", new MqttDecoder());
52+
ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));
53+
ch.pipeline().addLast("handler", new MqttHeartBeatClientHandler(CLIENT_ID, USER_NAME, PASSWORD));
54+
}
55+
});
56+
57+
ChannelFuture f = b.connect(HOST, PORT).sync();
58+
System.out.println("Client connected");
59+
f.channel().closeFuture().sync();
60+
} finally {
61+
workerGroup.shutdownGracefully();
62+
}
63+
}
64+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2019 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.example.mqtt.heartBeat;
17+
18+
import io.netty.channel.ChannelHandlerContext;
19+
import io.netty.channel.ChannelInboundHandlerAdapter;
20+
import io.netty.handler.codec.mqtt.MqttConnectMessage;
21+
import io.netty.handler.codec.mqtt.MqttConnectPayload;
22+
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
23+
import io.netty.handler.codec.mqtt.MqttFixedHeader;
24+
import io.netty.handler.codec.mqtt.MqttMessage;
25+
import io.netty.handler.codec.mqtt.MqttMessageType;
26+
import io.netty.handler.codec.mqtt.MqttQoS;
27+
import io.netty.handler.timeout.IdleStateEvent;
28+
import io.netty.util.ReferenceCountUtil;
29+
30+
public class MqttHeartBeatClientHandler extends ChannelInboundHandlerAdapter {
31+
32+
private static final String PROTOCOL_NAME_MQTT_3_1_1 = "MQTT";
33+
private static final int PROTOCOL_VERSION_MQTT_3_1_1 = 4;
34+
35+
private final String clientId;
36+
private final String userName;
37+
private final byte[] password;
38+
39+
public MqttHeartBeatClientHandler(String clientId, String userName, String password) {
40+
this.clientId = clientId;
41+
this.userName = userName;
42+
this.password = password.getBytes();
43+
}
44+
45+
@Override
46+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
47+
// discard all messages
48+
ReferenceCountUtil.release(msg);
49+
}
50+
51+
@Override
52+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
53+
MqttFixedHeader connectFixedHeader =
54+
new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
55+
MqttConnectVariableHeader connectVariableHeader =
56+
new MqttConnectVariableHeader(PROTOCOL_NAME_MQTT_3_1_1, PROTOCOL_VERSION_MQTT_3_1_1, true, true, false,
57+
0, false, false, 20);
58+
MqttConnectPayload connectPayload = new MqttConnectPayload(clientId, null, null, userName, password);
59+
MqttConnectMessage connectMessage =
60+
new MqttConnectMessage(connectFixedHeader, connectVariableHeader, connectPayload);
61+
ctx.writeAndFlush(connectMessage);
62+
System.out.println("Sent CONNECT");
63+
}
64+
65+
@Override
66+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
67+
if (evt instanceof IdleStateEvent) {
68+
MqttFixedHeader pingreqFixedHeader =
69+
new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
70+
MqttMessage pingreqMessage = new MqttMessage(pingreqFixedHeader);
71+
ctx.writeAndFlush(pingreqMessage);
72+
System.out.println("Sent PINGREQ");
73+
} else {
74+
super.userEventTriggered(ctx, evt);
75+
}
76+
}
77+
78+
@Override
79+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
80+
cause.printStackTrace();
81+
ctx.close();
82+
}
83+
}

0 commit comments

Comments
 (0)