分享

memcacheq一个国内开发的message queue之性能研究

 zybingliu 2009-11-06
memcacheq一个国内开发的message queue之性能研究
2008-05-07 14:40
XMPP中的presence适合用Message Queue的方式来实现,所以一直对快速的message queue实现比较关心。刚好在 memcachedb 上看到了一个 memcacheq 的项目,从字面上看就是一个mq的实现。因为它也采用memcache的协议,所以将以前做的memcachedb的性能测试程序稍微修改看了下结果,环境和前面一样,测试数据不承担任何责任,仅供参考:)

1. 仅写入队列, 100万条
Thread: 2
Time elpased: 427s
Avg: 2,349 puts / sec
bdb data dir size: 945M

2. 仅队列读出, 100万条,使用上一步构建的队列数据。
Thread: 2
Time elpased: 569s
Avg: 1,763 gets / sec
bdb data dir size: 1.7G (这个地方有问题,可能还没完成?)

3. 同时出入队列,这个是符合应用的真实的情况,写入10万行后才开始启动读出队列程序。
Thread: 2
Time elpased: 569s
Avg: 1,049 puts / sec
Avg: 979 gets / sec
bdb data dir size: 1.6G

测试总结:
  • 测试3中入队去除刚开始单独放10万条记录的速度,实际上出队和入队的速度是一样的,即同时操作平均速度在每秒1,000条(100 byte message)。
  • 速度趋于相同可参考我以前的分析 memcachedb单线程访问bdb的阻塞问题
  • 测试2中数据库相关实现可能有问题,数据越跑越大。
  • 因为以前用的是Apache ActiveMQ ,所以下一步打算在同环境比较一下。参看后文:ActiveMQ性能研究及与memcacheq比较
2008/5/8:看了一下memcacheq的代码,补充一下几个新看法:
  • memcacheq 是基于 berkeley db queue 的 memcache 协议包装
  • memcacheq 本身没有mq相关算法实现,所以上面实际测试的是单线程下berkeley db的效率。
  • 空间变大的问题是由于berkeley db算法延后删除,不是问题。
  • 有朋友问测试代码,其实比较简单,如下。


import java.util.concurrent.atomic.AtomicInteger;

/**
* Test client for memcache protocol servers
* We use java memcached client, download from http://www./memcached/
* run: java MQTest [threads] [type], e.g. java MQTest 2 1
* @author Tim
*/
public class MQTest implements Runnable {
CacheClient cc;
public static AtomicInteger count = new AtomicInteger();
public static final String DATA100 =
"TIM'S DATA AND CODE WITHOUT WARRANTY OF ANY KIND.#################" +
"##################################";

// type: 0 - get, 1 - put
private static int testType = 1;
public static void main(String[] args) {
int thread = 1;

if (args.length > 1) {
thread = Integer.parseInt(args[0]);
testType = Integer.parseInt(args[1]);
}

System.out.println("Thread: " + thread);
System.out.println("Test Type: " + (testType == 0 ? "get queue" : "put queue"));

long time1 = System.currentTimeMillis();
for (int i = 0; i < thread; i++) {
MQTest test = new MQTest();
Thread t = new Thread(test);
t.start();
}

printStat(time1);
}

public MQTest() {
cc = new CacheClient();
cc.setServerPort("server:11211");
}

public void run() {
while (true) {
if (testType != 0)
set();
else
get();
}
}

public void get() {
cc.get("q");
count.incrementAndGet();
}

public void set() {
cc.set("q", DATA100);
count.incrementAndGet();
}

private static void printStat(long time1) {
while (true) {
long time2 = System.currentTimeMillis();
if (time2 == time1)
continue;
int cnt = count.get();

System.out.println("---------------------------");
System.out.println("Total: " + cnt);

System.out.print("Time elapsed: ");
System.out.println((time2 - time1) / 1000);

System.out.print("AVG: ");
System.out.println(cnt * 1000l / (time2 - time1));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多