分享

【Redis】jedis客户端实现redis消息的发布订阅(实时消息中间件)

 WindySky 2017-10-17

发布

  1. package com.chiwei.redis;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.InputStreamReader;  
  5.   
  6. import org.slf4j.Logger;  
  7. import org.slf4j.LoggerFactory;  
  8.   
  9. import redis.clients.jedis.Jedis;  
  10.   
  11. public class RedisPublisher {  
  12.   
  13.     private static final Logger log = LoggerFactory.getLogger(RedisPublisher.class);  
  14.   
  15.     private final Jedis pubJedis;  
  16.   
  17.     private final String[] channel;  
  18.   
  19.     public RedisPublisher(Jedis pubJedis, String[] channel) {  
  20.         this.pubJedis = pubJedis;  
  21.         this.channel = channel;  
  22.     }  
  23.   
  24.     public void start() {  
  25.         log.debug("Type your message (type quit to exit)");  
  26.         int channelLen = channel.length;  
  27.         try {  
  28.             BufferedReader br = new BufferedReader(new InputStreamReader(System.in));  
  29.             while (true) {  
  30.                 String line = br.readLine();  
  31.                 if (!"quit".equals(line)) {  
  32.                     for (int i = 0; i < channelLen; i++) {  
  33.                         if (channel[i].matches("^chiwei.*")) {  
  34.                             log.debug("Match...");  
  35.                             pubJedis.publish(channel[i], line + "haha");  
  36.                         } else {  
  37.                             pubJedis.publish(channel[i], line);  
  38.                         }  
  39.                         log.debug("Publish to {}", channel[i]);  
  40.                     }  
  41.                 } else {  
  42.                     break;  
  43.                 }  
  44.             }  
  45.         } catch (Exception e) {  
  46.             log.error("IO fail while reading input", e);  
  47.         }  
  48.     }  
  49. }  


以上发布类,发布的频道是一个数组,即同时将内容发布到多个频道中,你可以根据内容去判断,不同的内容发布到不同的频道中。

订阅

  1. package com.chiwei.redis;  
  2.   
  3.   
  4. import org.slf4j.Logger;  
  5. import org.slf4j.LoggerFactory;  
  6.   
  7. import redis.clients.jedis.JedisPubSub;  
  8.   
  9. public class RedisSubscriber extends JedisPubSub{  
  10.   
  11.     private static final Logger log = LoggerFactory.getLogger(RedisSubscriber.class);  
  12.   
  13.     //取得订阅的消息后的处理  
  14.     public void onMessage(String s, String s1) {  
  15.         // TODO Auto-generated method stub  
  16.         log.debug("Message received,Channel:{},Msg:{}",s,s1);  
  17.     }  
  18.   
  19.     //取得按表达式的方式订阅的消息后的处理  
  20.     public void onPMessage(String s, String s1, String s2) {  
  21.         // TODO Auto-generated method stub  
  22.         log.debug("Pattern:{}",s);  
  23.         log.debug("Pattern Message received,Channel:{},Msg:{}",s1,s2);  
  24.     }  
  25.   
  26.     //初始化按表达式的方式订阅时候的处理  
  27.     public void onPSubscribe(String s, int i) {  
  28.         // TODO Auto-generated method stub  
  29.         log.debug("Pattern Subscribe,Pattern:{},ChannelNum:{}",s,i);  
  30.     }  
  31.   
  32.     //取消按表达式的方式订阅时候的处理  
  33.     public void onPUnsubscribe(String s, int i) {  
  34.         // TODO Auto-generated method stub  
  35.         log.debug("Pattern Unsubscribe,Pattern:{},ChannelNum:{}",s,i);  
  36.     }  
  37.   
  38.     //初始化订阅时候的处理  
  39.     public void onSubscribe(String s, int i) {  
  40.         // TODO Auto-generated method stub  
  41.         log.debug("Subscribe,Channel:{},ChannelNum:{}",s,i);  
  42.     }  
  43.   
  44.     //取消订阅时候的处理  
  45.     public void onUnsubscribe(String s, int i) {  
  46.         // TODO Auto-generated method stub  
  47.         log.debug("Unsubscribe,Channel:{},ChannelNum:{}",s,i);  
  48.     }  
  49.   
  50. }  


