分享

事件驱动的异步化框架:event4j

 WindySky 2017-09-03

event4j是一个实现了事件驱动异步化的轻量级框架,可帮助我们实现以事件驱动为中心的应用,采用分派事件和监听事件的方式,使处理事务的上下游各个组件之间不直接发生交互,降低耦合,扩展方便。另外,event4j让事务的每一个处理环节都是异步化且并行执行,使应用的吞吐量大大提升。

实现一个功能时,传统的做法是串行执行一系列的操作。例如:一个用户登陆的功能,在校验账号和密码成功后,还会进行以下操作:

  • 写业务统计日志。
  • 通知相关的组件,某个账号登陆了,并且将账号、登陆时间、登陆的环境信息都发送过去。
  • 将一条记录到账号的登陆历史中。

我们的做法通常是这样的:在成功校验账号和密码后,再写一堆代码来实现上述的三个操作。所有的操作都是串行的,如下图所示:

事件驱动的异步化框架:event4j - 傲风 - 0与1构筑世界,程序员创造时代
 

这样做没错,但随着系统的演进,为了统计需要或相关联系统的业务需要做的一些额外操作,不断地在功能点中增加许多与用户登陆无关但有必要的操作。这样,在账号登陆的功能中掺入了许多实现额外功能的代码,使之变得臃肿。而且,在系统并发量高的情况下会带来一个问题:账号登陆变慢

面对变量的问题,我们就会开始实现异步化(推荐用threadpool4j),把这些额外的操作变成异步任务,减少处理时间,提升账号登陆的响应速度。
把一些操作异步化后,代码的主线逻辑更加清晰,臃肿的情况有很大改善。但还是会有不少处理异步任务相关的代码在其中,那有没有可能更进一步,在用户的登陆操作中不出现这些异步任务的代码呢?答案是:用事件驱动

用event4j可以非常优雅地解决这个问题,不仅使处理账号登陆的代码变得非常简洁,而且将来扩展方便。首先,将账号及其相关信息生成一个事件并分派出去,然后写三个处理器监听这个事件即可,如果后续还有类似的在账号登陆成功后要做一些额外的操作,再写一个处理器监听这个事件就行。这样,账号登陆的功能中不再包含实现额外操作的代码,并且通过event4j将登陆操作与其他的额外操作解耦。如下图所示:

事件驱动的异步化框架:event4j - 傲风 - 0与1构筑世界,程序员创造时代

一、编译event4j

1、获取threadpool4j源码。

git clone https://github.com/aofeng/event4j

2、编译源码生成jar。

进入项目根目录,执行ant脚本:

ant

会生成一个dist目录,下面有两个文件。如:

event4j-1.0.0-src.jar 源码jar
event4j-1.0.0.jar 用于发布的二进制jar

二、event4j入门指南

为了更好地理解event4j,通过实现一个简单且可容易实现的实例来展示event4j的用法:

用例

