配色: 字号:
gstreamer的collectpad源码分析
2016-11-08 | 阅:  转:  |  分享 
  
gstreamer的collectpad源码分析

1.背景:

gstreamer的collectpad是一类特殊的pad,这类pad工作于收集模式,用于管理控制若干个pad组成的pad集合的数据同步处理。大部分的合成器(muxer)均使用collectpad来收集音视频数据,并根据可重载的收集条件判断函数对不同pad之间的数据进行处理(或同步)。

由于collectpad中大部分处理函数均可重载(set_func),因此本文只讨论默认的处理函数。

2.默认流程:

collectpad的简单流程如下图:



不同的pad工作与不同的线程中,当某一个pad有数据到来时,会对所有pad进行判断,看看是否可以满足收集条件,如果满足收集条件就向对应的element推送数据。如果不满足收集条件,就会将该线程挂起,等待其他线程的数据。

当某个pad处于挂起时,其他pad收到数据后,一样会对收集条件进行判断,如果满足条件,会将所有pad的数据推送至element,同时广播条件变量,唤醒所有挂起中的其他pad(线程)。

简单的函数调用关系如下:



3.数据结构:

数据结构如下:一个_GstCollectPads中维护了一个_GstCollectData的链表,每个pad对应一个_GstCollectData,其中记录了pad中的数据的时间戳,buffer,已经对应pad的状态(如锁、等待等标志位),GstCollectPadsPrivate中则记录了collectpad中注册的各种事件回调函数,这里的回调函数都有接口可以进行重载。此外,GstCollectPadsPrivate还维护了线程间同步用的锁和条件变量。

[cpp]viewplaincopy在CODE上查看代码片派生到我的代码片

/

GstCollectPads:

@data:(element-typeGstBase.CollectData):#GListof#GstCollectDatamanaged

bythis#GstCollectPads.



Collectpadsobject.

/

struct_GstCollectPads{

/基类。/

GstObjectobject;



///withLOCKand/orSTREAM_LOCK/

/所有PAD的集合。/

/

GstCollectData:

@collect:owner#GstCollectPads

@pad:#GstPadmanagedbythisdata

@buffer:currentlyqueuedbuffer.

@pos:positioninthebuffer

@segment:lastsegmentreceived.

@dts:thesignedversionoftheDTSconvertedtorunningtime.Toaccess

thismemeber,use%GST_COLLECT_PADS_DTSmacro.(Since1.6)



Structureusedbythecollect_pads.

struct_GstCollectData

{

/withSTREAM_LOCKof@collect/

/指向回collectpad。/

GstCollectPadscollect;

GstPadpad;

GstBufferbuffer;

guintpos;

GstSegmentsegment;



//

/state:bitfieldforeasierextension;

eos,flushing,new_segment,waiting/

GstCollectPadsStateFlagsstate;



GstCollectDataPrivatepriv;



union{

struct{

//

gint64dts;

//

}abi;

gpointer_gst_reserved[GST_PADDING];

}ABI;

};

/

GSListdata;/listofCollectDataitems/



//

GRecMutexstream_lock;/usedtoserializecollectionamongseveralstreams/



GstCollectPadsPrivatepriv;



gpointer_gst_reserved[GST_PADDING];

};

4.代码分析:

4.1主入口函数:

主入口函数gst_collect_pads_chain,不同pad工作于不同线程中。代码分析如下:

[cpp]viewplaincopy在CODE上查看代码片派生到我的代码片

/Foreachbufferwereceivewecheckifourcollectedconditionisreached

andifsowecallthecollectedfunction.Whenthisisdonewecheckif

datahasbeenunqueued.Ifdataisstillqueuedwewaitholdingthestream

locktomakesurenoEOSeventcanhappenwhilewearereadytobe

collected

/

staticGstFlowReturn

gst_collect_pads_chain(GstPadpad,GstObjectparent,GstBufferbuffer)