该类就是订阅的实现类,对于订阅的各项操作实现具体的处理方法。

启动主类

  1. package com.chiwei.redis;  
  2.   
  3. import org.slf4j.Logger;  
  4. import org.slf4j.LoggerFactory;  
  5.   
  6. import redis.clients.jedis.Jedis;  
  7. import redis.clients.jedis.JedisPool;  
  8. import redis.clients.jedis.JedisPoolConfig;  
  9.   
  10. public class RedisPubSubMain {  
  11.   
  12.     public static final String[] CHANNEL_NAME = new String[] { "chiwei.momo", "chiwei.nono","taotao"};  
  13.   
  14.     private static final Logger log = LoggerFactory.getLogger(RedisPubSubMain.class);  
  15.   
  16.     public static void main(String[] args) {  
  17.         // TODO Auto-generated method stub  
  18.         log.debug("=========================");  
  19.         JedisPoolConfig config = new JedisPoolConfig();  
  20.         config = new JedisPoolConfig();  
  21.         config.setMaxTotal(100);  
  22.         config.setMaxIdle(10);  
  23.         config.setMaxWaitMillis(1000L);  
  24.         config.setTestOnBorrow(true);  
  25.         config.setTestOnReturn(true);  
  26.         JedisPool jedisPool = new JedisPool(config, "192.168.11.176", 7379);  
  27.         final Jedis subJedis = jedisPool.getResource();  
  28.         final RedisSubscriber sub = new RedisSubscriber();  
  29.         new Thread(new Runnable() {  
  30.   
  31.             public void run() {  
  32.                 try {  
  33.                     //subJedis.subscribe(sub, CHANNEL_NAME);  
  34.                     subJedis.psubscribe(sub, "^chiwei.*");  
  35.                     log.debug("Subscribe ended");  
  36.                 } catch (Exception e) {  
  37.                     log.error("Subscribe failed", e);  
  38.                 }  
  39.             }  
  40.   
  41.         }).start();  
  42.   
  43.         Jedis pubJedis = jedisPool.getResource();  
  44.         new RedisPublisher(pubJedis, CHANNEL_NAME).start();  
  45.         sub.unsubscribe();  
  46.         jedisPool.returnResourceObject(subJedis);  
  47.         jedisPool.returnResourceObject(pubJedis);  
  48.         jedisPool.close();  
  49.     }  
  50.   
  51. }  

subJedis.psubscribe(sub, "^chiwei.*");按照正则匹配订阅的频道

由于订阅类会阻塞当前线程的执行,所以在main线程中另起一个线程来启动订阅,然后启动发布线程去发布内容。

2015-04-17 10:35:43,751 - com.chiwei.redis.RedisPubSubMain[18] -0    [main] DEBUG  - =========================
2015-04-17 10:35:43,843 - com.chiwei.redis.RedisSubscriber[29] -92   [Thread-3] DEBUG  - Pattern Subscribe,Pattern:^chiwei.*,ChannelNum:1
2015-04-17 10:35:43,848 - com.chiwei.redis.RedisPublisher[25] -97   [main] DEBUG  - Type your message (type quit to exit)
3
2015-04-17 10:35:53,132 - com.chiwei.redis.RedisPublisher[34] -9381 [main] DEBUG  - Match...
2015-04-17 10:35:53,138 - com.chiwei.redis.RedisPublisher[39] -9387 [main] DEBUG  - Publish to chiwei.momo
2015-04-17 10:35:53,140 - com.chiwei.redis.RedisPublisher[34] -9389 [main] DEBUG  - Match...
2015-04-17 10:35:53,146 - com.chiwei.redis.RedisPublisher[39] -9395 [main] DEBUG  - Publish to chiwei.nono
2015-04-17 10:35:53,153 - com.chiwei.redis.RedisPublisher[39] -9402 [main] DEBUG  - Publish to taotao






    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多