有一个日志文件LoginInfoRecords.txt,里面记录着用户的登陆信息,每行共4个字段,用符号“`”隔开,分别是:登陆时间、来源IP、账号、登陆结果。从日志文件中统计总登陆次数和登陆成功率。

实现思路

1、实现一个文件读取器,按行读取。每读取一行,产生一个“完成读取一行(ReadLineComplete)”的事件。
2、实现一个格式转换器,监听事件“完成读取一行”。完成格式转换后,产生一个“完成格式转换(LoginInfoCodecComplete)”的事件。
3、实现两个数据处理器:登陆次数统计器、登陆成功率统计器,它们监听事件“完成格式转换”。

事件驱动的异步化框架:event4j - 傲风 - 0与1构筑世界,程序员创造时代
 

编写程序

1、依赖关系及其配置。
event4j依赖threadpool4j,需要的jar列表如下:

  • common4j-0.1.0.jar
  • commons-lang-2.6.jar
  • log4j-1.2.16.jar
  • threadpool4j-1.0.0.jar

2、配置threadpool4j。
在应用的CLASSPATH的任意路径(如:应用的classes目录)下新文本文件建threadpool4j.xml,其内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<threadpool4j>
    <pool name="default">
        <corePoolSize>30</corePoolSize>
        <maxPoolSize>150</maxPoolSize>
        <!-- 线程空闲存话的时间。单位:秒 -->
        <keepAliveTime>5</keepAliveTime>
        <workQueueSize>100000</workQueueSize>
    </pool>
</threadpool4j>

3、配置event4j。
在应用的CLASSPATH的任意路径(如:应用的classes目录)下新建文本文件event4j.xml的配置文件,其内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<events>
    <!-- 
    <event>中的type属性表示事件类型,可以有1至多个listener。
    <listener>没有属性,只有value,value值是事件监听器的完整类名。
     -->

    <!-- 事件:完成读取一行 -->
    <event type="ReadLineComplete">
        <listener>cn.aofeng.event4j.example.LoginInfoCodec</listener>
    </event>

    <!-- 事件:完成格式转换 -->
    <event type="LoginInfoCodecComplete">
        <listener>cn.aofeng.event4j.example.LoginCountProcessor</listener>
        <listener>cn.aofeng.event4j.example.LoginSuccessRateProcessor</listener>
    </event>

</events>

4、实现文件读取器。

// 1. 初始化event4j
EventDispatch.getInstance().init();

// 2. 事件分派
BufferedReader reader = null;
String line = "";
try {
    InputStream ins = Event4JExample.class.getResourceAsStream("/cn/aofeng/event4j/example/LoginRecords.txt");
    reader = new BufferedReader(new InputStreamReader(ins));
    do {
        line = reader.readLine();
        if (StringUtils.isBlank(line)) {
            continue;
        }
        Event<String> event = new Event<String>("ReadLineComplete", line);
        EventDispatch.getInstance().dispatch(event);
    } while (null != line);
} catch (Exception e) {
    _logger.error("read file [CLASSPATH/cn/ofeng/event4j/example/LoginRecords.txt] occurs error", e);
} finally {
    try {
        reader.close();
    } catch (IOException e) {
        // nothing
    }
} // end of try catch finally block

// 3. 关闭event4j,释放资源
EventDispatch.getInstance().destroy();

其源码可查看文件Event4JExample.java

5、实现格式转换器。

/**
 * 数据转换器:将一行字符串转换成{@link LoginInfo}对象。
 * 
 * @author <a href="mailto:aofengblog@163.com">聂勇</a>
 */
public class LoginInfoCodec extends AbstractEventListener<String> {

    @Override
    public void execute(Event<String> event) {
        // 1. 校验数据的有效性
        String line = event.getData();
        if (StringUtils.isBlank(line)) {
            return;
        }

        // 2. 将行数据转换成Java对象
        String[] datas = line.split("`");
        LoginInfo loginInfo = new LoginInfo();
        loginInfo.setLoginTime(Long.parseLong(datas[0]));
        loginInfo.setIp(datas[1]);
        loginInfo.setUserName(datas[2]);
        loginInfo.setResultCode(Integer.parseInt(datas[3]));

        //  3. 生成事件并分派
        Event<LoginInfo> loginInfoEvent = new Event<LoginInfo>("LoginInfoCodecComplete", loginInfo);
        EventDispatch.getInstance().dispatch(loginInfoEvent);
    }

}

其源码可查看文件LoginInfoCodec.java

6、实现登陆次数统计器

/**
 * 登陆次数统计器。
 * 
 * @author <a href="mailto:aofengblog@163.com">聂勇</a>
 */
public class LoginCountProcessor extends  AbstractEventListener<LoginInfo> {

    private long _count;

    @Override
    public void execute(Event<LoginInfo> event) {
        if (null == null) {
            return;
        }

        _count ++;

        // 后续如何输出数据可自行处理 。。。
    }

}

其源码可查看文件LoginCountProcessor.java

7、实现登陆成功率统计器

/**
 * 登陆成功率统计器。
 * 
 * @author <a href="mailto:aofengblog@163.com">聂勇</a>
 */
public class LoginSuccessRateProcessor extends  AbstractEventListener<LoginInfo> {

    private long _total;

    private long _success;

    @Override
    public void execute(Event<LoginInfo> event) {
        if (null == null) {
            return;
        }

        _total ++;
        if (1 == event.getData().getResultCode()) {
            _success ++;
        }

        // 后续如何输出数据可自行处理 。。。
    }

}

其源码可查看文件LoginSuccessRateProcessor.java


<正文结束>

文章声明


作者:傲风(aofengblog@163.com)       编写时间:2014年11月14日

网址:http://aofengblog.blog.163.com

作者保留所有权利,转载请保留文章全部内容或者说明原作者和转载地址!

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多