{

GstCollectDatadata;

GstCollectPadspads;

GstFlowReturnret;

GstBufferbuffer_p;

guint32cookie;



GST_DEBUG("Gotbufferforpad%s:%s",GST_DEBUG_PAD_NAME(pad));



/somemagictogetthemanagingcollect_pads/

GST_OBJECT_LOCK(pad);

data=(GstCollectData)gst_pad_get_element_private(pad);

if(G_UNLIKELY(data==NULL))

gotono_data;

ref_data(data);

GST_OBJECT_UNLOCK(pad);



pads=data->collect;



GST_COLLECT_PADS_STREAM_LOCK(pads);

/状态判断。/

/ifnotstarted,bailout/

if(G_UNLIKELY(!pads->priv->started))

gotonot_started;

/checkifthispadisflushing/

if(G_UNLIKELY(GST_COLLECT_PADS_STATE_IS_SET(data,

GST_COLLECT_PADS_STATE_FLUSHING)))

gotoflushing;

/padwasEOS,wecanrefusethisdata/

if(G_UNLIKELY(GST_COLLECT_PADS_STATE_IS_SET(data,

GST_COLLECT_PADS_STATE_EOS)))

gotoeos;



/seeifweneedtoclip/

/数据前处理。/

if(pads->priv->clip_func){

GstBufferoutbuf=NULL;

ret=

pads->priv->clip_func(pads,data,buffer,&outbuf,

pads->priv->clip_user_data);

buffer=outbuf;



if(G_UNLIKELY(outbuf==NULL))

gotoclipped;



if(G_UNLIKELY(ret==GST_FLOW_EOS))

gotoeos;

elseif(G_UNLIKELY(ret!=GST_FLOW_OK))

gotoerror;

}



GST_DEBUG_OBJECT(pads,"Queuingbuffer%pforpad%s:%s",buffer,

GST_DEBUG_PAD_NAME(pad));



/Onemorepadhasdataqueued/

//如果当前collectpad处于WAITING状态会将queuedpads增加

if(GST_COLLECT_PADS_STATE_IS_SET(data,GST_COLLECT_PADS_STATE_WAITING))

pads->priv->queuedpads++;

buffer_p=&data->buffer;

gst_buffer_replace(buffer_p,buffer);



/updatesegmentlastpositionifinTIME/

/更新当前pad上对应的时间信息,后续用于重新计算等待状态需要用到。/

if(G_LIKELY(data->segment.format==GST_FORMAT_TIME)){

GstClockTimetimestamp;



timestamp=GST_BUFFER_DTS_OR_PTS(buffer);



if(GST_CLOCK_TIME_IS_VALID(timestamp))

data->segment.position=timestamp;

}



/Whilewehavedataqueuedonthispadtrytocollectstuff/

do{

/Checkifourcollectedconditionismatchedandcallthecollected

functionifitis/

/主要处理函数,判断收集条件是否满足,后续分析。/

ret=gst_collect_pads_check_collected(pads);

/whenanerroroccurs,wewanttoreportthisbacktothecallerASAP

withouthavingtoblockifthebufferwasnotpopped/

/数据流处理异常,进入异常处理分支。/

if(G_UNLIKELY(ret!=GST_FLOW_OK))

gotoerror;



/datawasconsumed,wecanexitandacceptnewdata/

/当buffer在check_collected函数中被消费,会在其中减少引用次数,释放buffer。

数据被处理后退出循环,等待下一次buffer到来调用chain函数。/

if(data->buffer==NULL)

break;



/数据未被处理,未满足数据收集条件,本pad对应线程将进行唤醒等待。/

/Havingthe_INITheremeanswedon''tcareaboutanybroadcastuptohere

(mostofwhichoccurwithSTREAM_LOCKheld,socouldnothavehappened

anyway).Wedocareaboute.g.aremoveinitiatedbroadcastasofthis

point.Puttingitherealsomakesthisthreadignoresanyevtitraised

itself(asisausualWAITsemantic).

/

GST_COLLECT_PADS_EVT_INIT(cookie);



/padcouldberemovedandre-added/

unref_data(data);

GST_OBJECT_LOCK(pad);

if(G_UNLIKELY((data=gst_pad_get_element_private(pad))==NULL))

gotopad_removed;

ref_data(data);

GST_OBJECT_UNLOCK(pad);



GST_DEBUG_OBJECT(pads,"Pad%s:%shasabufferqueued,waiting",

GST_DEBUG_PAD_NAME(pad));



/waittobecollected,thismusthappenfromanotherthreadtriggered

bythe_chainfunctionofanotherpad.Wereleasethelocksowe

cangetstoppedorflushedaswell.WecanhowevernotgetEOS

becausewestillholdtheSTREAM_LOCK.

/

/等待条件变量被唤醒。/

GST_COLLECT_PADS_STREAM_UNLOCK(pads);

GST_COLLECT_PADS_EVT_WAIT(pads,cookie);

GST_COLLECT_PADS_STREAM_LOCK(pads);



GST_DEBUG_OBJECT(pads,"Pad%s:%sresuming",GST_DEBUG_PAD_NAME(pad));



/唤醒后的状态判断。/

/afterasignal,wecouldbestopped/

if(G_UNLIKELY(!pads->priv->started))

gotonot_started;

/checkifthispadisflushing/

if(G_UNLIKELY(GST_COLLECT_PADS_STATE_IS_SET(data,

GST_COLLECT_PADS_STATE_FLUSHING)))

gotoflushing;

}

while(data->buffer!=NULL);



unlock_done:

GST_COLLECT_PADS_STREAM_UNLOCK(pads);

/dataisdefinitelyNULLifpad_removedgotowasrun./

if(data)

unref_data(data);

if(buffer)

gst_buffer_unref(buffer);

returnret;



/异常状态处理。/

pad_removed:

{

GST_WARNING("%sgotremovedfromcollectpads",GST_OBJECT_NAME(pad));

GST_OBJECT_UNLOCK(pad);

ret=GST_FLOW_NOT_LINKED;

gotounlock_done;

}

/ERRORS/

no_data:

{

GST_DEBUG("%sgotremovedfromcollectpads",GST_OBJECT_NAME(pad));

GST_OBJECT_UNLOCK(pad);

gst_buffer_unref(buffer);

returnGST_FLOW_NOT_LINKED;

}

not_started:

{

GST_DEBUG("notstarted");

gst_collect_pads_clear(pads,data);

ret=GST_FLOW_FLUSHING;

gotounlock_done;

}

flushing:

{

GST_DEBUG("pad%s:%sisflushing",GST_DEBUG_PAD_NAME(pad));

gst_collect_pads_clear(pads,data);

ret=GST_FLOW_FLUSHING;

gotounlock_done;

}

eos:

{

/weshouldnotpostanerrorforthis,justinformupstreamthat

wedon''texpectanythinganymore/

GST_DEBUG("pad%s:%siseos",GST_DEBUG_PAD_NAME(pad));

ret=GST_FLOW_EOS;

gotounlock_done;

}

clipped:

{

GST_DEBUG("clippedbufferonpad%s:%s",GST_DEBUG_PAD_NAME(pad));

ret=GST_FLOW_OK;

gotounlock_done;

}

error:

{

/weprinttheerror,theelementshouldpostareasonableerror

messageforfatalerrors/

GST_DEBUG("collectfailed,reason%d(%s)",ret,gst_flow_get_name(ret));

gst_collect_pads_clear(pads,data);

gotounlock_done;

}

}

