分享

mq-netty示例

 若生安饶 2023-09-13

1.MayiktThreadMQ.class

package com.mqdemo.netty;

import com.alibaba.fastjson.JSONObject;

import java.util.concurrent.LinkedBlockingDeque;

/**

 * @ClassName MayiktThreadMQ

 * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com

 * @Version V1.0

 **/

public class MayiktThreadMQ {

    private static LinkedBlockingDeque<JSONObject> msgs = new LinkedBlockingDeque<JSONObject>();

    public static void main(String[] args) {

        // 生产线程

        Thread producerThread = new Thread(new Runnable() {

            @Override

            public void run() {

                try {

                    while (true) {

                        Thread.sleep(1000);

                        JSONObject data = new JSONObject();

                        data.put("userId", "1234");

                        // 存入消息

                        msgs.offer(data);

                    }

                } catch (Exception e) {

                }

            }

        }, "生产者");

        producerThread.start();

        // 消费者线程

        Thread consumerThread  = new Thread(new Runnable() {

            @Override

            public void run() {

                try {

                    while (true) {

                        JSONObject data = msgs.poll();

                        if (data != null) {

                            System.out.println(Thread.currentThread().getName() + "," + data);

                        }

                    }

                } catch (Exception e) {

                }

            }

        }, "消费者");

        consumerThread.start();

    }

}

2.MayiktNettyMQServer.class

package com.mqdemo.netty;

import com.alibaba.fastjson.JSONObject;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import org.apache.commons.lang3.StringUtils;

import java.io.UnsupportedEncodingException;

import java.util.ArrayList;

import java.util.concurrent.LinkedBlockingDeque;

/**

 * @ClassName NettyMQServer2021

 * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com

 * @Version V1.0

 **/

public class MayiktNettyMQServer {

    public void bind(int port) throws Exception {

        /**

         * Netty 抽象出两组线程池BossGroup和WorkerGroup

         * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。

         */

        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();

        try {

            bootstrap.group(bossGroup, workerGroup)

                    // 设定NioServerSocketChannel 为服务器端

                    .channel(NioServerSocketChannel.class)

                    //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,

                    //用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。

                    .option(ChannelOption.SO_BACKLOG, 100)

                    // 服务器端监听数据回调Handler

                    .childHandler(new MayiktNettyMQServer.ChildChannelHandler());

            //绑定端口, 同步等待成功;

            ChannelFuture future = bootstrap.bind(port).sync();

            System.out.println("当前服务器端启动成功...");

            //等待服务端监听端口关闭

            future.channel().closeFuture().sync();

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            //优雅关闭 线程组

            bossGroup.shutdownGracefully();

            workerGroup.shutdownGracefully();

        }

    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override

        protected void initChannel(SocketChannel ch) throws Exception {

            // 设置异步回调监听

            ch.pipeline().addLast(new MayiktNettyMQServer.MayiktServerHandler());

        }

    }

    public static void main(String[] args) throws Exception {

        int port = 9008;

        new MayiktNettyMQServer().bind(port);

    }

    private static final String type_consumer = "consumer";

    private static final String type_producer = "producer";

    private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();

    private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();

    // 生产者投递消息的:topicName

    public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {

        /**

         * 服务器接收客户端请求

         *

         * @param ctx

         * @param data

         * @throws Exception

         */

        @Override

        protected void channelRead0(ChannelHandlerContext ctx, Object data)

                throws Exception {

            JSONObject clientMsg = getData(data);

            String type = clientMsg.getString("type");

            switch (type) {

                case type_producer:

                    producer(clientMsg);

                    break;

                case type_consumer:

                    consumer(ctx);

                    break;

            }

        }

        private void consumer(ChannelHandlerContext ctx) {

            // 保存消费者连接

            ctxs.add(ctx);

            // 主动拉取mq服务器端缓存中没有被消费的消息

            String data = msgs.poll();

            if (StringUtils.isEmpty(data)) {

                return;

            }

            // 将该消息发送给消费者

            byte[] req = data.getBytes();

            ByteBuf firstMSG = Unpooled.buffer(req.length);

            firstMSG.writeBytes(req);

            ctx.writeAndFlush(firstMSG);

        }

        private void producer(JSONObject clientMsg) {

            // 缓存生产者投递 消息

            String msg = clientMsg.getString("msg");

            msgs.offer(msg);

            //需要将该消息推送消费者

            ctxs.forEach((ctx) -> {

                // 将该消息发送给消费者

                String data = msgs.poll();

                if (data == null) {

                    return;

                }

                byte[] req = data.getBytes();

                ByteBuf firstMSG = Unpooled.buffer(req.length);

                firstMSG.writeBytes(req);

                ctx.writeAndFlush(firstMSG);

            });

        }

        private JSONObject getData(Object data) throws UnsupportedEncodingException {

            ByteBuf buf = (ByteBuf) data;

            byte[] req = new byte[buf.readableBytes()];

            buf.readBytes(req);

            String body = new String(req, "UTF-8");

            return JSONObject.parseObject(body);

        }

        @Override

        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

            ctx.flush();

        }

