
7月12日一款叫做TDengine的时序数据库项目在GitHub上开源了,这个项目一经发布就稳稳占据了GitHub排行榜的C位,目前TdEngine已经累积了5000多个star,并且连续一周排在上升榜首位。而且你要知道TdEngine的开发语言并不是火热的Python或JAVA,而是C语言。C语言无巧可取,虽见功夫,但是代码比较难读,能引发如此的关注绝对堪称奇迹,在我印象中即使是Mysql也没有达到如此的热度。
相信很多人也和笔者一样,是通过《比hadoop快至少10 倍的物联网大数据平台,我把它开源》的刷屏文才了解到陶老师与TdEngine的,当看到这位50岁的IT老兵老兵,依旧奋斗在编程一线,为TDengine开发贡献3万行代码时候,我就立刻四处向朋友打听,并最终要了陶老师的微信,做为一名80后程序员,我近不急待的想和陶老师直接沟通,想从他身上找到保持编程水平的秘决。 
大神面对面:这才是10倍程序员该有的样子 2008年的时候笔者还是CSDN论坛WINDOWS MOBILE版的版主,从事手机导航软件的开发工作,而在彼时陶老师也创办了和信公司,并亲自开发了WindowsMobile版和信客户端,相同的开发平台经历也让我们迅速的拉进了彼此的距离。 在使用TdEngine的过程中我发现了两个小问题,一是数据库用户密码明文存放,二是数据文件权限设置不合理。让我十分震惊的是,这两个问题是我下午在和陶老师聊天时提出的,当晚发布版本就把问题全部解决了。后来沟通得知这些BUG都是陶老师自己动手修改的。我意识到TdEngine的效率应该来自于创始人对于代码的执着与热爱,而不是对员工996式的工作要求。 陶老师是真的爱编程,尤其对于代码运行效率有着近乎狂热的追求,我查阅了陶老师近年来的作品,其和信客户端只有18K大小,胎心算法的实现只用了600行代码,而TDengine这样一个数据库项目竟然只需要1.5M安装包就能搞定,在手机APP都动辙上百M的今天,TDengine体量甚至显得有些异类。如果没有深厚的功底和坚定的信念是绝对无法达到如此高度的。我想陶老师应该就是传说中10倍程序员的典范吧。 10倍程序员对于他周围亲友的影响也是非常巨大的,当我打开TdEngine的官网(https://www./cn/),其简洁明快的风格,一目了然的配图,实在让我无法把这一切和一位年近半百的老派IT士人联系到一起,当然后来我和陶老师聊到这件事的时候才知道,整个网站从设计、前端、后台、浏览器适配、数据分析到搜索引擎优化,都是由陶老师的儿子,一位刚刚高中毕业的00后操刀主持的,而且整个网站从无到有只用了三周时间,除了感叹一句后生可畏,由此也可以看出来和10倍程序员并肩作战的也都是10倍程序员,所以it团队的负责人在感叹自己没有18程序员相助时也要反思一下,自己是不是一位10程序员。 TdEngine为什么会火? 传统数据库厂商的问题在于傲慢、自大,他们认为数据是零件,数据库则是各类零件的加中心,很多工序都是为数据的修改准备的,无论修改是否发生加工车间为了保证一致性,都会对流水线上的数据加上各种各样的锁。这些操作浪费了很多时间,而且几乎没有任何轻量级的框架,可供用户选择省略掉这些冗余操作。而且传统厂商为了解决数据库的性能问题不是从底层架构逻辑下手,而是不休止的在应用与数据库之间加入各种像REDIS,NGIX等等代理或者缓存层,这种方式其实是加大了各层级间的性能开销。传统厂商认为自己非常了解数据,但却忘了用户比厂商更加了解自己的数据,天下可谓苦秦久已。 而TdEngine是认为数据是信息流,它要做的非常简单,只是数据的录像机而已,信息调阅只要找到对应的录像带即可,这样的设计思路从底层逻辑上决定了td会是一款性能极高的产品。它更加贴合物联网时代的数据模型,而且代码只有10万行的量级,非常适合从从头开始学习。 所以TdEngine精确的找到了数据库市场的细分战场。他可以在相同的硬件条件下达到其它产品10倍的速度,完美解决了很多物联网,量化交易等场景的痛点。 TdEngine代码导读
当笔者打TdEngine的代码时不由眼前一亮,其代码风格及规范性绝对堪称一流,于是我打开了久违的souce insight,,再一次开始了阅读C语言代码的美妙旅程,在这里强烈推荐各位读者也来读一下,绝对堪称享受。 这里将给我启示最大的一段代码其链接在 https://github.com/taosdata/TDengine/blob/master/src/util/src/tsched.c
向大家分享一下。鉴于本文肯定会分享给陶老师,所以估计会有作者亲答的环节:-),以下代码是一个典型的consumer-producer消息传递功能的实现,也就是有多个生产者(producer)生成并不断向队列中传递消息,也有多个消费者(consumer)不断从队列中取消息,而在java等高级语言中类似的功能已经被封装好了,这其实也让程序员无法了解线程间的同步和互斥机制。在正式进入到代码之前我想请大家思考这样的一个,互斥体( mutex)和信号量(semaphore)的使用是如何做到多线程安全的。 先来看结构体设计,具体我已经注释好了: typedef struct { char label[16];#消息内容 sem_t emptySem;#此信号量代表队列的可写状态 sem_t fullSem;#此信号量代表队列的可读状态 pthread_mutex_t queueMutex;#此互斥体为保证消息不会被误修改,保证线程程安全 int fullSlot;#队尾位置 int emptySlot;#队头位置 int queueSize;#队列长度 int numOfThreads;#同时操作的线程数量 pthread_t * qthread;#线程指针 SSchedMsg * queue;#队列指针 } SSchedQueue;
再来看初始化函数,这里需要特别说明的是,两个信号量的创建,其中emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度,fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读。具体代码及我的注释如下: void *taosInitScheduler(int queueSize, int numOfThreads, char *label) { pthread_attr_t attr; SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; pSched->numOfThreads = numOfThreads; strcpy(pSched->label, label);
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { pError('init %s:queueMutex failed, reason:%s', pSched->label, strerror(errno)); goto _error; } #emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度。 if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { pError('init %s:empty semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; } #fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读 if (sem_init(&pSched->fullSem, 0, 0) != 0) { pError('init %s:full semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; }
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { pError('%s: no enough memory for queue, reason:%s', pSched->label, strerror(errno)); goto _error; }
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0;#实始化时队列为空,故队头和队尾的位置都是0 pSched->emptySlot = 0;#实始化时队列为空,故队头和队尾的位置都是0
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (int i = 0; i < pSched->numOfThreads; ++i) { if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { pError('%s: failed to create rpc thread, reason:%s', pSched->label, strerror(errno)); goto _error; } }
pTrace('%s scheduler is initialized, numOfThreads:%d', pSched->label, pSched->numOfThreads);
return (void *)pSched;
_error: taosCleanUpScheduler(pSched); return NULL; }
再来看读消息的taosProcessSchedQueue函数,这个主要逻辑是 1.使用无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续向下处理 2.在操作msg前,加入互斥体防止msg被误用。 3.读操作完毕后修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。同时退出互斥体。 4.对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6。 具体代码及注释如下: void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; #注意这里是个无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续处理 while (1) { if (sem_wait(&pSched->fullSem) != 0) { pError('wait %s fullSem failed, errno:%d, reason:%s', pSched->label, errno, strerror(errno)); if (errno == EINTR) { /* sem_wait is interrupted by interrupt, ignore and continue */ continue; } } #加入互斥体防止msg被误用。 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno));
msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); #读取完毕修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。 pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; #读取完毕修改退出互斥体 if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%s\n', pSched->label, strerror(errno)); #读取完毕对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6 if (sem_post(&pSched->emptySem) != 0) pError('post %s emptySem failed, reason:%s\n', pSched->label, strerror(errno));
if (msg.fp) (*(msg.fp))(&msg); else if (msg.tfp) (*(msg.tfp))(msg.ahandle, msg.thandle); } }
最后来看写消息的taosScheduleTask函数,其基本逻辑是 1.写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。 2.加入互斥体防止msg被误操作,写入完成后退出互斥体。 3.写队列完成后对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取taosProcessSchedQueue中 sem_wait(&pSched->fullSem)不再阻塞就继续向下。 int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { SSchedQueue *pSched = (SSchedQueue *)qhandle; if (pSched == NULL) { pError('sched is not ready, msg:%p is dropped', pMsg); return 0; } #在写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。 if (sem_wait(&pSched->emptySem) != 0) pError('wait %s emptySem failed, reason:%s', pSched->label, strerror(errno)); #加入互斥体防止msg被误操作 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno));
pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); #在写队列前先对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取函数可以进行处理。 if (sem_post(&pSched->fullSem) != 0) pError('post %s fullSem failed, reason:%s', pSched->label, strerror(errno));
return 0; }
当然以上只是TdEngine优美代码的一小部分,而且笔者解读的功力也十分有限,这里再次强烈建议大家下载全部源码仔细学习,定能受益匪浅。 原文: https://blog.csdn.net/BEYONDMA/article/details/96578186 【End】
|