这篇文章起了一个很牛b的名字,实际要讲的内容很简单。但是还是发现很多人把这个功能写复杂了。netty的服务端网络编程,按照官方提供的demo,稍加修改即可,但是一些参数选项,需要自己去完善设置。而数据分发功能,就是面向所有连接分发数据,很多人的做法是使用java concurrent包下的相关容器保存连接,然后需要分发数据时,遍历集合中的元素,一个一个的调用writeAndFlush()将数据发出去。其实可以更简单一些,使用线程安全的ChannelGroup保存连接并分发数据。性能提高多少,我没有对比过,至少代码看着简化了。下面是相关实现代码,实际生产环境中,下面代码应该还有很多很多需要优化的地方,目前我还没有在超过10k并发连接情况下测试过下面的代码。 public class NetServer implements Runnable { private static Logger LOGGER = LoggerFactory.getLogger(NetServer.class); private final static int PORT = 9500; private final static int MAX_MESSAGE_LENGTH = 8192; private final ChannelGroup channels_ = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public NetServer() { } // 分发数据 public void dispatcher(String message) { channels_.writeAndFlush(message); } @Override public void run() { startup(); } private void startup() { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.option(ChannelOption.SO_BACKLOG, 128); b.option(ChannelOption.TCP_NODELAY, true); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LengthFieldBasedFrameDecoder(MAX_MESSAGE_LENGTH, 0, 4, 0 ,4)); p.addLast(new LengthFieldPrepender(4)); p.addLast(new StringDecoder(CharsetUtil.UTF_8)); p.addLast(new StringEncoder(CharsetUtil.UTF_8)); p.addLast(new NetServerHandler(channels_)); } }); ChannelFuture future = b.bind(PORT).sync(); future.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if (future.isSuccess()) { LOGGER.info("服务器启动成功..."); } else { LOGGER.info("服务器启动失败..."); if (future.cause() != null) { LOGGER.error("异常信息: " + future.cause().getMessage()); } } } }); future.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.error("服务器启动出现异常..." + e.getMessage()); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } public class NetServerHandler extends SimpleChannelInboundHandler<String> { private ChannelGroup channels_; private static Logger LOGGER = LoggerFactory.getLogger(NetServerHandler.class); public NetServerHandler(ChannelGroup channels) { channels_ = channels; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { channels_.add(ctx.channel()); LOGGER.info("{} is up...当前连接数量: {}", ctx.channel().remoteAddress(), channels_.size()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.close(); LOGGER.info("{} is down...当前连接数量: {}", ctx.channel().remoteAddress(), channels_.size()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); LOGGER.error("与客户端 {} 连接出现异常, 异常信息: " + cause.getMessage(), ctx.channel().remoteAddress()); } } --------------------- 作者:grafx 来源:CSDN 原文:https://blog.csdn.net/grafx/article/details/56677667 版权声明:本文为博主原创文章,转载请附上博文链接! |
|