分享

使用MQTT协议+Redis缓存实现APP登录顶号功能 | jwcqc个人笔记 | IT瘾

 WindySky 2020-01-15

大家在玩游戏或使用QQ等IM工具时,想必都见到过弹出被顶号或者是您的账号于xx时间在另一设备登录,您已被迫下线这样的提示,然后不得不点退出按钮退出整个应用,或者点击重新登录把另一设备再顶下来。最近我参与的一个项目,正好就有这样的需求,而且,由于我们项目中已经使用到了MQTT协议进行消息推送,实现远程控制,后台用Java实现,缓存使用了Redis,因此,正好可以利用现有的技术来实现这个功能。

实现的思路大概如下:首先,登录时不仅需要账号密码,还可以将设备关键信息记录下来,如设备型号(Android|iPhone)、登录时间、登录IP、设备唯一标识(UUID)等,这就需要前台登录功能与后台接口一起配合实现,并在后台把userId已经相关设备信息保存到Redis中,当在另外一台新设备上登录同一账号时,将userId对应的相关登录设备信息直接进行覆盖,此时如果旧设备进行重连时,因为该uuid已经不是当前服务端的uuid了,所以直接返回下线通知,为了进行友好提示,也可以将新登录设备的主要信息(设备型号、登录时间)进行返回。

下面简单介绍一下实现的方法。

软件安装

Linux下mqtt服务器Apollo的安装

下载

选择一个目录用来下载保存
下载地址: http://activemq./apollo/download.html
官网教程: http://activemq./apollo/documentation/getting-started.html
目前版本是 apache-apollo-1.7.1-unix-distro.tar .gz

创建broker

一个broker实例是一个文件夹,其中包含所有的配置文件及运行时的数据,不如日志和消息数
据。Apollo强烈建议不要把实例同安装文件放在一起。在linux操作系统下面,建议将实例建在
/var/lib/目录下面

首先解压:tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
选择一个目录存放解压后的文件,我放在了/server/下,解压后的文件夹为 apache-apollo-1.7.1

开始创建broker实例:

             
1
2
             
cd /var/lib
sudo /server/apache-apollo-1.7.1/bin/apollo create mybroker

下图是Apache官方给的一些建议截图:

启动broker实例

启动broker实例可以有两种方法,如下图中所示:

可以执行

             
1
             
/var/lib/mybroker/bin/apollo-broker run

或者

             
1
2
             
sudo ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start

使其作为一个service进行启动,以后系统重启后只需运行/etc/init.d/apollo-broker-service start

访问Apollo的监控页面: http://localhost:61680/默认用户名、密码为为 admin/password

Linux下Redis的安装与配置

Redis的安装非常简单,已经有现成的Makefile文件,解压后在src目录下使用make命令完成编译即可,redis-benchmark、redis-cli、redis-server、redis-stat 这四个文件,加上一个 redis.conf 就构成了整个redis的最终可用包。它们的作用如下:

redis-server:Redis服务器的daemon启动程序
redis-cli:Redis命令行操作工具。当然,你也可以用telnet根据其纯文本协议来操作
redis-benchmark:Redis性能测试工具,测试Redis在你的系统及你的配置下的读写性能
redis-stat:Redis状态检测工具,可以检测Redis当前状态参数及延迟状况

下载安装:

            
1
2
3
4
5
            
wget http://download./redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make
make install

启动

编译后生成的可执行文件:
redis-server 是Redis的服务器,启动Redis即运行redis-server
redis-cli 是Redis自带的Redis命令行客户端,学习Redis的重要工具

./redis-server & 不指定配置直接运行,这时采用默认配置,无密码
./redis-server –port 6379 仅指定端口
./redis-server ../redis.conf 指定配置文件

最好还是使用最后一种方式进行启动

如果只是在本机连接,那麽使用默认配置文件不会有什么问题,但是,如果是连接远程服务器端的Redis,则需要对配置文件进行一些修改:

             
1
2
3
             
requirepass foobared
#bind 127.0.0.1 ##注释掉
protected-mode no ##从yes改成no

至于如何将Redis设置后台服务,开机自启等,这里就不介绍了,可以去搜索一下。

功能实现

后台接口

