A Brief Introduction to Non-Blocking PHP
2016-08-26

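Stop PHP from blocking. When PHP serves as the back end and has to carry out long-running work, and you want to answer the page request quickly without waiting to check the result, the following approaches are available:

1. If you are running in FastCGI mode, calling fastcgi_finish_request() ends the session immediately while the PHP process keeps running.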

echo "program start.";
file_put_contents('log.txt', 'start-time:' . date('Y-m-d H:i:s'), FILE_APPEND);
fastcgi_finish_request();
sleep(1);
echo 'debug...';
file_put_contents('log.txt', 'start-proceed:' . date('Y-m-d H:i:s'), FILE_APPEND);
sleep(10);
file_put_contents('log.txt', 'end-time:' . date('Y-m-d H:i:s'), FILE_APPEND);

Running this example shows that the session returns right after "program start." is output, so the browser never receives the debug output, while log.txt still records all three timestamps.
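fastcgi_finish_request() only exists under PHP-FPM, so calling it from the CLI or another SAPI is a fatal error. The following is a minimal sketch of a guard, assuming a hypothetical helper name finishRequestEarly() that is not part of the original post; it reports whether the response was actually released early.

```php
<?php
// Hypothetical helper (not from the original post): release the response
// early when PHP-FPM is available, and report whether that happened.
function finishRequestEarly(): bool
{
    // Keep running even if the client disconnects once the response is sent.
    ignore_user_abort(true);

    if (function_exists('fastcgi_finish_request')) {
        return fastcgi_finish_request();
    }

    // CLI, mod_php and so on: nothing to release, the caller continues synchronously.
    return false;
}

$detached = finishRequestEarly();
// ... long-running work would go here ...
```

When $detached is false the work still runs, but the client waits for it, so the caller may prefer one of the other approaches in this list.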

2. Use fsockopen or cURL in non-blocking mode to request another URL.

$fp = fsockopen("www.example.com", 80, $errno, $errstr, 30);
if (!$fp) die('error fsockopen');
stream_set_blocking($fp, 0);
// Request line: method, path and protocol version separated by single spaces
$http = "GET /save.php HTTP/1.1\r\n";
$http .= "Host: www.example.com\r\n";
$http .= "Connection: Close\r\n\r\n";
fwrite($fp, $http);
fclose($fp);

Sending an asynchronous request with cURL's curl_multi_* functions:

$cmh = curl_multi_init();
$ch1 = curl_init();
curl_setopt($ch1, CURLOPT_URL, "http://localhost:6666/child.php");
curl_multi_add_handle($cmh, $ch1);
curl_multi_exec($cmh, $active);
echo "End\n";

3. Use the Gearman or Swoole extensions.

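Gearman is a distributed asynchronous processing framework with a PHP extension that can handle large volumes of asynchronous tasks. Swoole has become very popular recently; it offers many asynchronous methods and is simple to use. (Note from 尘缘: Swoole claims to redefine PHP and tears NodeJS to shreds. It is a good tool, but as an extension I do not think it is really comparable to NodeJS.)

4. Use a cache or queue such as Redis: write the data into the cache and let a scheduled background job process it asynchronously.

This approach is probably very common in typical high-traffic architectures.

5. In extreme cases you can invoke a system command and hand the data to a background job. Personally I do not find this very efficient.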

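// Escape the argument and build the command in PHP, so that $someVar is the PHP value
// rather than an (empty) shell variable
$cmd = 'nohup php ./processd.php ' . escapeshellarg($someVar) . ' > /dev/null 2>&1 &';
`$cmd`;

6. A heavyweight technique from a foreign developer, which I did not fully understand; PHP supports it natively: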

http://nikic.github.io/2012/12/22/Cooperative-multitasking-using-coroutines-in-PHP.html

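7. Install the pcntl extension and use pcntl_fork to spawn a child process that runs the task asynchronously. Personally I find this the most convenient option, but it easily produces zombie processes.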

if (($pid = pcntl_fork()) == 0) {
    child_func();  // child-process function; the main process keeps running
} else {
    father_func(); // main-process function
}

echo "Process " . getmypid() . " get to the end.\n";

function father_func() {
    echo "Father pid is " . getmypid() . "\n";
}

function child_func() {
    sleep(6);
    echo "Child process exit pid is " . getmypid() . "\n";
    exit(0);
}

Original article: Cooperative multitasking using coroutines (in PHP!), 22. December 2012

One of the large new features in PHP 5.5 will be support for generators and coroutines. Generators are already sufficiently covered by the documentation and various other blog posts (like this one or this one). Coroutines on the other hand have received relatively little attention. The reason is that coroutines are both a lot more powerful and a lot harder to understand and explain.

In this article I’d like to guide you through an implementation of a task scheduler using coroutines, so you can get a feeling for the stuff that they allow you to do. I’ll start off with a few introductory sections. If you feel like you already got a good grasp of the basics behind generators and coroutines, then you can jump straight to the “Cooperative multitasking” section.



Generators

The basic idea behind generators is that a function doesn’t return a single value, but returns a sequence of values instead, where every value is emitted one by one. Or in other words, generators allow you to implement iterators more easily. A very simple example of this concept is the xrange() function:



function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

The xrange() function shown above provides the same functionality as the built-in range() function. The only difference is that range() will return an array with one million numbers in the above case, whereas xrange() returns an iterator that will emit these numbers, but never actually compute an array with all of them.

The advantages of this approach should be evident. It allows you to work with large datasets without loading them into memory all at once. You can even work with infinite data-streams.

All this can also be done without generators, by manually implementing the Iterator interface. Generators only make it (a lot) more convenient, because you no longer have to implement five different methods for every iterator.
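To make the "five different methods" concrete, here is a sketch of what a hand-written counterpart to xrange() could look like. The class name XRangeIterator is made up for illustration, and the typed properties and mixed return types assume PHP 8.0 or later.

```php
<?php
// Hypothetical hand-written counterpart of xrange(): all five Iterator
// methods have to be spelled out, and the loop state lives in properties.
final class XRangeIterator implements Iterator
{
    private int $start;
    private int $end;
    private int $step;
    private int $value;
    private int $position = 0;

    public function __construct(int $start, int $end, int $step = 1)
    {
        $this->start = $start;
        $this->end = $end;
        $this->step = $step;
        $this->value = $start;
    }

    public function rewind(): void
    {
        $this->value = $this->start;
        $this->position = 0;
    }

    public function valid(): bool
    {
        return $this->value <= $this->end;
    }

    public function current(): mixed
    {
        return $this->value;
    }

    public function key(): mixed
    {
        return $this->position;
    }

    public function next(): void
    {
        $this->value += $this->step;
        $this->position++;
    }
}

$numbers = iterator_to_array(new XRangeIterator(1, 5));
// $numbers is [1, 2, 3, 4, 5]
```

The generator version expresses the same behaviour in a three-line loop, because the engine keeps the loop variable alive between yields instead of the class doing so by hand.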



Generators as interruptible functions

To go from generators to coroutines it’s important to understand how they work internally: Generators are interruptible functions, where the yield statements constitute the interruption points.

Sticking to the above example, if you call xrange(1, 1000000) no code in the xrange() function is actually run. Instead PHP just returns an instance of the Generator class which implements the Iterator interface:



$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)

The code is only run once you invoke one of the iterator methods on the object. E.g. if you call $range->rewind() the code in the xrange() function will be run until the first occurrence of yield in the control flow. In this case it means that $i = $start and then yield $i are run. Whatever was passed to the yield statement can then be fetched using $range->current().

To continue executing the code in the generator you need to call the $range->next() method. This will again resume the generator until a yield statement is hit. Thus, using a succession of ->next() and ->current() calls, you can get all values from the generator, until at some point no yield is hit anymore. For the xrange() this happens once $i exceeds $end. In this case control flow will reach the end of the function, thus leaving no more code to run. Once this happens the ->valid() method will return false and as such the iteration ends.
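The rewind/valid/current/next sequence described above is what foreach performs behind the scenes. A small sketch that drives a generator by hand, using an illustrative countTo() function that is not part of the article:

```php
<?php
// Illustrative generator (not from the article): yields 1 .. $end.
function countTo(int $end): Generator
{
    for ($i = 1; $i <= $end; $i++) {
        yield $i;
    }
}

$gen = countTo(3);
$seen = [];

$gen->rewind();                 // runs the body up to the first yield
while ($gen->valid()) {         // false once the function body has finished
    $seen[] = $gen->current();  // value passed to the most recent yield
    $gen->next();               // resume until the next yield (or the end)
}
// $seen is [1, 2, 3]
```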



Coroutines

The main thing that coroutines add to the above functionality is the ability to send values back to the generator. This turns the one-way communication from the generator to the caller into a two-way channel between the two.

Values are passed into the coroutine by calling its ->send() method instead of ->next(). An example of how this works is the following logger() coroutine:



function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        // (yield) evaluates to whatever was passed to ->send()
        fwrite($fileHandle, (yield) . "\n");
    }
}

$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar');

As you can see yield isn’t used as a statement here, but as an expression, i.e. it has a return value. The return value of yield is whatever was passed to ->send(). In this example yield will first return 'Foo' and then 'Bar'.

The above is an example where the yield acts as a mere receiver. It is possible to combine both usages, i.e. to both send and receive. Here is an example of how this works:



function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                              // NULL               (the return value of ->send())

The exact order of the outputs can be a bit hard to understand at first, so make sure that you get why it comes out in exactly this way. There are two things I’d like to especially point out: First, the use of parentheses around the yield expression is no accident. These parentheses are required for technical reasons (though I have been considering adding an exception for assignments, just like it exists in Python). Secondly, you may have noticed that ->current() is used without calling ->rewind() first. If this is done then the rewind operation is performed implicitly.
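A second, slightly less artificial way to see both directions at once is a coroutine that receives numbers and yields back a running total. This is only a sketch; runningTotal() is an illustrative name, not something from the article.

```php
<?php
// Illustrative coroutine: each value sent in is added to a total,
// and the updated total is handed back as the result of ->send().
function runningTotal(): Generator
{
    $total = 0;
    while (true) {
        $received = (yield $total);
        $total += $received;
    }
}

$acc = runningTotal();
$initial = $acc->current(); // runs to the first yield, which yields 0
$afterFive = $acc->send(5); // resumes with 5, loops, yields 5
$afterSeven = $acc->send(7); // resumes with 7, loops, yields 12
```

Each ->send() both delivers a value to the suspended yield expression and returns whatever the next yield produces, which is exactly the pairing the task scheduler later relies on.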



Cooperative multitasking

If reading the above logger() example you thought “Why would I use a coroutine for this? Why can’t I just use a normal class?”, then you were totally right. The example demonstrates the basic usage, but there aren’t really any advantages to using a coroutine in this context. This is the case for a lot of coroutine examples. As already mentioned in the introduction coroutines are a very powerful concept, but their applications are rare and often sufficiently complicated, making it hard to come up with simple and non-contrived examples.

What I decided to go for in this article is an implementation of cooperative multitasking using coroutines. The problem we’re trying to solve is that you want to run multiple tasks (or “programs”) concurrently. But a processor can only run one task at a time (not considering multi-core for the purposes of this post). Thus the processor needs to switch between the different tasks and always let one run “for a little while”.

The “cooperative” part of the term describes how this switching is done: It requires that the currently running task voluntarily passes back control to the scheduler, so it can run another task. This is in contrast to “preemptive” multitasking where the scheduler can interrupt the task after some time whether it likes it or not. Cooperative multitasking was used in early versions of Windows (pre Win95) and Mac OS, but they later switched to using preemption. The reason should be fairly obvious: If you rely on a program to pass back control voluntarily, badly-behaving software can easily occupy the whole CPU for itself, not leaving a share for other tasks.

At this point you should see the connection between coroutines and task scheduling: The yield instruction provides a way for a task to interrupt itself and pass control back to the scheduler, so it can run some other task. Furthermore the yield can be used for communication between the task and the scheduler.

For our purposes a “task” will be a thin wrapper around the coroutine function:



class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

A task will be a coroutine tagged with a task ID. Using the setSendValue() method you can specify which value will be sent into it on the next resume (you’ll see what we need this for a bit later). The run() function really does nothing more than call the send() method on the coroutine. To understand why the additional beforeFirstYield flag is needed consider the following snippet:



function gen() {
    yield 'foo';
    yield 'bar';
}

$gen = gen();
var_dump($gen->send('something'));

// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
$gen->rewind();
var_dump($gen->send('something'));

// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we lose the first yielded value!

By adding the additional beforeFirstYield condition we can ensure that the value of the first yield is also returned.

The scheduler now has to do little more than cycle through the tasks and run them:



class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

The newTask() method creates a new task (using the next free task id) and puts it in the task map. Furthermore it schedules the task by putting it in the task queue. The run() method then walks this task queue and runs the tasks. If a task is finished it is dropped, otherwise it is rescheduled at the end of the queue.

Let’s try out the scheduler with two simple (and very pointless) tasks:



function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();

Both tasks will just echo a message and then pass control back to the scheduler with yield. This is the resulting output:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

The output is exactly as expected: For the first 5 iterations the tasks alternate, then the second task finishes and only the first task continues to run.



Communicating with the scheduler

Now that the scheduler works we can turn to the next point on the agenda: Communication between the tasks and the scheduler. We will use the same method that processes use to talk to the operating system: Through system calls. The reason we need syscalls is that the operating system is on a different privilege level than the processes. So in order to perform privileged actions (like killing another process) there has to be some way to pass control back to the kernel, so it can perform said actions. Internally this is once again implemented using interruption instructions. Historically the generic int instruction was used, nowadays there are more specialized and faster syscall/sysenter instructions.

Our task scheduling system will reflect this design: Instead of simply passing the scheduler into the task (and thus allowing it to do whatever it wants) we will communicate via system calls passed through the yield expression. The yield here will act both as an interrupt and as a way to pass information to (and from) the scheduler.

To represent a system call I’ll use a small wrapper around a callable:



class SystemCall {
    protected $callback;

    public function __construct(callable $callback) {
        $this->callback = $callback;
    }

    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback; // Can't call it directly in PHP :/
        return $callback($task, $scheduler);
    }
}
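The comment in __invoke() refers to a PHP parsing rule: $this->callback($task, $scheduler) is read as a call to a method named callback, not as a call to the callable stored in the property. The sketch below, built around a made-up CallbackHolder class, shows the local-variable workaround next to two alternatives; the parenthesized form assumes PHP 7.0 or later.

```php
<?php
// Hypothetical demo class: three ways to invoke a callable stored in a property.
final class CallbackHolder
{
    private $callback;

    public function __construct(callable $callback)
    {
        $this->callback = $callback;
    }

    // Copy to a local variable first (the approach used in SystemCall).
    public function viaLocal(...$args)
    {
        $callback = $this->callback;
        return $callback(...$args);
    }

    // Parenthesize the property access (valid since PHP 7.0).
    public function viaParentheses(...$args)
    {
        return ($this->callback)(...$args);
    }

    // Let call_user_func() do the dispatch.
    public function viaCallUserFunc(...$args)
    {
        return call_user_func($this->callback, ...$args);
    }
}

$holder = new CallbackHolder(function ($a, $b) {
    return $a + $b;
});

$results = [
    $holder->viaLocal(1, 2),
    $holder->viaParentheses(1, 2),
    $holder->viaCallUserFunc(1, 2),
];
// $results is [3, 3, 3]
```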

A SystemCall will behave just like any callable (using __invoke), but tells the scheduler to pass the calling task and itself into the function. To handle it we have to slightly modify the scheduler’s run method:



public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();

        if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }

        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}

The first system call will do nothing more than return the task ID:

function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}

It does so by setting the tid as next send value and rescheduling the task. For system calls the scheduler does not automatically reschedule the task, we need to do it manually (you’ll see why a bit later). Using this new syscall we can rewrite the previous example:



function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));
$scheduler->newTask(task(5));

$scheduler->run();

This will give the same output as with the previous example. Notice how the system call is basically done like any other call, but with a prepended yield. Two more syscalls for creating new tasks and killing them again:



function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}

The killTask function needs an additional method in the scheduler:



public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }

    unset($this->taskMap[$tid]);

    // This is a bit ugly and could be optimized so it does not have to walk the queue,
    // but assuming that killing tasks is rather rare I won't bother with it now
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }

    return true;
}

A small script to test the new functionality:



function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";
        yield;
    }
}

function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));

    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";
        yield;

        if ($i == 3) yield killTask($childTid);
    }
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

This will print the following:

Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.

The child is killed after three iterations, so that’s when the “Child is still alive” messages end. One should probably point out that this is not a real parent/child relationship, because the child can continue running even after the parent finished. Or the child could kill the parent. One could modify the scheduler to have a more hierarchic task structure, but I won’t implement that in this article.

There are many more process management calls one could implement, for example wait (which waits until a task has finished running), exec (which replaces the current task) and fork (which creates a clone of the current task). Forking is pretty cool and you can actually implement it with PHP’s coroutines, because they support cloning.

But I’ll leave these for the interested reader. Instead let’s get to the next topic!



Non-Blocking IO

A really cool application of our task management system obviously is … a web server. There could be one task listening on a socket for new connections and whenever a new connection is made it would create a new task handling that connection.

The hard part about this is that normally socket operations like reading data are blocking, i.e. PHP will wait until the client has finished sending. For a web-server that’s obviously not good at all: It would mean that it can only handle a single connection at a time.

The solution is to make sure that the socket is “ready” before actually reading/writing to it. To find out which sockets are ready to read from or write to the stream_select function can be used.
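Before wiring stream_select into the scheduler, it may help to see it in isolation. The sketch below uses a local socket pair rather than a network connection; it assumes a Unix-like system where STREAM_PF_UNIX pairs are available, and the variable names are illustrative.

```php
<?php
// Assumption: Unix-like system, so a connected pair of local sockets can be created.
[$writerEnd, $readerEnd] = stream_socket_pair(
    STREAM_PF_UNIX,
    STREAM_SOCK_STREAM,
    STREAM_IPPROTO_IP
);

// stream_select() rewrites its array arguments, so they must be variables.
$read = [$readerEnd];
$write = null;
$except = null;
// Timeout 0: poll and return immediately. Nothing has been written yet.
$readyBefore = stream_select($read, $write, $except, 0);

fwrite($writerEnd, "ping");

// Rebuild the arrays, because the previous call emptied $read.
$read = [$readerEnd];
$write = null;
$except = null;
// Wait up to one second for the reader end to become readable.
$readyAfter = stream_select($read, $write, $except, 1);
$data = $readyAfter ? fread($readerEnd, 8192) : null;

fclose($writerEnd);
fclose($readerEnd);
```

The first call reports no ready streams and the second reports one, after which fread() returns the data without blocking. The scheduler applies the same check to many sockets at once.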



First, let’s add two new syscalls, which will cause a task to wait until a certain socket is ready:



function waitForRead($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForRead($socket, $task);
        }
    );
}

function waitForWrite($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForWrite($socket, $task);
        }
    );
}

These syscalls are just proxies to the respective methods in the scheduler:



// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];

public function waitForRead($socket, Task $task) {
    if (isset($this->waitingForRead[(int) $socket])) {
        $this->waitingForRead[(int) $socket][1][] = $task;
    } else {
        $this->waitingForRead[(int) $socket] = [$socket, [$task]];
    }
}

public function waitForWrite($socket, Task $task) {
    if (isset($this->waitingForWrite[(int) $socket])) {
        $this->waitingForWrite[(int) $socket][1][] = $task;
    } else {
        $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
    }
}

The waitingForRead and waitingForWrite properties are just arrays containing the sockets to wait for and the tasks that are waiting for them. The interesting part is the following method, which actually checks whether the sockets are ready and reschedules the respective tasks:



protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }

    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }

    // Nothing is waiting: skip the call, since stream_select() rejects
    // being called without any streams (an error on PHP 8)
    if (!$rSocks && !$wSocks) {
        return;
    }

    $eSocks = []; // dummy

    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }

    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }

    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}

The stream_select function takes arrays of read, write and except sockets to check (we’ll ignore that last category). The arrays are passed by reference and the function will only leave those elements in the arrays that changed state. We can then walk over those arrays and reschedule all tasks associated with them.

In order to regularly perform the above polling action we’ll add a special task in the scheduler:



protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}

This task needs to be registered at some point, e.g. one could add $this->newTask($this->ioPollTask()) to the start of the run() method. Then it will work just like any other task, performing the polling operation once every full task cycle (this isn’t necessarily the best way to handle it). The ioPollTask will call ioPoll with a 0 second timeout, which means that stream_select will return right away (rather than waiting).

Only if the task queue is empty we use a null timeout, which means that it will wait until some socket becomes ready. If we wouldn’t do this the polling task would just run again and again and again until a new connection is made. This would result in 100% CPU usage. It’s much more efficient to let the operating system do the waiting instead.

Writing the server is relatively easy now:



function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}

function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    // Heredoc: the closing RES; marker stays at the start of the line
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield waitForWrite($socket);
    fwrite($socket, $response);

    fclose($socket);
}

$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();

This will accept connections to localhost:8000 and just send back a HTTP response with whatever it was sent. Doing anything “real” would be a lot more complicated (properly handling HTTP requests is way outside the scope of this article). The above snippet just demos the general concept.

You can try the server out using something like ab -n 10000 -c 100 localhost:8000/. This will send 10000 requests to it with 100 of them arriving concurrently. Using these numbers I get a median response time of 10ms. But there is an issue with a few requests being handled really slowly (like 5 seconds), that’s why the total throughput is only 2000 reqs/s (with a 10ms response time it should be more like 10000 reqs/s). With higher concurrency count (e.g. -c 500) it mostly still works well, but some connections will throw a “Connection reset by peer” error. As I know very little about this low-level socket stuff I didn’t try to figure out what the issue is.



Stacked coroutines

If you would try to build some larger system using our scheduling system you would soon run into a problem: We are used to breaking up code into smaller functions and calling them. But with coroutines this is no longer possible. E.g. consider the following code:



function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}

function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

This code tries to put the recurring “output n times” code into a separate coroutine and then invoke it from the main task. But this won’t work. As mentioned at the very beginning of this article calling a generator (or coroutine) will not actually do anything, it will only return an object. This also happens in the above case. The echoTimes calls won’t do anything other than return an (unused) coroutine object.

In order to still allow this we need to write a small wrapper around the bare coroutines. I’ll call this a “stacked coroutine” because it will manage a stack of nested coroutine calls. It will be possible to call sub-coroutines by yielding them:

$retval = (yield someCoroutine($foo, $bar));

The subcoroutines will also be able to return a value, again by using yield:

yield retval("I'm a return value!");

The retval function does nothing more than returning a wrapper around the value which will signal that it’s a return value:



class CoroutineReturnValue {
    protected $value;

    public function __construct($value) {
        $this->value = $value;
    }

    public function getValue() {
        return $this->value;
    }
}

function retval($value) {
    return new CoroutineReturnValue($value);
}

In order to turn a coroutine into a stacked coroutine (which supports subcalls) we’ll have to write another function (which is obviously yet-another-coroutine):



function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;

    for (;;) {
        $value = $gen->current();

        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }

        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }

            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }

        $gen->send(yield $gen->key() => $value);
    }
}

This function acts as a simple proxy between the caller and the currently running subcoroutine. This is handled in the $gen->send(yield $gen->key() => $value); line. Additionally it checks whether a return value is a generator, in which case it will start running it and pushes the previous coroutine on the stack. Once it gets a CoroutineReturnValue it will pop the stack again and continue executing the previous coroutine.
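For comparison, PHP 7.0 and later provide generator delegation (yield from) and generator return values in the language itself, which cover much of what the stackedCoroutine() wrapper does by hand. The following is only a sketch of that built-in mechanism with illustrative function names; it is not a drop-in replacement for the article's scheduler integration.

```php
<?php
// Illustrative sub-coroutine: yields two values, then returns a result.
function inner(): Generator
{
    yield 1;
    yield 2;
    return 'inner result';
}

// Illustrative outer coroutine: delegates to inner() and receives its
// return value as the result of the "yield from" expression.
function outer(): Generator
{
    $result = yield from inner();
    yield 3;
    return $result;
}

$gen = outer();
$values = [];
foreach ($gen as $value) {
    $values[] = $value;
}
$returned = $gen->getReturn(); // only valid once the generator has finished
// $values is [1, 2, 3] and $returned is 'inner result'
```

Values sent with ->send() and exceptions injected with ->throw() are also forwarded to the innermost delegated generator, which is the role the explicit stack plays in the wrapper above.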



In order to make the stacked coroutines usable in tasks the $this->coroutine = $coroutine; line in the Task constructor needs to be replaced with $this->coroutine = stackedCoroutine($coroutine);.

Now we can improve the webserver example from above a bit by grouping the wait+read (and wait+write and wait+accept) actions into functions. To group the related functionality I’ll use a class:



class CoSocket {
    protected $socket;

    public function __construct($socket) {
        $this->socket = $socket;
    }

    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }

    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }

    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }

    public function close() {
        @fclose($this->socket);
    }
}

Now the server can be rewritten a bit cleaner:



function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}

function handleClient($socket) {
    $data = (yield $socket->read(8192));

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    // Heredoc: the closing RES; marker stays at the start of the line
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield $socket->write($response);
    yield $socket->close();
}

Error handling

As a good programmer you obviously noticed that the above examples all lack error handling. Pretty much every socket operation is fallible and can produce errors. I obviously did this because error handling is really tedious (especially for sockets!) and would easily blow up the code size by a few factors.

But still I’d like to cover how error handling for coroutines works in general: Coroutines provide the ability to throw exceptions inside them using the throw() method. As of this writing this method does not yet exist in PHP’s implementation, but I will commit it later today.

The throw() method takes an exception and throws it at the current suspension point in the coroutine. Consider this code:



function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}

$gen = gen();
$gen->rewind();                     // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"

This is really awesome for our purposes, because we can make system calls and subcoroutine calls throw exceptions. For the system calls the Scheduler::run() method needs a small adjustment:



if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}

And the Task class needs to handle throw calls too:



class Task {
    // ...
    protected $exception = null;

    public function setException($exception) {
        $this->exception = $exception;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    // ...
}

Now we can start throwing exceptions from system calls! E.g. for the killTask call, let’s throw an exception if the passed task ID is invalid:



function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}

Try it out:



function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}

Sadly this won’t work properly yet, because the stackedCoroutine function doesn’t handle the exception correctly. To fix it the function needs some modifications:



function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;

    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }

            $value = $gen->current();

            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }

            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }

                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }

            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }

            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }

            $gen = $stack->pop();
            $exception = $e;
        }
    }
}

Wrapping up

In this article we built a task scheduler using cooperative multitasking, including the ability to perform “system calls”, doing non-blocking IO operations and handling errors. The really cool thing about all this is that the resulting code for the tasks looks totally synchronous, even though it is performing a lot of asynchronous operations. If you want to read data from a socket you don’t have to pass some callback or register an event listener. Instead you write yield $socket->read(). Which is basically what you would normally do too, just with a yield in front of it.

When I first heard about all this I found this concept totally awesome and that’s what motivated me to implement it in PHP. At the same time I find coroutines really scary. There is a thin line between awesome code and a total mess and I think coroutines sit exactly on that line. It’s hard for me to say whether writing async code in the way outlined above is really beneficial.

In any case, I think it’s an interesting topic and I hope you found it interesting too. Comments welcome :)


