1.MayiktThreadMQ.class package com.mqdemo.netty; import com.alibaba.fastjson.JSONObject; import java.util.concurrent.LinkedBlockingDeque; /** * @ClassName MayiktThreadMQ * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com * @Version V1.0 **/ public class MayiktThreadMQ { private static LinkedBlockingDeque<JSONObject> msgs = new LinkedBlockingDeque<JSONObject>(); public static void main(String[] args) { // 生产线程 Thread producerThread = new Thread(new Runnable() { @Override public void run() { try { while (true) { Thread.sleep(1000); JSONObject data = new JSONObject(); data.put("userId", "1234"); // 存入消息 msgs.offer(data); } } catch (Exception e) { } } }, "生产者"); producerThread.start(); // 消费者线程 Thread consumerThread = new Thread(new Runnable() { @Override public void run() { try { while (true) { JSONObject data = msgs.poll(); if (data != null) { System.out.println(Thread.currentThread().getName() + "," + data); } } } catch (Exception e) { } } }, "消费者"); consumerThread.start(); } } 2.MayiktNettyMQServer.class package com.mqdemo.netty; import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.commons.lang3.StringUtils; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.concurrent.LinkedBlockingDeque; /** * @ClassName NettyMQServer2021 * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com * @Version V1.0 **/ public class MayiktNettyMQServer { public void bind(int port) throws Exception { /** * Netty 抽象出两组线程池BossGroup和WorkerGroup * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。 */ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossGroup, workerGroup) // 设定NioServerSocketChannel 为服务器端 .channel(NioServerSocketChannel.class) //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时, //用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。 .option(ChannelOption.SO_BACKLOG, 100) // 服务器端监听数据回调Handler .childHandler(new MayiktNettyMQServer.ChildChannelHandler()); //绑定端口, 同步等待成功; ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("当前服务器端启动成功..."); //等待服务端监听端口关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //优雅关闭 线程组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 设置异步回调监听 ch.pipeline().addLast(new MayiktNettyMQServer.MayiktServerHandler()); } } public static void main(String[] args) throws Exception { int port = 9008; new MayiktNettyMQServer().bind(port); } private static final String type_consumer = "consumer"; private static final String type_producer = "producer"; private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>(); private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>(); // 生产者投递消息的:topicName public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> { /** * 服务器接收客户端请求 * * @param ctx * @param data * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object data) throws Exception { JSONObject clientMsg = getData(data); String type = clientMsg.getString("type"); switch (type) { case type_producer: producer(clientMsg); break; case type_consumer: consumer(ctx); break; } } private void consumer(ChannelHandlerContext ctx) { // 保存消费者连接 ctxs.add(ctx); // 主动拉取mq服务器端缓存中没有被消费的消息 String data = msgs.poll(); if (StringUtils.isEmpty(data)) { return; } // 将该消息发送给消费者 byte[] req = data.getBytes(); ByteBuf firstMSG = Unpooled.buffer(req.length); firstMSG.writeBytes(req); ctx.writeAndFlush(firstMSG); } private void producer(JSONObject clientMsg) { // 缓存生产者投递 消息 String msg = clientMsg.getString("msg"); msgs.offer(msg); //需要将该消息推送消费者 ctxs.forEach((ctx) -> { // 将该消息发送给消费者 String data = msgs.poll(); if (data == null) { return; } byte[] req = data.getBytes(); ByteBuf firstMSG = Unpooled.buffer(req.length); firstMSG.writeBytes(req); ctx.writeAndFlush(firstMSG); }); } private JSONObject getData(Object data) throws UnsupportedEncodingException { ByteBuf buf = (ByteBuf) data; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); return JSONObject.parseObject(body); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } } 3.MayiktNettyMQProducer.class package com.mqdemo.netty; import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @ClassName MayiktNettyMQProducer * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com * @Version V1.0 **/ public class MayiktNettyMQProducer { public void connect(int port, String host) throws Exception { //配置客户端NIO 线程组 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap client = new Bootstrap(); try { client.group(group) // 设置为Netty客户端 .channel(NioSocketChannel.class) /** * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。 * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。 * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。 */ .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MayiktNettyMQProducer.NettyClientHandler()); //// 1. 演示LineBasedFrameDecoder编码器 // ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // ch.pipeline().addLast(new StringDecoder()); } }); //绑定端口, 异步连接操作 ChannelFuture future = client.connect(host, port).sync(); //等待客户端连接端口关闭 future.channel().closeFuture().sync(); } finally { //优雅关闭 线程组 group.shutdownGracefully(); } } public static void main(String[] args) { int port = 9008; MayiktNettyMQProducer client = new MayiktNettyMQProducer(); try { client.connect(port, "127.0.0.1"); } catch (Exception e) { e.printStackTrace(); } } public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { JSONObject data = new JSONObject(); data.put("type", "producer"); JSONObject msg = new JSONObject(); msg.put("userId", "123456"); msg.put("age", "23"); data.put("msg", msg); // 生产发送数据 byte[] req = data.toJSONString().getBytes(); ByteBuf firstMSG = Unpooled.buffer(req.length); firstMSG.writeBytes(req); ctx.writeAndFlush(firstMSG); } /** * 客户端读取到服务器端数据 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("客户端接收到服务器端请求:" + body); } // tcp属于双向传输 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } } 4.MayiktNettyMQConsumer.class package com.mqdemo.netty; import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @ClassName MayiktNettyMQProducer * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com * @Version V1.0 **/ public class MayiktNettyMQConsumer { public void connect(int port, String host) throws Exception { //配置客户端NIO 线程组 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap client = new Bootstrap(); try { client.group(group) // 设置为Netty客户端 .channel(NioSocketChannel.class) /** * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。 * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。 * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。 */ .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MayiktNettyMQConsumer.NettyClientHandler()); //// 1. 演示LineBasedFrameDecoder编码器 // ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // ch.pipeline().addLast(new StringDecoder()); } }); //绑定端口, 异步连接操作 ChannelFuture future = client.connect(host, port).sync(); //等待客户端连接端口关闭 future.channel().closeFuture().sync(); } finally { //优雅关闭 线程组 group.shutdownGracefully(); } } public static void main(String[] args) { int port = 9008; MayiktNettyMQConsumer client = new MayiktNettyMQConsumer(); try { client.connect(port, "127.0.0.1"); } catch (Exception e) { e.printStackTrace(); } } public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { JSONObject data = new JSONObject(); data.put("type", "consumer"); // 生产发送数据 byte[] req = data.toJSONString().getBytes(); ByteBuf firstMSG = Unpooled.buffer(req.length); firstMSG.writeBytes(req); ctx.writeAndFlush(firstMSG); } /** * 客户端读取到服务器端数据 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("客户端接收到服务器端请求:" + body); } // tcp属于双向传输 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } } |
|