4.2框架上的收集条件判断

在check函数,首先对collectpads上面的pad状态进行检查,只有当有数据的pads和总的pads数满足一定条件时候,才会执行第二重的收集条件判断。函数为gst_collect_pads_check_collected,代码分析如下:

[cpp]viewplaincopy在CODE上查看代码片派生到我的代码片

staticGstFlowReturn

gst_collect_pads_check_collected(GstCollectPadspads)

{

GstFlowReturnflow_ret=GST_FLOW_OK;

GstCollectPadsFunctionfunc;

gpointeruser_data;



g_return_val_if_fail(GST_IS_COLLECT_PADS(pads),GST_FLOW_ERROR);



/获取回调数据。/

GST_OBJECT_LOCK(pads);

func=pads->priv->func;

user_data=pads->priv->user_data;

GST_OBJECT_UNLOCK(pads);



g_return_val_if_fail(pads->priv->func!=NULL,GST_FLOW_NOT_SUPPORTED);



/checkfornewpads,updatestatsetc../

/主要是对等待唤醒的pad的cookie进行校验。/

gst_collect_pads_check_pads(pads);



/所有pad都是EOS状态。直接处理剩余的所有数据。/

if(G_UNLIKELY(pads->priv->eospads==pads->priv->numpads)){

/IfallourpadsareEOSjustcollectoncetolettheelement

doitsfinalEOShandling./

GST_DEBUG_OBJECT(pads,"Allactivepads(%d)areEOS,calling%s",

pads->priv->numpads,GST_DEBUG_FUNCPTR_NAME(func));



if(G_UNLIKELY(g_atomic_int_compare_and_exchange(&pads->priv->seeking,

TRUE,FALSE))){

GST_INFO_OBJECT(pads,"finishedseeking");

}

do{

flow_ret=func(pads,user_data);

}while(flow_ret==GST_FLOW_OK);

}else{

/有pad处于非EOS状态。/

gbooleancollected=FALSE;



/Wecallthecollectedfunctionaslongasourconditionmatches./

/只有满足(有数据的有效pad数+无效pad数>=总的pad数)时,才可以进入下一步的

条件判断,这个判断是框架级别的判断,总是存在,其余重载的判断函数(func)都在这个循环中处理。

如果函数不执行,则buffer一定不会被消费,在外层会走入线程挂起等待唤醒的流程。/

while(((pads->priv->queuedpads+pads->priv->eospads)>=

pads->priv->numpads)){

GST_DEBUG_OBJECT(pads,

"Allactivepads(%d+%d>=%d)havedata,""calling%s",

pads->priv->queuedpads,pads->priv->eospads,pads->priv->numpads,

GST_DEBUG_FUNCPTR_NAME(func));



if(G_UNLIKELY(g_atomic_int_compare_and_exchange(&pads->priv->seeking,

TRUE,FALSE))){

GST_INFO_OBJECT(pads,"finishedseeking");

}



/具体数据的收集条件判断。/

flow_ret=func(pads,user_data);

collected=TRUE;



/数据处理异常或者已经没有有数据的pad了,中断循环。/

/breakonerror/

if(flow_ret!=GST_FLOW_OK)

break;

/Don''tkeeploopingaftertellingtheelementEOSorflushing/

if(pads->priv->queuedpads==0)

break;

}

if(!collected)

GST_DEBUG_OBJECT(pads,"Notallactivepads(%d)havedata,continuing",

pads->priv->numpads);

}

returnflow_ret;

}

