c#之Redis队列在邮件提醒中的应用
场景
有这样一个场景,一个邮件提醒的windows服务,获取所有开启邮件提醒的用户,循环获取这些用户的邮件,发送一条服务号消息。但问题来了,用户比较少的情况下,轮询一遍时间还能忍受,如果用户多了,那用户名称排序靠后的人,收到邮件提醒的消息,延迟时间就非常长了。
准备
c#之Redis实践list,hashtable
c#之Redis队列
方案
1、生产者线程一获取所有开启邮件提醒的用户。
2、根据配置来决定使用多少个队列,以及每个队列的容量。
3、线程一,获取未满的队列,将当前用户入队。如果所有的队列已满,则挂起2s,然后重新获取未满的队列,用户入队。
4、根据配置开启消费者线程,每个线程独立处理逻辑。如果获取的用户为空或者当前队列为空,挂起2s。否则通过EWS服务拉取该用户的邮件,并提醒。
5、如果在获取用户邮件的过程中出错,则将该用户重新入当前队列,等待下次拉取。
测试
队列
测试代码
///
///消息队列管理
///
publicclassMyRedisQueueBus:IDisposable
{
///
///线程个数
///
privateint_threadCount;
///
///每个线程中itcode的容量
///
privateint_threadCapacity;
///
///线程
///
privateThread[]_threads;
///
///生产者线程
///
privateThread_producerThread;
///
///挂起时间
///
privateconstintWAITSECONDE=2000;
///
///队列名称前缀
///
privatestring_queuePrefix;
///
///构造函数
///
///线程个数
///每个线程处理的队列容量
///每个线程处理的队列容量
publicMyRedisQueueBus(intthreadCount,intthreadCapacity,stringqueuePrefix)
{
this._threadCapacity=threadCapacity;
this._threadCount=threadCount;
this._queuePrefix=queuePrefix+"_{0}";
}
///
///开启生产者
///
publicvoidStartProducer(www.visa158.com)
{
_producerThread=newThread((www.hunanwang.net)=>
{
IRedisClientFactoryfactory=RedisClientFactory.Instance;
EmailAlertsDataemailAlertsData=newEmailAlertsData();
//白名单
string[]userIdsWhiteArray=TaskGloableParameter.WhiteList.Split(newchar[]{'','','',''},
StringSplitOptions.RemoveEmptyEntries);
//入队
using(IRedisClientclient=factory.CreateRedisClient(WebConfig.RedisServer,WebConfig.RedisPort))
{
client.Password=WebConfig.RedisPwd;
client.Db=WebConfig.RedisServerDb;
while(true)
{
//获取所有开启邮件提醒的用户
ListlstEmails=emailAlertsData.GetAllStartAlerts(SyncState.ALL,userIdsWhiteArray);
foreach(variteminlstEmails)
{
intqueueIndex=-1;
stringqueueName=string.Format(this._queuePrefix,queueIndex);
for(inti=0;i<_threadCount;i++)
{
queueName=string.Format(this._queuePrefix,i);
//如果当前队列没有填满,则直接跳出,使用该队列进行入队
if(client.GetListCount(queueName)<_threadCapacity)
{
queueIndex=i;
break;
}
}
//如果所有队列都已经满了,则挂起2s等待消费者消耗一部分数据,然后重新开始
if(queueIndex==-1)
{
Thread.SpinWait(WAITSECONDE);
//重新获取队列
for(inti=0;i<_threadCount;i++)
{
queueName=string.Format(this._queuePrefix,i);
//如果当前队列没有填满,则直接跳出,使用该队列进行入队
if(client.GetListCount(queueName)<_threadCapacity)
{
queueIndex=i;
break;
}
}
}
else
{
//入队
client.EnqueueItemOnList(queueName,JsonConvert.SerializeObject(newMyQueueItem
{
UserId=item.itcode,
SyncState=item.Email_SyncState
}));
}
}
}
}
});
_producerThread.Start();
}
///
///开启消费者
///
publicvoidStartCustomer()
{
_threads=newThread[_threadCount];
for(inti=0;i<_threads.Length;i++)
{
_threads[i]=newThread(CustomerRun);
_threads[i].Start(i);
}
}
privatevoidCustomerRun(objectobj)
{
intthreadIndex=Convert.ToInt32(obj);
stringqueueName=string.Format(this._queuePrefix,threadIndex);
IRedisClientFactoryfactory=RedisClientFactory.Instance;
using(IRedisClientclient=factory.CreateRedisClient(WebConfig.RedisServer,WebConfig.RedisPort))
{
while(true)
{
client.Password=WebConfig.RedisPwd;
client.Db=WebConfig.RedisServerDb;
if(client.GetListCount(queueName)>0)
{
stringresultJson=client.DequeueItemFromList(queueName);
//如果获取的结果为空,则挂起2s
if(string.IsNullOrEmpty(resultJson))
{
Thread.SpinWait(WAITSECONDE);
}
else
{
try
{
//耗时业务处理
MyQueueItemitem=JsonConvert.DeserializeObject(resultJson);
Console.WriteLine("Threadid:{0},User:{1}",Thread.CurrentThread.ManagedThreadId.ToString(),item.UserId);
}
catch(Exceptionex)
{
//如果出错,重新入队
client.EnqueueItemOnList(queueName,resultJson);
}
}
}
else
{
//当前队列为空,挂起2s
Thread.SpinWait(WAITSECONDE);
}
}
}
}
publicvoidDispose()
{
//释放资源时,销毁线程
if(this._threads!=null)
{
for(inti=0;i {
this._threads[i].Abort();
}
}
GC.Collect();
}
}
Main方法调用
staticvoidMain(string[]args)
{
MyRedisQueueBusbus=newMyRedisQueueBus(10,10,"mail_reminder_queue");
bus.StartProducer();
Thread.SpinWait(2000);
bus.StartCustomer();
Console.Read();
}
总结
通过配置的方式,确定开启的队列数和线程数,如果用户增加可以增加线程数,或者添加机器的方式解决。这样,可以解决排名靠后的用户,通过随机分发队列,有机会提前获取邮件提醒,可以缩短邮件提醒的延迟时间。当然,这种方案并不太完美,目前也只能想到这里了。这里把这个思路写出来,也是希望获取一个更好的解决方案。
上面的代码只是测试用的代码,后来发现将创建IRedisClient写在循环内,很容易出问题,频繁创建client,也以为这频繁打开关闭,如果释放不及时,那么会产生很多的redis连接,造成redis服务器负担。如果放在循环外边,这个client负责一直从队列中取数据就行,直到该线程停止。
|
|