        @Override

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

                throws Exception {

            ctx.close();

        }

    }

}


3.MayiktNettyMQProducer.class

package com.mqdemo.netty;

import com.alibaba.fastjson.JSONObject;

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

/**

 * @ClassName MayiktNettyMQProducer

 * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com

 * @Version V1.0

 **/

public class MayiktNettyMQProducer {

    public void connect(int port, String host) throws Exception {

        //配置客户端NIO 线程组

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap client = new Bootstrap();

        try {

            client.group(group)

                    // 设置为Netty客户端

                    .channel(NioSocketChannel.class)

                    /**

                     * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。

                     * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。

                     * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

                     */

                    .option(ChannelOption.TCP_NODELAY, true)

                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override

                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new MayiktNettyMQProducer.NettyClientHandler());

////                            1. 演示LineBasedFrameDecoder编码器

//                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

//                            ch.pipeline().addLast(new StringDecoder());

                        }

                    });

            //绑定端口, 异步连接操作

            ChannelFuture future = client.connect(host, port).sync();

            //等待客户端连接端口关闭

            future.channel().closeFuture().sync();

        } finally {

            //优雅关闭 线程组

            group.shutdownGracefully();

        }

    }

    public static void main(String[] args) {

        int port = 9008;

        MayiktNettyMQProducer client = new MayiktNettyMQProducer();

        try {

            client.connect(port, "127.0.0.1");

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {

        @Override

        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            JSONObject data = new JSONObject();

            data.put("type", "producer");

            JSONObject msg = new JSONObject();

            msg.put("userId", "123456");

            msg.put("age", "23");

            data.put("msg", msg);

            // 生产发送数据

            byte[] req = data.toJSONString().getBytes();

            ByteBuf firstMSG = Unpooled.buffer(req.length);

            firstMSG.writeBytes(req);

            ctx.writeAndFlush(firstMSG);

        }

        /**

         * 客户端读取到服务器端数据

         *

         * @param ctx

         * @param msg

         * @throws Exception

         */

        @Override

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            ByteBuf buf = (ByteBuf) msg;

            byte[] req = new byte[buf.readableBytes()];

            buf.readBytes(req);

            String body = new String(req, "UTF-8");

            System.out.println("客户端接收到服务器端请求:" + body);

        }

        // tcp属于双向传输

        @Override

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

            ctx.close();

        }

    }

}


4.MayiktNettyMQConsumer.class

package com.mqdemo.netty;

import com.alibaba.fastjson.JSONObject;

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

/**

 * @ClassName MayiktNettyMQProducer

 * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com

 * @Version V1.0

 **/

public class MayiktNettyMQConsumer {

    public void connect(int port, String host) throws Exception {

        //配置客户端NIO 线程组

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap client = new Bootstrap();

        try {

            client.group(group)

                    // 设置为Netty客户端

                    .channel(NioSocketChannel.class)

                    /**

                     * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。

                     * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。

                     * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

                     */

                    .option(ChannelOption.TCP_NODELAY, true)

                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override

                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new MayiktNettyMQConsumer.NettyClientHandler());

////                            1. 演示LineBasedFrameDecoder编码器

//                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

//                            ch.pipeline().addLast(new StringDecoder());

                        }

                    });

            //绑定端口, 异步连接操作

            ChannelFuture future = client.connect(host, port).sync();

            //等待客户端连接端口关闭

            future.channel().closeFuture().sync();

        } finally {

            //优雅关闭 线程组

            group.shutdownGracefully();

        }

    }

    public static void main(String[] args) {

        int port = 9008;

        MayiktNettyMQConsumer client = new MayiktNettyMQConsumer();

        try {

            client.connect(port, "127.0.0.1");

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {

        @Override

        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            JSONObject data = new JSONObject();

            data.put("type", "consumer");

            // 生产发送数据

            byte[] req = data.toJSONString().getBytes();

            ByteBuf firstMSG = Unpooled.buffer(req.length);

            firstMSG.writeBytes(req);

            ctx.writeAndFlush(firstMSG);

        }

        /**

         * 客户端读取到服务器端数据

         *

         * @param ctx

         * @param msg

         * @throws Exception

         */

        @Override

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            ByteBuf buf = (ByteBuf) msg;

            byte[] req = new byte[buf.readableBytes()];

            buf.readBytes(req);

            String body = new String(req, "UTF-8");

            System.out.println("客户端接收到服务器端请求:" + body);

        }

        // tcp属于双向传输

        @Override

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

            ctx.close();

        }

    }

}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多