4.3默认的第二重收集条件判断

第二重收集条件的函数是可以进行重载的,可以使用gst_collect_pads_set_function进行设置,这里只分析默认的函数gst_collect_pads_default_collected。

代码分析如下:

[cpp]viewplaincopy在CODE上查看代码片派生到我的代码片

/

Defaultcollectcallbacktriggeredwhen#GstCollectPadsgatheredalldata.



CalledwithSTREAM_LOCK.

/

staticGstFlowReturn

gst_collect_pads_default_collected(GstCollectPadspads,gpointeruser_data)

{

GstCollectDatabest=NULL;

GstBufferbuffer;

GstFlowReturnret=GST_FLOW_OK;

GstCollectPadsBufferFunctionfunc;

gpointerbuffer_user_data;



g_return_val_if_fail(GST_IS_COLLECT_PADS(pads),GST_FLOW_ERROR);



/获取回调数据。/

GST_OBJECT_LOCK(pads);

func=pads->priv->buffer_func;

buffer_user_data=pads->priv->buffer_user_data;

GST_OBJECT_UNLOCK(pads);



g_return_val_if_fail(func!=NULL,GST_FLOW_NOT_SUPPORTED);



/Findtheoldestpadatallcost/

/寻找最合适的pad,并计算最早的数据和时间戳。

最后返回gst_collect_pads_recalculate_waiting的返回值,

TRUE表示从非等待状态变为等待状态。

在默认场景下,只有使用了set_wait为FALSE时候才会标记为non-waiting状态。

因此如果在默认框架下主动设置了non-waiting状态,需要留意时间比较函数。

否则这里会进入一个

FLOW_OK->数据没有POP->pad_num没有变化->gst_collect_pads_check_collected主循环中死循环的问题。/

if(gst_collect_pads_recalculate_full(pads)){

/waitingwasswitchedon,

sogiveanotherthreadachancetodeliverapossibly

olderbuffer;don''tchargeonyetwiththecurrentoldest/

ret=GST_FLOW_OK;

gotodone;

}



best=pads->priv->earliest_data;



/NodatacollectedmeansEOS./

/在waiting状态下,但是没有最新的数据包,因此认为这个pad已经进入EOS状态了。无法接收数据。

注意,这里设置non-waiting以后并修改了时间比较函数后,其他地方调用默认函数,也会导致一个问题:

由于non-waiting增加了queuedpad,因此如果总的有两个pad,且两个pad都设置了non-waiting后,在函数

gst_collect_pads_check_collected中条件判断总是成立,且queuedpad在non-waiting状态下无法自减,

第一次进入时候就会把所有的pad的数据直接处理,及时处理完所有数据后,依旧走到这里进行判断,

但是这时候buffer已经为空,导致collectpad认为这个pad的数据已经进入EOS状态,处理异常。/

if(G_UNLIKELY(best==NULL)){

ret=func(pads,best,NULL,buffer_user_data);

if(ret==GST_FLOW_OK)

ret=GST_FLOW_EOS;

gotodone;

}



/makesurethatthepadwetakeabufferfromiswaiting;

otherwisepoppingabufferwillseemnottohavehappened

andcollectpadscangetintoabusyloop/

gst_collect_pads_set_waiting(pads,best,TRUE);



/Sendbuffer/

/使用pop弹出buffer,并将buffer发送给buffer_func进行处理。/

buffer=gst_collect_pads_pop(pads,best);

ret=func(pads,best,buffer,buffer_user_data);



/maybenon-waitingwasforcedtowaitingabovedueto

newsegmenteventscomingtoospwww.sm136.comarsely,

sore-checktorestorestatetoavoidhanging/waiting/

gst_collect_pads_recalculate_full(pads);



done:

returnret;

}

