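A brief introduction to non-blocking techniques in PHP
Keeping PHP from blocking: when PHP acts as the back end and has to carry out long-running work, the page request can still be answered quickly, provided the caller does not need to inspect the result. The following approaches are available:
1. If you run PHP under FastCGI (PHP-FPM), calling fastcgi_finish_request() ends the response to the client immediately while the PHP process keeps running.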
echo "program start.";
file_put_contents('log.txt', 'start-time:' . date('Y-m-d H:i:s') . "\n", FILE_APPEND);
fastcgi_finish_request(); // the response is flushed and the client connection is closed here
sleep(1);
echo 'debug...'; // never reaches the browser
file_put_contents('log.txt', 'start-proceed:' . date('Y-m-d H:i:s') . "\n", FILE_APPEND);
sleep(10);
file_put_contents('log.txt', 'end-time:' . date('Y-m-d H:i:s') . "\n", FILE_APPEND);
In this example the response is returned as soon as "program start." has been output, so the browser never receives the debug output, while log.txt still receives all three timestamps.
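Because fastcgi_finish_request() only exists when PHP runs under FPM, a script that may also run from the CLI or another SAPI could guard the call. The following is a minimal sketch under that assumption; the helper name respond_then_continue() and the closure passed to it are hypothetical and not part of the original example.

```php
<?php
// Hypothetical helper: send $body, detach from the client if the SAPI
// supports it, then run $background in the same process.
function respond_then_continue(string $body, callable $background): bool
{
    echo $body;

    $detached = false;
    if (function_exists('fastcgi_finish_request')) {
        // Only defined under PHP-FPM; flushes the response and closes the connection.
        $detached = fastcgi_finish_request();
    }

    // Keep running even if the client disconnects in the meantime.
    ignore_user_abort(true);
    $background();

    return $detached;
}

$log = [];
respond_then_continue("program start.", function () use (&$log) {
    $log[] = 'start-proceed:' . date('Y-m-d H:i:s');
});
```

When the function is unavailable the helper simply returns false and the background work still runs, only without the early response.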
2. Send a request to another URL using fsockopen or cURL in non-blocking mode.
$fp = fsockopen("www.example.com", 80, $errno, $errstr, 30);
if (!$fp) die('error fsockopen');
stream_set_blocking($fp, 0); // later reads and writes on $fp return immediately
$http = "GET /save.php HTTP/1.1\r\n";
$http .= "Host: www.example.com\r\n";
$http .= "Connection: Close\r\n\r\n";
fwrite($fp, $http);
fclose($fp); // close without waiting for the response
Sending an asynchronous request with cURL's curl_multi_* functions:
$cmh = curl_multi_init();
$ch1 = curl_init();
curl_setopt($ch1, CURLOPT_URL, "http://localhost:6666/child.php");
curl_multi_add_handle($cmh, $ch1);
curl_multi_exec($cmh, $active); // starts the transfer and returns without waiting for it to finish
echo "End\n";
3. Use the Gearman or Swoole extensions.
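Gearman is a distributed asynchronous job-processing framework with a PHP extension, able to handle large volumes of asynchronous tasks. Swoole has become very popular recently; it offers many asynchronous APIs and is simple to use. (Note from Chenyuan: it claims to redefine PHP and is fiercely critical of NodeJS. Swoole is a fine tool, but as an extension it does not feel directly comparable to NodeJS.)
4. Write the data to a cache or queue such as Redis, and process it asynchronously with a scheduled background job.
This approach is probably very common in typical high-traffic architectures.
5. In extreme cases you can invoke a system command and hand the data to a background task. Personally I do not find this very efficient.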
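$cmd = 'nohup php ./processd.php ' . escapeshellarg($someVar) . ' > /dev/null &';
`$cmd`;
6. A heavyweight technique from an overseas author, supported natively by PHP, which I did not fully understand: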
http://nikic.github.io/2012/12/22/Cooperative-multitasking-using-coroutines-in-PHP.html
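7. Install the pcntl extension and use pcntl_fork to spawn a child process that runs the task asynchronously. Personally I find this the most convenient option, but it easily produces zombie processes.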
if (($pid = pcntl_fork()) == 0) {
    child_func(); // child process function; the main process keeps running
} else {
    father_func(); // main process function
}
echo "Process " . getmypid() . " get to the end.\n";
function father_func() {
    echo "Father pid is " . getmypid() . "\n";
}
function child_func() {
    sleep(6);
    echo "Child process exit pid is " . getmypid() . "\n";
    exit(0);
}
Original article: Cooperative multitasking using coroutines (in PHP!), 22. December 2012
One of the large new features in PHP 5.5 will be support for generators and coroutines. Generators are already sufficiently covered by the documentation and various other blog posts (like this one or this one). Coroutines on the other hand have received relatively little attention. The reason is that coroutines are both a lot more powerful and a lot harder to understand and explain.
In this article I’d like to guide you through an implementation of a task scheduler using coroutines, so you can get a feeling for the stuff that they allow you to do. I’ll start off with a few introductory sections. If you feel like you already got a good grasp of the basics behind generators and coroutines, then you can jump straight to the “Cooperative multitasking” section.
Generators
The basic idea behind generators is that a function doesn’t return a single value, but returns a sequence of values instead, where every value is emitted one by one. Or in other words, generators allow you to implement iterators more easily. A very simple example of this concept is the xrange() function:
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}
The xrange() function shown above provides the same functionality as the built-in range() function. The only difference is that range() will return an array with one million numbers in the above case, whereas xrange() returns an iterator that will emit these numbers, but never actually compute an array with all of them.
The advantages of this approach should be evident. It allows you to work with large datasets without loading them into memory all at once. You can even work with infinite data-streams.
All this can also be done without generators, by manually implementing the Iterator interface. Generators only make it (a lot) more convenient, because you no longer have to implement five different methods for every iterator.
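To make the comparison concrete, here is a rough sketch of what a hand-written equivalent of xrange() could look like. The class name XRangeIterator is made up for this illustration and does not appear in the original article; the return type declarations assume PHP 7.1 or later.

```php
<?php
// Hypothetical hand-written counterpart of xrange(): all five Iterator
// methods have to be spelled out explicitly.
class XRangeIterator implements Iterator
{
    private $start;
    private $end;
    private $step;
    private $current;
    private $index = 0;

    public function __construct($start, $end, $step = 1)
    {
        $this->start = $start;
        $this->end = $end;
        $this->step = $step;
        $this->current = $start;
    }

    // Reset to the first value.
    public function rewind(): void
    {
        $this->current = $this->start;
        $this->index = 0;
    }

    // True while there is still a value within the range.
    public function valid(): bool
    {
        return $this->current <= $this->end;
    }

    public function current(): int
    {
        return $this->current;
    }

    public function key(): int
    {
        return $this->index;
    }

    // Advance to the next value.
    public function next(): void
    {
        $this->current += $this->step;
        $this->index++;
    }
}

foreach (new XRangeIterator(1, 5) as $num) {
    echo $num, "\n";
}
```

The generator version expresses the same state (the loop counter) implicitly in a local variable, which is where the convenience comes from.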
Generators as interruptible functions
To go from generators to coroutines it’s important to understand how they work internally: Generators are interruptible functions, where the yield statements constitute the interruption points.
Sticking to the above example, if you call xrange(1, 1000000) no code in the xrange() function is actually run. Instead PHP just returns an instance of the Generator class which implements the Iterator interface:
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)
The code is only run once you invoke one of the iterator methods on the object. E.g. if you call $range->rewind() the code in the xrange() function will be run until the first occurrence of yield in the control flow. In this case it means that $i = $start and then yield $i are run. Whatever was passed to the yield statement can then be fetched using $range->current().
To continue executing the code in the generator you need to call the $range->next() method. This will again resume the generator until a yield statement is hit. Thus, using a succession of ->next() and ->current() calls, you can get all values from the generator, until at some point no yield is hit anymore. For the xrange() this happens once $i exceeds $end. In this case control flow will reach the end of the function, thus leaving no more code to run. Once this happens the ->valid() method will return false and as such the iteration ends.
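The sequence of calls described above can be written out by hand. This is only an illustrative sketch of what foreach does implicitly; the small range and the $values array are chosen for the example.

```php
<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

$range = xrange(1, 5, 2);
$values = [];

$range->rewind();                   // runs the function up to the first yield
while ($range->valid()) {           // false once the function body has finished
    $values[] = $range->current();  // the value passed to the last yield
    $range->next();                 // resume until the next yield (or the end)
}

print_r($values);
```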
Coroutines
The main thing that coroutines add to the above functionality is the ability to send values back to the generator. This turns the one-way communication from the generator to the caller into a two-way channel between the two.
Values are passed into the coroutine by calling its ->send() method instead of ->next(). An example of how this works is the following logger() coroutine:
function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}

$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar');
As you can see yield isn’t used as a statement here, but as an expression, i.e. it has a return value. The return value of yield is whatever was passed to ->send(). In this example yield will first return 'Foo' and then 'Bar'.
The above is an example where the yield acts as a mere receiver. It is possible to combine both usages, i.e. to both send and receive. Here is an example of how this works:
function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                              // NULL               (the return value of ->send())
The exact order of the outputs can be a bit hard to understand at first, so make sure that you get why it comes out in exactly this way. There are two things I’d like to especially point out: First, the use of parentheses around the yield expression is no accident. These parentheses are required for technical reasons (though I have been considering adding an exception for assignments, just like it exists in Python). Secondly, you may have noticed that ->current() is used without calling ->rewind() first. If this is done then the rewind operation is performed implicitly.
Cooperative multitasking
If reading the above logger() example you thought “Why would I use a coroutine for this? Why can’t I just use a normal class?”, then you were totally right. The example demonstrates the basic usage, but there aren’t really any advantages to using a coroutine in this context. This is the case for a lot of coroutine examples. As already mentioned in the introduction coroutines are a very powerful concept, but their applications are rare and often sufficiently complicated, making it hard to come up with simple and non-contrived examples.
What I decided to go for in this article is an implementation of cooperative multitasking using coroutines. The problem we’re trying to solve is that you want to run multiple tasks (or “programs”) concurrently. But a processor can only run one task at a time (not considering multi-core for the purposes of this post). Thus the processor needs to switch between the different tasks and always let one run “for a little while”.
The “cooperative” part of the term describes how this switching is done: It requires that the currently running task voluntarily passes back control to the scheduler, so it can run another task. This is in contrast to “preemptive” multitasking where the scheduler can interrupt the task after some time whether it likes it or not. Cooperative multitasking was used in early versions of Windows (pre Win95) and Mac OS, but they later switched to using preemption. The reason should be fairly obvious: If you rely on a program to pass back control voluntarily, badly-behaving software can easily occupy the whole CPU for itself, not leaving a share for other tasks.
At this point you should see the connection between coroutines and task scheduling: The yield instruction provides a way for a task to interrupt itself and pass control back to the scheduler, so it can run some other task. Furthermore the yield can be used for communication between the task and the scheduler.
For our purposes a “task” will be a thin wrapper around the coroutine function:
class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}
A task will be a coroutine tagged with a task ID. Using the setSendValue() method you can specify which value will be sent into it on the next resume (you’ll see what we need this for a bit later). The run() function really does nothing more than call the send() method on the coroutine. To understand why the additional beforeFirstYield flag is needed consider the following snippet:
function gen() {
    yield 'foo';
    yield 'bar';
}

$gen = gen();
var_dump($gen->send('something'));

// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
$gen->rewind();
var_dump($gen->send('something'));

// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we lose the first yielded value!
By adding the additional beforeFirstYield condition we can ensure that the value of the first yield is also returned.
The scheduler now has to do little more than cycle through the tasks and run them:
class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}
The newTask() method creates a new task (using the next free task id) and puts it in the task map. Furthermore it schedules the task by putting it in the task queue. The run() method then walks this task queue and runs the tasks. If a task is finished it is dropped, otherwise it is rescheduled at the end of the queue.
Let’s try out the scheduler with two simple (and very pointless) tasks:
function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();
Both tasks will just echo a message and then pass control back to the scheduler with yield. This is the resulting output:
This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.
The output is exactly as expected: For the first 5 iterations the tasks alternate, then the second task finishes and only the first task continues to run.
Communicating with the scheduler
Now that the scheduler works we can turn to the next point on the agenda: Communication between the tasks and the scheduler. We will use the same method that processes use to talk to the operating system: Through system calls. The reason we need syscalls is that the operating system is on a different privilege level than the processes. So in order to perform privileged actions (like killing another process) there has to be some way to pass control back to the kernel, so it can perform said actions. Internally this is once again implemented using interruption instructions. Historically the generic int instruction was used, nowadays there are more specialized and faster syscall/sysenter instructions.
Our task scheduling system will reflect this design: Instead of simply passing the scheduler into the task (and thus allowing it to do whatever it wants) we will communicate via system calls passed through the yield expression. The yield here will act both as an interrupt and as a way to pass information to (and from) the scheduler.
To represent a system call I’ll use a small wrapper around a callable:
class SystemCall {
    protected $callback;

    public function __construct(callable $callback) {
        $this->callback = $callback;
    }

    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback; // Can't call it directly in PHP :/
        return $callback($task, $scheduler);
    }
}
It will behave just like any callable (using __invoke), but tells the scheduler to pass the calling task and itself into the function. To handle it we have to slightly modify the scheduler’s run method:
public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();

        if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }

        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}
The first system call will do nothing more than return the task ID:
function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}
It does so by setting the tid as next send value and rescheduling the task. For system calls the scheduler does not automatically reschedule the task, we need to do it manually (you’ll see why a bit later). Using this new syscall we can rewrite the previous example:
function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));
$scheduler->newTask(task(5));

$scheduler->run();
This will give the same output as with the previous example. Notice how the system call is basically done like any other call, but with a prepended yield. Two more syscalls for creating new tasks and killing them again:
function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}
The killTask function needs an additional method in the scheduler:
public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }

    unset($this->taskMap[$tid]);

    // This is a bit ugly and could be optimized so it does not have to walk the queue,
    // but assuming that killing tasks is rather rare I won't bother with it now
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }

    return true;
}
A small script to test the new functionality:
function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";
        yield;
    }
}

function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));

    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";
        yield;

        if ($i == 3) yield killTask($childTid);
    }
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
This will print the following:
Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.
The child is killed after three iterations, so that’s when the “Child is still alive” messages end. One should probably point out that this is not a real parent/child relationship, because the child can continue running even after the parent finished. Or the child could kill the parent. One could modify the scheduler to have a more hierarchic task structure, but I won’t implement that in this article.
There are many more process management calls one could implement, for example wait (which waits until a task has finished running), exec (which replaces the current task) and fork (which creates a clone of the current task). Forking is pretty cool and you can actually implement it with PHP’s coroutines, because they support cloning.
But I’ll leave these for the interested reader. Instead let’s get to the next topic!
Non-Blocking IO
A really cool application of our task management system obviously is … a web server. There could be one task listening on a socket for new connections and whenever a new connection is made it would create a new task handling that connection.
The hard part about this is that normally socket operations like reading data are blocking, i.e. PHP will wait until the client has finished sending. For a web-server that’s obviously not good at all: It would mean that it can only handle a single connection at a time.
The solution is to make sure that the socket is “ready” before actually reading/writing to it. To find out which sockets are ready to read from or write to the stream_select function can be used.
First, let’s add two new syscalls, which will cause a task to wait until a certain socket is ready:
function waitForRead($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForRead($socket, $task);
        }
    );
}

function waitForWrite($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForWrite($socket, $task);
        }
    );
}
These syscalls are just proxies to the respective methods in the scheduler:
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];

public function waitForRead($socket, Task $task) {
    if (isset($this->waitingForRead[(int) $socket])) {
        $this->waitingForRead[(int) $socket][1][] = $task;
    } else {
        $this->waitingForRead[(int) $socket] = [$socket, [$task]];
    }
}

public function waitForWrite($socket, Task $task) {
    if (isset($this->waitingForWrite[(int) $socket])) {
        $this->waitingForWrite[(int) $socket][1][] = $task;
    } else {
        $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
    }
}
The waitingForRead and waitingForWrite properties are just arrays containing the sockets to wait for and the tasks that are waiting for them. The interesting part is the following method, which actually checks whether the sockets are ready and reschedules the respective tasks:
protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }

    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }

    $eSocks = []; // dummy

    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }

    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }

    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}
The stream_select function takes arrays of read, write and except sockets to check (we’ll ignore that last category). The arrays are passed by reference and the function will only leave those elements in the arrays that changed state. We can then walk over those arrays and reschedule all tasks associated with them.
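The by-reference behaviour of stream_select can be observed in isolation. The sketch below is not from the original article: it assumes a Unix-like system and uses stream_socket_pair() to create two pairs of connected local sockets as stand-ins for client connections, with data written to only one of them.

```php
<?php
// Two pairs of connected local sockets (assumes a Unix-like system).
$pairA = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$pairB = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

// Only the first pair gets data, so only its other end becomes readable.
fwrite($pairA[0], "ping");

$rSocks = [$pairA[1], $pairB[1]];
$wSocks = [];
$eSocks = [];

// Timeout 0 polls and returns at once; null would block until a socket is ready.
$ready = stream_select($rSocks, $wSocks, $eSocks, 0);

echo "Ready streams: $ready\n";

// After the call $rSocks contains only the sockets that can be read without blocking.
$data = '';
foreach ($rSocks as $socket) {
    $data .= fread($socket, 8192);
}
echo "Received: $data\n";
```

The socket from the second pair is removed from $rSocks because nothing was written to it, which is exactly the property ioPoll() relies on when it reschedules tasks.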
In order to regularly perform the above polling action we’ll add a special task in the scheduler:
protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}
This task needs to be registered at some point, e.g. one could add $this->newTask($this->ioPollTask()) to the start of the run() method. Then it will work just like any other task, performing the polling operation once every full task cycle (this isn’t necessarily the best way to handle it). The ioPollTask will call ioPoll with a 0 second timeout, which means that stream_select will return right away (rather than waiting).
Only if the task queue is empty we use a null timeout, which means that it will wait until some socket becomes ready. If we wouldn’t do this the polling task would just run again and again and again until a new connection is made. This would result in 100% CPU usage. It’s much more efficient to let the operating system do the waiting instead.
Writing the server is relatively easy now:
function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}

function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield waitForWrite($socket);
    fwrite($socket, $response);

    fclose($socket);
}

$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();
This will accept connections to localhost:8000 and just send back a HTTP response with whatever it was sent. Doing anything “real” would be a lot more complicated (properly handling HTTP requests is way outside the scope of this article). The above snippet just demos the general concept.
You can try the server out using something like ab -n 10000 -c 100 localhost:8000/. This will send 10000 requests to it with 100 of them arriving concurrently. Using these numbers I get a median response time of 10ms. But there is an issue with a few requests being handled really slowly (like 5 seconds), that’s why the total throughput is only 2000 reqs/s (with a 10ms response time it should be more like 10000 reqs/s). With higher concurrency count (e.g. -c 500) it mostly still works well, but some connections will throw a “Connection reset by peer” error. As I know very little about this low-level socket stuff I didn’t try to figure out what the issue is.
Stacked coroutines
If you would try to build some larger system using our scheduling system you would soon run into a problem: We are used to breaking up code into smaller functions and calling them. But with coroutines this is no longer possible. E.g. consider the following code:
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}

function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
This code tries to put the recurring “output n times” code into a separate coroutine and then invoke it from the main task. But this won’t work. As mentioned at the very beginning of this article calling a generator (or coroutine) will not actually do anything, it will only return an object. This also happens in the above case. The echoTimes calls won’t do anything other than return an (unused) coroutine object.
In order to still allow this we need to write a small wrapper around the bare coroutines. I’ll call this a “stacked coroutine” because it will manage a stack of nested coroutine calls. It will be possible to call sub-coroutines by yielding them:
$retval = (yield someCoroutine($foo, $bar));
The subcoroutines will also be able to return a value, again by using yield:
yield retval("I'm a return value!");
The retval function does nothing more than returning a wrapper around the value which will signal that it’s a return value:
class CoroutineReturnValue {
    protected $value;

    public function __construct($value) {
        $this->value = $value;
    }

    public function getValue() {
        return $this->value;
    }
}

function retval($value) {
    return new CoroutineReturnValue($value);
}
In order to turn a coroutine into a stacked coroutine (which supports subcalls) we’ll have to write another function (which is obviously yet-another-coroutine):
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;

    for (;;) {
        $value = $gen->current();

        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }

        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }

            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }

        $gen->send(yield $gen->key() => $value);
    }
}
This function acts as a simple proxy between the caller and the currently running subcoroutine. This is handled in the $gen->send(yield $gen->key() => $value); line. Additionally it checks whether a return value is a generator, in which case it will start running it and pushes the previous coroutine on the stack. Once it gets a CoroutineReturnValue it will pop the stack again and continue executing the previous coroutine.
In order to make the stacked coroutines usable in tasks the $this->coroutine = $coroutine; line in the Task constructor needs to be replaced with $this->coroutine = stackedCoroutine($coroutine);.
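For comparison, PHP 7.0 and later provide generator delegation (yield from) and generator return values natively, which cover much of what the hand-written stack above does. The sketch below is not part of the original article, which targeted PHP 5.5; it reuses the echoTimes() example with smaller counts and drives the task with a plain foreach instead of the scheduler.

```php
<?php
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield; // each yield is passed through to whoever drives task()
    }
    return $max; // becomes the value of the `yield from` expression
}

function task() {
    $fooCount = yield from echoTimes('foo', 3);
    echo "---\n";
    $barCount = yield from echoTimes('bar', 2);
    return $fooCount + $barCount;
}

$gen = task();
$steps = 0;
foreach ($gen as $ignored) {
    ++$steps; // one step per suspension point reached in the sub-generators
}

echo "Suspended $steps times, total ", $gen->getReturn(), "\n";
```

With this form a sub-coroutine returns its result with a plain return statement rather than a wrapper object such as CoroutineReturnValue.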
Now we can improve the webserver example from above a bit by grouping the wait+read (and wait+write and wait+accept) actions into functions. To group the related functionality I’ll use a class:
class CoSocket {
    protected $socket;

    public function __construct($socket) {
        $this->socket = $socket;
    }

    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }

    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }

    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }

    public function close() {
        @fclose($this->socket);
    }
}
Now the server can be rewritten a bit cleaner:
function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}

function handleClient($socket) {
    $data = (yield $socket->read(8192));

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield $socket->write($response);
    yield $socket->close();
}
Error handling
As a good programmer you obviously noticed that the above examples all lack error handling. Pretty much every socket operation is fallible and can produce errors. I obviously did this because error handling is really tedious (especially for sockets!) and would easily blow up the code size by a few factors.
But still I’d like to cover how error handling for coroutines works in general: Coroutines provide the ability to throw exceptions inside them using the throw() method. As of this writing this method does not yet exist in PHP’s implementation, but I will commit it later today.
The throw() method takes an exception and throws it at the current suspension point in the coroutine. Consider this code:
function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}

$gen = gen();
$gen->rewind();                     // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"
This is really awesome for our purposes, because we can make system calls and subcoroutine calls throw exceptions. For the system calls the Scheduler::run() method needs a small adjustment:
if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}
And the Task class needs to handle throw calls too:
class Task {
    // ...
    protected $exception = null;

    public function setException($exception) {
        $this->exception = $exception;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    // ...
}
Now we can start throwing exceptions from system calls! E.g. for the killTask call, let’s throw an exception if the passed task ID is invalid:
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}
Try it out:
function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}
Sadly this won’t work properly yet, because the stackedCoroutine function doesn’t handle the exception correctly. To fix it the function needs some modifications:
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;

    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }

            $value = $gen->current();

            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }

            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }

                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }

            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }

            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }

            $gen = $stack->pop();
            $exception = $e;
        }
    }
}
Wrapping up
In this article we built a task scheduler using cooperative multitasking, including the ability to perform “system calls”, doing non-blocking IO operations and handling errors. The really cool thing about all this is that the resulting code for the tasks looks totally synchronous, even though it is performing a lot of asynchronous operations. If you want to read data from a socket you don’t have to pass some callback or register an event listener. Instead you write yield $socket->read(). Which is basically what you would normally do too, just with a yield in front of it.
When I first heard about all this I found this concept totally awesome and that’s what motivated me to implement it in PHP. At the same time I find coroutines really scary. There is a thin line between awesome code and a total mess and I think coroutines sit exactly on that line. It’s hard for me to say whether writing async code in the way outlined above is really beneficial.
In any case, I think it’s an interesting topic and I hope you found it interesting too. Comments welcome :)
If you liked this article, you may want to browse my other articles or follow me on Twitter.