配色: 字号:
c#之Redis队列在邮件提醒中的应用
2016-09-23 | 阅:  转:  |  分享 
  
c#之Redis队列在邮件提醒中的应用

场景

有这样一个场景,一个邮件提醒的windows服务,获取所有开启邮件提醒的用户,循环获取这些用户的邮件,发送一条服务号消息。但问题来了,用户比较少的情况下,轮询一遍时间还能忍受,如果用户多了,那用户名称排序靠后的人,收到邮件提醒的消息,延迟时间就非常长了。



准备

c#之Redis实践list,hashtable



c#之Redis队列



方案

1、生产者线程一获取所有开启邮件提醒的用户。



2、根据配置来决定使用多少个队列,以及每个队列的容量。



3、线程一,获取未满的队列,将当前用户入队。如果所有的队列已满,则挂起2s,然后重新获取未满的队列,用户入队。



4、根据配置开启消费者线程,每个线程独立处理逻辑。如果获取的用户为空或者当前队列为空,挂起2s。否则通过EWS服务拉取该用户的邮件,并提醒。



5、如果在获取用户邮件的过程中出错,则将该用户重新入当前队列,等待下次拉取。



测试







队列







测试代码





///

///消息队列管理

///


publicclassMyRedisQueueBus:IDisposable

{

///

///线程个数

///


privateint_threadCount;

///

///每个线程中itcode的容量

///


privateint_threadCapacity;

///

///线程

///


privateThread[]_threads;

///

///生产者线程

///


privateThread_producerThread;

///

///挂起时间

///


privateconstintWAITSECONDE=2000;

///

///队列名称前缀

///


privatestring_queuePrefix;

///

///构造函数

///


///线程个数

///每个线程处理的队列容量

///每个线程处理的队列容量

publicMyRedisQueueBus(intthreadCount,intthreadCapacity,stringqueuePrefix)

{

this._threadCapacity=threadCapacity;

this._threadCount=threadCount;

this._queuePrefix=queuePrefix+"_{0}";

}

///

///开启生产者

///


publicvoidStartProducer(www.visa158.com)

{

_producerThread=newThread((www.hunanwang.net)=>

{

IRedisClientFactoryfactory=RedisClientFactory.Instance;

EmailAlertsDataemailAlertsData=newEmailAlertsData();

//白名单

string[]userIdsWhiteArray=TaskGloableParameter.WhiteList.Split(newchar[]{'','','',''},

StringSplitOptions.RemoveEmptyEntries);

//入队

using(IRedisClientclient=factory.CreateRedisClient(WebConfig.RedisServer,WebConfig.RedisPort))

{

client.Password=WebConfig.RedisPwd;

client.Db=WebConfig.RedisServerDb;

while(true)

{

//获取所有开启邮件提醒的用户

ListlstEmails=emailAlertsData.GetAllStartAlerts(SyncState.ALL,userIdsWhiteArray);



foreach(variteminlstEmails)

{

intqueueIndex=-1;

stringqueueName=string.Format(this._queuePrefix,queueIndex);

for(inti=0;i<_threadCount;i++)

{

queueName=string.Format(this._queuePrefix,i);

//如果当前队列没有填满,则直接跳出,使用该队列进行入队

if(client.GetListCount(queueName)<_threadCapacity)

{

queueIndex=i;

break;

}

}

//如果所有队列都已经满了,则挂起2s等待消费者消耗一部分数据,然后重新开始

if(queueIndex==-1)

{

Thread.SpinWait(WAITSECONDE);

//重新获取队列

for(inti=0;i<_threadCount;i++)

{

queueName=string.Format(this._queuePrefix,i);

//如果当前队列没有填满,则直接跳出,使用该队列进行入队

if(client.GetListCount(queueName)<_threadCapacity)

{

queueIndex=i;

break;

}

}

}

else

{

//入队

client.EnqueueItemOnList(queueName,JsonConvert.SerializeObject(newMyQueueItem

{

UserId=item.itcode,

SyncState=item.Email_SyncState

}));

}

}



}

}



});

_producerThread.Start();

}



///

///开启消费者

///


publicvoidStartCustomer()

{

_threads=newThread[_threadCount];

for(inti=0;i<_threads.Length;i++)

{

_threads[i]=newThread(CustomerRun);

_threads[i].Start(i);

}

}

privatevoidCustomerRun(objectobj)

{

intthreadIndex=Convert.ToInt32(obj);

stringqueueName=string.Format(this._queuePrefix,threadIndex);



IRedisClientFactoryfactory=RedisClientFactory.Instance;

using(IRedisClientclient=factory.CreateRedisClient(WebConfig.RedisServer,WebConfig.RedisPort))

{

while(true)

{

client.Password=WebConfig.RedisPwd;

client.Db=WebConfig.RedisServerDb;

if(client.GetListCount(queueName)>0)

{

stringresultJson=client.DequeueItemFromList(queueName);

//如果获取的结果为空,则挂起2s

if(string.IsNullOrEmpty(resultJson))

{

Thread.SpinWait(WAITSECONDE);

}

else

{

try

{

//耗时业务处理

MyQueueItemitem=JsonConvert.DeserializeObject(resultJson);

Console.WriteLine("Threadid:{0},User:{1}",Thread.CurrentThread.ManagedThreadId.ToString(),item.UserId);

}

catch(Exceptionex)

{

//如果出错,重新入队

client.EnqueueItemOnList(queueName,resultJson);



}



}

}

else

{

//当前队列为空,挂起2s

Thread.SpinWait(WAITSECONDE);

}

}

}



}

publicvoidDispose()

{

//释放资源时,销毁线程

if(this._threads!=null)

{

for(inti=0;i
{

this._threads[i].Abort();

}

}

GC.Collect();

}

}



Main方法调用





staticvoidMain(string[]args)

{

MyRedisQueueBusbus=newMyRedisQueueBus(10,10,"mail_reminder_queue");

bus.StartProducer();

Thread.SpinWait(2000);

bus.StartCustomer();

Console.Read();

}



总结

通过配置的方式,确定开启的队列数和线程数,如果用户增加可以增加线程数,或者添加机器的方式解决。这样,可以解决排名靠后的用户,通过随机分发队列,有机会提前获取邮件提醒,可以缩短邮件提醒的延迟时间。当然,这种方案并不太完美,目前也只能想到这里了。这里把这个思路写出来,也是希望获取一个更好的解决方案。



上面的代码只是测试用的代码,后来发现将创建IRedisClient写在循环内,很容易出问题,频繁创建client,也以为这频繁打开关闭,如果释放不及时,那么会产生很多的redis连接,造成redis服务器负担。如果放在循环外边,这个client负责一直从队列中取数据就行,直到该线程停止。





















献花(0)
+1
(本文系白狐一梦首藏)