http://www.cnblogs.com/yfliufei/p/4463538.html 2015 package io.mqtt.server;
import io.mqtt.tool.ConfigService; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Log4JLoggerFactory;
import java.util.ArrayList; import java.util.List;
public class Server { private static final InternalLogger logger = InternalLoggerFactory .getInstance(Server.class);
private int port; // private final int port = ConfigService.getIntProperty("tcp.port", 1883); private final int httpPort = ConfigService .getIntProperty("http.port", 8080);
private List<Channel> channels = new ArrayList<Channel>(); private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup workerGroup = new NioEventLoopGroup();
public Server(int port) { this.port = port; }
private ServerBootstrap getDefaultServerBootstrap() { ServerBootstrap server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .option(ChannelOption.SO_BACKLOG, 1000) .option(ChannelOption.TCP_NODELAY, true) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.SO_KEEPALIVE, true); return server; }
public ChannelFuture run() throws Exception { InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
Channel channle = getDefaultServerBootstrap() .childHandler(new TcpChannelInitializer()).bind(port).sync() .channel(); channels.add(channle);
logger.info("mqtt.io tcp server started at port " + port + '.');
ChannelFuture future = getDefaultServerBootstrap().childHandler( new HttpChannelInitializer()).bind(httpPort);
Channel httpChannel = future.sync().channel(); channels.add(httpChannel);
logger.info("mqtt.io websocket server started at port " + httpPort + '.');
return future; }
public void destroy() { logger.info("destroy mqtt.io server ..."); for (Channel channel : channels) { channel.close(); } bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
public static void main(String[] args) throws Exception { // for (int i = 0; i < 5; i++) { new ServerThread(65432 + (0 * 2)).start(); // } }
}
package io.mqtt.handler;
import io.mqtt.processer.ConnectProcesser; import io.mqtt.processer.DisConnectProcesser; import io.mqtt.processer.PingReqProcesser; import io.mqtt.processer.PublishProcesser; import io.mqtt.processer.SubscribeProcesser; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.ReadTimeoutException;
import java.util.Collections; import java.util.HashMap; import java.util.Map; import io.mqtt.processer.*; import org.meqantt.message.ConnAckMessage; import org.meqantt.message.ConnAckMessage.ConnectionStatus; import org.meqantt.message.DisconnectMessage; import org.meqantt.message.Message; import org.meqantt.message.Message.Type; import org.meqantt.message.PingRespMessage;
public class MqttMessageHandler extends ChannelInboundHandlerAdapter { private static PingRespMessage PINGRESP = new PingRespMessage();
private static final Map<Message.Type, Processer> processers; static { Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>( 6);
map.put(Type.CONNECT, new ConnectProcesser()); map.put(Type.PUBLISH, new PublishProcesser()); map.put(Type.SUBSCRIBE, (Processer) new SubscribeProcesser()); map.put(Type.UNSUBSCRIBE, (Processer) new UnsubscribeProcesser()); map.put(Type.PINGREQ, new PingReqProcesser()); map.put(Type.DISCONNECT, (Processer) new DisConnectProcesser());
processers = Collections.unmodifiableMap(map); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception { try { if (e.getCause() instanceof ReadTimeoutException) { ctx.write(PINGRESP).addListener( ChannelFutureListener.CLOSE_ON_FAILURE); } else { ctx.channel().close(); } } catch (Throwable t) { t.printStackTrace(); ctx.channel().close(); }
e.printStackTrace(); }
@Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { //MQTT MESSAGE Message msg = (Message) obj; // server收到clinet 的MQTT数据包,并获取MQTT的消息类型 Processer p = processers.get(msg.getType()); if (p == null) { return; } //根据特定消息类型解析消息包 Message rmsg = p.proc(msg, ctx); if (rmsg == null) { return; } //根据消息处理结果,向clinet做出回应 if (rmsg instanceof ConnAckMessage && ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE); } else if (rmsg instanceof DisconnectMessage) { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE); } else { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
//client
package com.test.client;
import org.eclipse.paho.client.mqttv3.*;
public class SubscribeMessage implements MqttCallback {
private MqttClient client;
public SubscribeMessage() { }
public static void main(String[] args) { // String tcpUrl = "tcp://127.0.0.1:1883"; // String clientId = "sub-msg/client1"; // String topicName = "sub/client1"; // // new SubscribeMessage().doDemo(tcpUrl, clientId, topicName); // for (int j = 0; j < 5; j++) { for (int i = 0; i < 10000; i++) { new SubscribeThread("client_" + 0 + i, "tcp://127.0.0.1:" + (65432 + 0 * 2)).start(); } // }
}
public void doDemo(String tcpUrl, String clientId, String topicName) { try { client = new MqttClient(tcpUrl, clientId); MqttConnectOptions mqcConf = new MqttConnectOptions(); mqcConf.setConnectionTimeout(300); mqcConf.setKeepAliveInterval(1000); client.connect(mqcConf); client.setCallback(this); client.subscribe(topicName); } catch (MqttException e) { e.printStackTrace(); } }
public void connectionLost(Throwable cause) { cause.printStackTrace(); }
public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("[GOT PUBLISH MESSAGE] : " + message); }
public void deliveryComplete(IMqttDeliveryToken token) { } }
|