Redis客户端使用的是Jedis,如下代码是一个对Jedis简单的封装

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
            
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
import java.util.ResourceBundle;
/**
* Jedis Cache 工具类
*/
public class JedisUtils {
private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);
private static JedisPool jedisPool;
/**
* 读取相关的配置
*/
static {
ResourceBundle resourceBundle = ResourceBundle.getBundle("redis");
int maxActive = Integer.parseInt(resourceBundle.getString("redis.pool.maxActive"));
int maxIdle = Integer.parseInt(resourceBundle.getString("redis.pool.maxIdle"));
int maxWait = Integer.parseInt(resourceBundle.getString("redis.pool.maxWait"));
int port = Integer.parseInt(resourceBundle.getString("redis.port"));
int timeout = Integer.parseInt(resourceBundle.getString("redis.timeout"));
String ip = resourceBundle.getString("redis.ip");
String auth = resourceBundle.getString("redis.auth");
JedisPoolConfig config = new JedisPoolConfig();
//设置最大连接数
config.setMaxTotal(maxActive);
//设置最大空闲数
config.setMaxIdle(maxIdle);
//设置超时时间
config.setMaxWaitMillis(maxWait);
//初始化连接池
jedisPool = new JedisPool(config, ip, port, timeout, auth);
}
/**
* 获取缓存
* @param key 键
* @return 值
*/
public static String get(String key) {
String value = null;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)) {
value = jedis.get(key);
value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
logger.debug("get {} = {}", key, value);
}
} catch (Exception e) {
logger.warn("get {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return value;
}
/**
* 设置缓存
* @param key 键
* @param value 值
* @param cacheSeconds 超时时间,0为不超时
* @return
*/
public static String set(String key, String value, int cacheSeconds) {
String result = null;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.set(key, value);
if (cacheSeconds != 0) {
jedis.expire(key, cacheSeconds);
}
logger.debug("set {} = {}", key, value);
} catch (Exception e) {
logger.warn("set {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 删除缓存
* @param key 键
* @return
*/
public static long del(String key) {
long result = 0;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)){
result = jedis.del(key);
logger.debug("del {}", key);
}else{
logger.debug("del {} not exists", key);
}
} catch (Exception e) {
logger.warn("del {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 缓存是否存在
* @param key 键
* @return
*/
public static boolean exists(String key) {
boolean result = false;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.exists(key);
logger.debug("exists {}", key);
} catch (Exception e) {
logger.warn("exists {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 获取资源
* @return
* @throws JedisException
*/
public static Jedis getResource() throws JedisException {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (JedisException e) {
logger.warn("getResource.", e);
returnBrokenResource(jedis);
throw e;
}
return jedis;
}
/**
* 归还资源
* @param jedis
*/
public static void returnBrokenResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnBrokenResource(jedis);
}
}
/**
* 释放资源
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnResource(jedis);
}
}
}

然后在登录接口中,当判断完登录的用户名密码正确后,可以参考如下代码的思路去实现,首先判断Redis中是否已保存有这个userId对用的值,有的话说明当前已经有登录,需要被替换到,同时使用MQTT发送消息给客户端使其退出,Redis中不存在则只需保存userId和uuidStr即可

            
1
2
3
4
5
6
7
8
9
10
11
12
            
String uuidStr = ""; //这个值从APP端传过来
// 先判断Redis中是否已经有,有的话需要替换掉
if(JedisUtils.get(userId) != null && !JedisUtils .get(userId).equals(uuidStr)) {
MqttClient client = MyMqttClient.getInstance();
String topic = "TOPIC/LOGIN_LOGOUT";
client.subscribe(topic, 1);
MyMqttClient.sendMessage("Log out", topic);
client.unsubscribe(topic);
}
JedisUtils.set(userId, uuidStr, 0);

至于MQTT协议的实现,这里使用的是Paho,如果后台项目是使用Maven构建的话,在pom.xml中加入如下几行即可:

            
1
2
3
4
5
            
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>

然后对其进行了一个简单的封装

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
            
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MyMqttClient {
private MyMqttClient() {}
private static MqttClient mqttClientInstance = null;
private static MqttConnectOptions options;
//静态工厂方法
public static synchronized MqttClient getInstance() {
try {
if (mqttClientInstance == null) {
mqttClientInstance = new MqttClient("tcp://125.216.242.151:61613",
MqttClient.generateClientId(), new MemoryPersistence());
options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName("admin");
//设置连接的密码
options.setPassword("password".toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
mqttClientInstance.connect(options);
}
return mqttClientInstance;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void sendMessage(String content, String myTopic) {
MqttTopic topic = getInstance().getTopic(myTopic);
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
message.setPayload(content.getBytes());
try {
MqttDeliveryToken token = topic.publish(message);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static MqttConnectOptions getOptions(){
return options;
}
}

app端

客户端的做法思路也很简单,由于使用了MQTT,因此客户端和服务器端其实已经保持了一个长连接,可以为客户端写一个MQTTService,随时监听服务器推送过来的消息进行处理

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
            
//为MTQQ client设置回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
}
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
if(message.toString().equals("Log out")) {
handler.post(new Runnable() {
@Override
public void run() {
AlertDialog.Builder builder = new AlertDialog.Builder(getApplicationContext());
builder.setMessage("被顶号了");
builder.setNegativeButton("退出", new DialogInterface.OnClickListener() {
@Override
public void onClick(DialogInterface dialog, int which) {
// TODO 退出当前账号,在这里简单粗暴的结束了应用
stopSelf();
android.os.Process.killProcess(android.os.Process.myPid());
}
});
Dialog dialog = builder.create();
dialog.setCanceledOnTouchOutside(false);
dialog.getWindow().setType(WindowManager.LayoutParams.TYPE_SYSTEM_ALERT);
dialog.show();
}
});
}
}
});

总结

上述代码可能在严谨性和可靠性上还会存在一些问题,还需要经过不断的完善,但思路是很明确的。在这里尤其要安利一下MTQQ,现在越来越多的产品都是基于这个协议进行开发,进行消息推送等。它开销很小,支持各种流行编程语言,能够适应不稳定的网络传输需求,在未来几年,相信MQTT的应用会越来越广。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多