注意,这里如果对某些函数进行重载或者设置了非等待状态,有两个潜在的异常流程。

4.3.1异常流程1:

当使用默认的时间比较函数,且设置了非等待状态的pad有数据到来时,在函数gst_collect_pads_recalculate_waiting,当earliest_data检测到本PAD时,这时候时间戳应该是相等的,但是这时候如果处于非等待状态,无论是否加锁最后都会返回TRUE,这时候gst_collect_pads_default_collected函数中的第一个判断总会直接返回GST_FLOW_OK,但是实际并没有弹出任何buffer,但是gst_collect_pads_check_collected的循环条件并没有改变,导致这个线程会一直在这里循环。如果其他pad没有数据进入,则这个pad会进入死循环。

4.3.2异常流程2:

当所有的pads都设置了non-waiting状态,则在框架的收集条件判断函数gst_collect_pads_check_collected中的pads数量比较循环总是成立,且所有pads数据弹出时都不会减少当前的queuedpad数,因此当有一个buffer弹出后,会持续弹出所有buffer,当buffer为空时,循环条件依旧成立,在处理空buffer时,认为这个pad已经进入了EOS状态,从而导致异常。异常流程如下图:



4.4寻找最优的可用buffer和pad

这个函数流程比较简单,就是遍历collectpads中的所有pad,并和earliest_time进行比较,寻找最早的时间点的buffer。

这里涉及到时间比较的函数,这里的默认时间比较函数比较简单,就是单纯判断时间点的大小,相等返回0,第一个时间点大于第二个返回1,小于返回-1。

[cpp]viewplaincopy在CODE上查看代码片派生到我的代码片

/

gst_collect_pads_find_best_pad:

@pads:thecollectpadstouse

@data:returnsthecollectdataforearliestdata

@time:returnstheearliestavailablebuffertime



