event4j 是一个实现了事件驱动 和异步化 的轻量级框架,可帮助我们实现以事件驱动为中心的应用,采用分派事件和监听事件的方式,使处理事务的上下游各个组件之间不直接发生交互,降低耦合,扩展方便。另外,event4j让事务的每一个处理环节都是异步化且并行执行,使应用的吞吐量大大提升。
实现一个功能时,传统 的做法是串行执行 一系列的操作。例如:一个用户登陆的功能,在校验账号和密码成功后,还会进行以下操作:
- 写业务统计日志。
- 通知相关的组件,某个账号登陆了,并且将账号、登陆时间、登陆的环境信息都发送过去。
- 将一条记录到账号的登陆历史中。
我们的做法通常是这样的:在成功校验账号和密码后,再写一堆代码来实现上述的三个操作。所有的操作都是串行的,如下图所示:
这样做没错,但随着系统的演进,为了统计需要或相关联系统的业务需要做的一些额外操作,不断地在功能点中增加许多与用户登陆无关但有必要的操作。这样,在账号登陆的功能中掺入了许多实现额外功能的代码,使之变得臃肿。而且,在系统并发量高的情况下会带来一个问题:账号登陆变慢。 面对变量的问题,我们就会开始实现异步化(推荐用threadpool4j),把这些额外的操作变成异步任务,减少处理时间,提升账号登陆的响应速度。 把一些操作异步化后,代码的主线逻辑更加清晰,臃肿的情况有很大改善。但还是会有不少处理异步任务相关的代码在其中,那有没有可能更进一步,在用户的登陆操作中不出现这些异步任务的代码呢?答案是:用事件驱动。 用event4j可以非常优雅地解决这个问题,不仅使处理账号登陆的代码变得非常简洁,而且将来扩展方便。首先,将账号及其相关信息生成一个事件并分派出去,然后写三个处理器监听这个事件即可,如果后续还有类似的在账号登陆成功后要做一些额外的操作,再写一个处理器监听这个事件就行。这样,账号登陆的功能中不再包含实现额外操作的代码,并且通过event4j将登陆操作与其他的额外操作解耦。如下图所示:
一、编译event4j1、获取threadpool4j源码。git clone https://github.com/aofeng/event4j
2、编译源码生成jar。进入项目根目录,执行ant脚本: ant
会生成一个dist目录,下面有两个文件。如:
event4j-1.0.0-src.jar 源码jar event4j-1.0.0.jar 用于发布的二进制jar
二、event4j入门指南为了更好地理解event4j,通过实现一个简单且可容易实现的实例来展示event4j的用法: 用例有一个日志文件LoginInfoRecords.txt,里面记录着用户的登陆信息,每行共4个字段,用符号“`”隔开,分别是:登陆时间、来源IP、账号、登陆结果。从日志文件中统计总登陆次数和登陆成功率。 实现思路1、实现一个文件读取器,按行读取。每读取一行,产生一个“完成读取一行(ReadLineComplete )”的事件。 2、实现一个格式转换器,监听事件“完成读取一行”。完成格式转换后,产生一个“完成格式转换(LoginInfoCodecComplete )”的事件。 3、实现两个数据处理器:登陆次数统计器、登陆成功率统计器,它们监听事件“完成格式转换”。
编写程序 1、依赖关系及其配置。
event4j 依赖threadpool4j ,需要的jar列表如下:
- common4j-0.1.0.jar
- commons-lang-2.6.jar
- log4j-1.2.16.jar
- threadpool4j-1.0.0.jar
2、配置threadpool4j。 在应用的CLASSPATH的任意路径(如:应用的classes目录)下新文本文件建threadpool4j.xml,其内容如下: <?xml version="1.0" encoding="UTF-8"?>
<threadpool4j>
<pool name="default">
<corePoolSize>30</corePoolSize>
<maxPoolSize>150</maxPoolSize>
<!-- 线程空闲存话的时间。单位:秒 -->
<keepAliveTime>5</keepAliveTime>
<workQueueSize>100000</workQueueSize>
</pool>
</threadpool4j>
3、配置event4j。 在应用的CLASSPATH的任意路径(如:应用的classes目录)下新建文本文件event4j.xml的配置文件,其内容如下: <?xml version="1.0" encoding="UTF-8"?>
<events>
<!--
<event>中的type属性表示事件类型,可以有1至多个listener。
<listener>没有属性,只有value,value值是事件监听器的完整类名。
-->
<!-- 事件:完成读取一行 -->
<event type="ReadLineComplete">
<listener>cn.aofeng.event4j.example.LoginInfoCodec</listener>
</event>
<!-- 事件:完成格式转换 -->
<event type="LoginInfoCodecComplete">
<listener>cn.aofeng.event4j.example.LoginCountProcessor</listener>
<listener>cn.aofeng.event4j.example.LoginSuccessRateProcessor</listener>
</event>
</events>
4、实现文件读取器。 // 1. 初始化event4j
EventDispatch.getInstance().init();
// 2. 事件分派
BufferedReader reader = null;
String line = "";
try {
InputStream ins = Event4JExample.class.getResourceAsStream("/cn/aofeng/event4j/example/LoginRecords.txt");
reader = new BufferedReader(new InputStreamReader(ins));
do {
line = reader.readLine();
if (StringUtils.isBlank(line)) {
continue;
}
Event<String> event = new Event<String>("ReadLineComplete", line);
EventDispatch.getInstance().dispatch(event);
} while (null != line);
} catch (Exception e) {
_logger.error("read file [CLASSPATH/cn/ofeng/event4j/example/LoginRecords.txt] occurs error", e);
} finally {
try {
reader.close();
} catch (IOException e) {
// nothing
}
} // end of try catch finally block
// 3. 关闭event4j,释放资源
EventDispatch.getInstance().destroy();
其源码可查看文件Event4JExample.java 5、实现格式转换器。 /**
* 数据转换器:将一行字符串转换成{@link LoginInfo}对象。
*
* @author <a href="mailto:aofengblog@163.com">聂勇</a>
*/
public class LoginInfoCodec extends AbstractEventListener<String> {
@Override
public void execute(Event<String> event) {
// 1. 校验数据的有效性
String line = event.getData();
if (StringUtils.isBlank(line)) {
return;
}
// 2. 将行数据转换成Java对象
String[] datas = line.split("`");
LoginInfo loginInfo = new LoginInfo();
loginInfo.setLoginTime(Long.parseLong(datas[0]));
loginInfo.setIp(datas[1]);
loginInfo.setUserName(datas[2]);
loginInfo.setResultCode(Integer.parseInt(datas[3]));
// 3. 生成事件并分派
Event<LoginInfo> loginInfoEvent = new Event<LoginInfo>("LoginInfoCodecComplete", loginInfo);
EventDispatch.getInstance().dispatch(loginInfoEvent);
}
}
其源码可查看文件LoginInfoCodec.java 6、实现登陆次数统计器 /**
* 登陆次数统计器。
*
* @author <a href="mailto:aofengblog@163.com">聂勇</a>
*/
public class LoginCountProcessor extends AbstractEventListener<LoginInfo> {
private long _count;
@Override
public void execute(Event<LoginInfo> event) {
if (null == null) {
return;
}
_count ++;
// 后续如何输出数据可自行处理 。。。
}
}
其源码可查看文件LoginCountProcessor.java 7、实现登陆成功率统计器 /**
* 登陆成功率统计器。
*
* @author <a href="mailto:aofengblog@163.com">聂勇</a>
*/
public class LoginSuccessRateProcessor extends AbstractEventListener<LoginInfo> {
private long _total;
private long _success;
@Override
public void execute(Event<LoginInfo> event) {
if (null == null) {
return;
}
_total ++;
if (1 == event.getData().getResultCode()) {
_success ++;
}
// 后续如何输出数据可自行处理 。。。
}
}
其源码可查看文件LoginSuccessRateProcessor.java
<正文结束>
|