Findtheoldest/bestpad,i.e.padholdingtheoldestbufferand

andreturnthecorresponding#GstCollectDataandbuffertime.



ThisfunctionshouldbecalledwithSTREAM_LOCKheld,

suchasinthecallback.

/

staticvoid

gst_collect_pads_find_best_pad(GstCollectPadspads,

GstCollectDatadata,GstClockTimetime)

{

GSListcollected;

GstCollectDatabest=NULL;

GstClockTimebest_time=GST_CLOCK_TIME_NONE;



g_return_if_fail(data!=NULL);

g_return_if_fail(time!=NULL);



/遍历所有pads,对所有pads中的数据与当前的earliest_time进行比较,

寻找时间最靠前的buffer及其对应的pad。/

for(collected=pads->data;collected;collected=g_slist_next(collected)){

GstBufferbwww.shanxiwang.netuffer;

GstCollectDatadata=(GstCollectData)collected->data;

GstClockTimetimestamp;



buffer=gst_collect_pads_peek(pads,data);

/ifwehaveabuffercheckifitisbetterthenthecurrentbestone/

if(buffer!=NULL){

timestamp=GST_BUFFER_DTS_OR_PTS(buffer);

gst_buffer_unref(buffer);

if(best==NULL||pads->priv->compare_func(pads,data,timestamp,

best,best_time,pads->priv->compare_user_data)<0){

best=data;

best_time=timestamp;

}

}

}



/setearliesttime/

data=best;

time=best_time;



GST_DEBUG_OBJECT(pads,"bestpad%s,besttime%"GST_TIME_FORMAT,

best?GST_PAD_NAME(((GstCollectData)best)->pad):"(nil)",

GST_TIME_ARGS(best_time));

}

3.5重新计算等待状态函数

函数gst_collect_pads_recalculate_waiting会根据earliest_time和所有pad上的数据进行比较,计算collectpad是否需要重新进入等待状态,返回TRUE表示从非等待状态进入等待状态。

这里如果设置了non-waiting状态,则要小心4.3中出现的异常。

[cpp]viewplaincopy在CODE上查看代码片派生到我的代码片

/Generaloverview:

-onlypadwithabuffercandetermineearliest_data(andearliest_time)

-onlysegmentinfodetermines(non-)waitingstate

-?perhapsuse_stream_timeforcomparison

(whichmuxersmighthaveuseaswell?)

/



/

Functiontorecalculatethewaitingstateofallpads.



MustbecalledwithSTREAM_LOCK.



Returns%TRUEifapadwassettowaiting

(fromnon-waitingstate).

/

staticgboolean

gst_collect_pads_recalculate_waiting(GstCollectPadspads)

{

GSListcollected;

gbooleanresult=FALSE;



/Ifearliesttimeisnotknown,thereisnothingtodo./

/没有数据可以比较。/

if(pads->priv->earliest_data==NULL)

returnFALSE;



/遍历所有pads。/

for(collected=pads->data;collected;collected=g_slist_next(collected)){

GstCollectDatadata=(GstCollectData)collected->data;

intcmp_res;

GstClockTimecomp_time;



/checkifpadhasasegment/

/检查本pad上对应的时间信息。/

if(data->segment.format==GST_FORMAT_UNDEFINED){

GST_WARNING_OBJECT(pads,

"GstCollectPadshasnotimesegment,assuming0based.");

gst_segment_init(&data->segment,GST_FORMAT_TIME);

GST_COLLECT_PADS_STATE_SET(data,GST_COLLECT_PADS_STATE_NEW_SEGMENT);

}



/checksegmentformat/

if(data->segment.format!=GST_FORMAT_TIME){

GST_ERROR_OBJECT(pads,"GstCollectPadscanhandleonlytimesegments.");

continue;

}



/checkifthewaitingstateshouldbechanged/

/将earliest_time和当前pad上的时间信息进行比较。

当cmp_res为1,表示本pad的时间比earliest_time晚,这时候数据可以消费,不需要等待。

将返回FALSE,在函数gst_collect_pads_default_collected执行buffer_func消费buffer。

否则表示本pad时间比earliest_time早或者相等,如果这时候是在非等待状态,则要设置成等待状态,

同时返回TRUE,并在gst_collect_pads_default_collected不处理buffer,返回GST_FLOW_OK,重新计算best。

这里要注意设置了non-waiting后的第一个包,第一个包的时间有可能是相同的,即0:00==0:00

/

comp_time=data->segment.position;

cmp_res=pads->priv->compare_func(pads,data,comp_time,

pads->priv->earliest_data,pads->priv->earliest_time,

pads->priv->compare_user_data);

if(cmp_res>0)

/stopwaiting/

gst_collect_pads_set_waiting(pads,data,FALSE);

else{

if(!GST_COLLECT_PADS_STATE_IS_SET(data,GST_COLLECT_PADS_STATE_WAITING)){

/startwaiting/

gst_collect_pads_set_waiting(pads,data,TRUE);

result=TRUE;

}

}

}



returnresult;

}

4.6锁和等待状态

collectpad提供了接口gst_collect_pads_set_waiting可以给其他组件设置某个pad为等待或者非等待状态。其设置与锁GST_COLLECT_PADS_STATE_LOCKED标志位有关系。

默认情况下的pad(注意,这里的pad为单独的一个pad,并非整个collectpad,这些状态为单个pad私有,并不是collectpad的属性)均为等待状态,而锁的初始化则根据element调用collect添加pad的函数gst_collect_pads_add_pad的最后一个参数决定。

[cpp]viewplaincopy在CODE上查看代码片派生到我的代码片

/

gst_collect_pads_set_waiting:

@pads:thecollectpads

@data:thedatatouse

@waiting:booleanindicatingwhetherthispadshouldoperate

inwaitingornon-waitingmode



Setsapadtowaitingornon-waitingmode,ifatleastthispad

hasnotbeencreatedwithlockedwaitingstate,

inwhichcasenothinghappens.



Thisfunctionshouldbecalledwith@padsSTREAM_LOCKheld,suchas

inthecallback.



MTsafe.

/

void

gst_collect_pads_set_waiting(GstCollectPadspads,GstCollectDatadata,

gbooleanwaiting)

{

g_return_if_fail(pads!=NULL);

g_return_if_fail(GST_IS_COLLECT_PADS(pads));

g_return_if_fail(data!=NULL);



GST_DEBUG_OBJECT(pads,"Settingpad%stowaiting%d,locked%d",

GST_PAD_NAME(data->pad),waiting,

GST_COLLECT_PADS_STATE_IS_SET(data,GST_COLLECT_PADS_STATE_LOCKED));



/Dosomethingonlyonachangeandifnotlocked/

/修改等待状态标志位需要在没有上锁的情况下处理,

可以通过GST_COLLECT_PADS_STATE_SET(data,GST_COLLECT_PADS_STATE_LOCKED);方式加解锁。

如果设置为非等待,则会把对应的queuedpad自增,当所有pad都处于非等待状态,则框架收集条件总是满足。

可能存在4.3.2的问题。

/

if(!GST_COLLECT_PADS_STATE_IS_SET(data,GST_COLLECT_PADS_STATE_LOCKED)&&

(GST_COLLECT_PADS_STATE_IS_SET(data,GST_COLLECT_PADS_STATE_WAITING)!=

!!waiting)){

/Setwaitingstateforthispad/

if(waiting)

GST_COLLECT_PADS_STATE_SET(data,GST_COLLECT_PADS_STATE_WAITING);

else

GST_COLLECT_PADS_STATE_UNSET(data,GST_COLLECT_PADS_STATE_WAITING);

/Updatenumberofqueuedpadsifneeded/

if(!data->buffer&&

!GST_COLLECT_PADS_STATE_IS_SET(data,GST_COLLECT_PADS_STATE_EOS)){

if(waiting)

pads->priv->queuedpads--;

else

pads->priv->queuedpads++;

}



/signalwaitersbecausesomethingchanged/

GST_COLLECT_PADS_EVT_BROADCAST(pads);

}

}

献花(0)
+1
(本文系网络学习天...首藏)