配色: 字号:
gstreamer的collectpad源码分析
2016-11-08 | 阅:  转:  |  分享 
  
gstreamer的collectpad源码分析

1.背景:

[cpp]?viewplain?copy?

/?

??GstCollectPads:?

??@data:?(element-type?GstBase.CollectData):?#GList?of?#GstCollectData?managed?

????by?this?#GstCollectPads.?

??

??Collectpads?object.?

?/??

struct?_GstCollectPads?{??

??/?基类。??/??

??GstObject??????object;??

??

??//?/?with?LOCK?and/or?STREAM_LOCK?/??

??/?所有PAD的集合。??/??

??/?

?????GstCollectData:?

?????@collect:?owner?#GstCollectPads?

?????@pad:?#GstPad?managed?by?this?data?

?????@buffer:?currently?queued?buffer.?

?????@pos:?position?in?the?buffer?

?????@segment:?last?segment?received.?

?????@dts:?the?signed?version?of?the?DTS?converted?to?running?time.?To?access?

???????????this?memeber,?use?%GST_COLLECT_PADS_DTS?macro.?(Since?1.6)?

?????

?????Structure?used?by?the?collect_pads.?

????struct?_GstCollectData?

????{?

??????/?with?STREAM_LOCK?of?@collect?/??

??????/?指向回collectpad。??/??

??????GstCollectPads????????collect;??

??????GstPad????????????????pad;??

??????GstBuffer?????????????buffer;??

??????guint??????????????????pos;??

??????GstSegment?????????????segment;??

??????

??????//??

??????/?state:?bitfield?for?easier?extension;?

????????eos,?flushing,?new_segment,?waiting?/??

??????GstCollectPadsStateFlags????state;??

??????

??????GstCollectDataPrivate?priv;??

??????

??????union?{??

????????struct?{??

??????????//??

??????????gint64?dts;??

??????????//??

????????}?abi;??

????????gpointer?_gst_reserved[GST_PADDING];??

??????}?ABI;??

????};??

???/??

??GSList????????data;??????????????????/?list?of?CollectData?items?/??

??

??//??

??GRecMutex??????stream_lock;??????????/?used?to?serialize?collection?among?several?streams?/??

??

??GstCollectPadsPrivate?priv;??

??

??gpointer?_gst_reserved[GST_PADDING];??

};??

4.代码分析:

4.1主入口函数:

????主入口函数gst_collect_pads_chain,不同pad工作于不同线程中。代码分析如下:

[cpp]?viewplain?copy?

/?For?each?buffer?we?receive?we?check?if?our?collected?condition?is?reached?

??and?if?so?we?call?the?collected?function.?When?this?is?done?we?check?if?

??data?has?been?unqueued.?If?data?is?still?queued?we?wait?holding?the?stream?

??lock?to?make?sure?no?EOS?event?can?happen?while?we?are?ready?to?be?

??collected??

?/??

static?GstFlowReturn??

gst_collect_pads_chain?(GstPad??pad,?GstObject??parent,?GstBuffer??buffer)??

{??

??GstCollectData?data;??

??GstCollectPads?pads;??

??GstFlowReturn?ret;??

??GstBuffer?buffer_p;??

??guint32?cookie;??

??

??GST_DEBUG?("Got?buffer?for?pad?%s:%s",?GST_DEBUG_PAD_NAME?(pad));??

??

??/?some?magic?to?get?the?managing?collect_pads?/??

??GST_OBJECT_LOCK?(pad);??

??data?=?(GstCollectData?)?gst_pad_get_element_private?(pad);??

??if?(G_UNLIKELY?(data?==?NULL))??

????goto?no_data;??

??ref_data?(data);??

??GST_OBJECT_UNLOCK?(pad);??

??

??pads?=?data->collect;??

??

??GST_COLLECT_PADS_STREAM_LOCK?(pads);??

??/?状态判断。??/??

??/?if?not?started,?bail?out?/??

??if?(G_UNLIKELY?(!pads->priv->started))??

????goto?not_started;??

??/?check?if?this?pad?is?flushing?/??

??if?(G_UNLIKELY?(GST_COLLECT_PADS_STATE_IS_SET?(data,??

??????????????GST_COLLECT_PADS_STATE_FLUSHING)))??

????goto?flushing;??

??/?pad?was?EOS,?we?can?refuse?this?data?/??

??if?(G_UNLIKELY?(GST_COLLECT_PADS_STATE_IS_SET?(data,??

??????????????GST_COLLECT_PADS_STATE_EOS)))??

????goto?eos;??

??

??/?see?if?we?need?to?clip?/??

??/?数据前处理。??/??

??if?(pads->priv->clip_func)?{??

????GstBuffer?outbuf?=?NULL;??

????ret?=??

????????pads->priv->clip_func?(pads,?data,?buffer,?&outbuf,??

????????pads->priv->clip_user_data);??

????buffer?=?outbuf;??

??

????if?(G_UNLIKELY?(outbuf?==?NULL))??

??????goto?clipped;??

??

????if?(G_UNLIKELY?(ret?==?GST_FLOW_EOS))??

??????goto?eos;??

????else?if?(G_UNLIKELY?(ret?!=?GST_FLOW_OK))??

??????goto?error;??

??}??

??

??GST_DEBUG_OBJECT?(pads,?"Queuing?buffer?%p?for?pad?%s:%s",?buffer,??

??????GST_DEBUG_PAD_NAME?(pad));??

??

??/?One?more?pad?has?data?queued?/??

??//?如果当前collectpad处于WAITING状态会将queuedpads增加??

??if?(GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_WAITING))??

????pads->priv->queuedpads++;??

??buffer_p?=?&data->buffer;??

??gst_buffer_replace?(buffer_p,?buffer);??

??

??/?update?segment?last?position?if?in?TIME?/??

??/?更新当前pad上对应的时间信息,后续用于重新计算等待状态需要用到。??/??

??if?(G_LIKELY?(data->segment.format?==?GST_FORMAT_TIME))?{??

????GstClockTime?timestamp;??

??

????timestamp?=?GST_BUFFER_DTS_OR_PTS?(buffer);??

??

????if?(GST_CLOCK_TIME_IS_VALID?(timestamp))??

??????data->segment.position?=?timestamp;??

??}??

??

??/?While?we?have?data?queued?on?this?pad?try?to?collect?stuff?/??

??do?{??

????/?Check?if?our?collected?condition?is?matched?and?call?the?collected?

??????function?if?it?is?/??

????/?主要处理函数,判断收集条件是否满足,后续分析。??/??

????ret?=?gst_collect_pads_check_collected?(pads);??

????/?when?an?error?occurs,?we?want?to?report?this?back?to?the?caller?ASAP?

??????without?having?to?block?if?the?buffer?was?not?popped?/??

????/?数据流处理异常,进入异常处理分支。??/??

????if?(G_UNLIKELY?(ret?!=?GST_FLOW_OK))??

??????goto?error;??

??

????/?data?was?consumed,?we?can?exit?and?accept?new?data?/??

????/?当buffer在check_collected函数中被消费,会在其中减少引用次数,释放buffer。?

??????数据被处理后退出循环,等待下一次buffer到来调用chain函数。??/??

????if?(data->buffer?==?NULL)??

??????break;??

??

????/?数据未被处理,未满足数据收集条件,本pad对应线程将进行唤醒等待。??/??

????/?Having?the?_INIT?here?means?we?don''t?care?about?any?broadcast?up?to?here?

??????(most?of?which?occur?with?STREAM_LOCK?held,?so?could?not?have?happened?

??????anyway).??We?do?care?about?e.g.?a?remove?initiated?broadcast?as?of?this?

??????point.??Putting?it?here?also?makes?this?thread?ignores?any?evt?it?raised?

??????itself?(as?is?a?usual?WAIT?semantic).?

?????/??

????GST_COLLECT_PADS_EVT_INIT?(cookie);??

??

????/?pad?could?be?removed?and?re-added?/??

????unref_data?(data);??

????GST_OBJECT_LOCK?(pad);??

????if?(G_UNLIKELY?((data?=?gst_pad_get_element_private?(pad))?==?NULL))??

??????goto?pad_removed;??

????ref_data?(data);??

????GST_OBJECT_UNLOCK?(pad);??

??

????GST_DEBUG_OBJECT?(pads,?"Pad?%s:%s?has?a?buffer?queued,?waiting",??

????????GST_DEBUG_PAD_NAME?(pad));??

??

????/?wait?to?be?collected,?this?must?happen?from?another?thread?triggered?

??????by?the?_chain?function?of?another?pad.?We?release?the?lock?so?we?

??????can?get?stopped?or?flushed?aswww.wang027.com?well.?We?can?however?not?get?EOS?

??????because?we?still?hold?the?STREAM_LOCK.?

?????/??

????/?等待条件变量被唤醒。??/??

????GST_COLLECT_PADS_STREAM_UNLOCK?(pads);??

????GST_COLLECT_PADS_EVT_WAIT?(pads,?cookie);??

????GST_COLLECT_PADS_STREAM_LOCK?(pads);??

??

????GST_DEBUG_OBJECT?(pads,?"Pad?%s:%s?resuming",?GST_DEBUG_PAD_NAME?(pad));??

??

????/?唤醒后的状态判断。??/??

????/?after?a?signal,?we?could?be?stopped?/??

????if?(G_UNLIKELY?(!pads->priv->started))??

??????goto?not_started;??

????/?check?if?this?pad?is?flushing?/??

????if?(G_UNLIKELY?(GST_COLLECT_PADS_STATE_IS_SET?(data,??

????????????????GST_COLLECT_PADS_STATE_FLUSHING)))??

??????goto?flushing;??

??}??

??while?(data->buffer?!=?NULL);??

??

unlock_done:??

??GST_COLLECT_PADS_STREAM_UNLOCK?(pads);??

??/?data?is?definitely?NULL?if?pad_removed?goto?was?run.?/??

??if?(data)??

????unref_data?(data);??

??if?(buffer)??

????gst_buffer_unref?(buffer);??

??return?ret;??

??

/?异常状态处理。??/??

pad_removed:??

??{??

????GST_WARNING?("%s?got?removed?from?collectpads",?GST_OBJECT_NAME?(pad));??

????GST_OBJECT_UNLOCK?(pad);??

????ret?=?GST_FLOW_NOT_LINKED;??

????goto?unlock_done;??

??}??

??/?ERRORS?/??

no_data:??

??{??

????GST_DEBUG?("%s?got?removed?from?collectpads",?GST_OBJECT_NAME?(pad));??

????GST_OBJECT_UNLOCK?(pad);??

????gst_buffer_unref?(buffer);??

????return?GST_FLOW_NOT_LINKED;??

??}??

not_started:??

??{??

????GST_DEBUG?("not?started");??

????gst_collect_pads_clear?(pads,?data);??

????ret?=?GST_FLOW_FLUSHING;??

????goto?unlock_done;??

??}??

flushing:??

??{??

????GST_DEBUG?("pad?%s:%s?is?flushing",?GST_DEBUG_PAD_NAME?(pad));??

????gst_collect_pads_clear?(pads,?data);??

????ret?=?GST_FLOW_FLUSHING;??

????goto?unlock_done;??

??}??

eos:??

??{??

????/?we?should?not?post?an?error?for?this,?just?inform?upstream?that?

??????we?don''t?expect?anything?anymore?/??

????GST_DEBUG?("pad?%s:%s?is?eos",?GST_DEBUG_PAD_NAME?(pad));??

????ret?=?GST_FLOW_EOS;??

????goto?unlock_done;??

??}??

clipped:??

??{??

????GST_DEBUG?("clipped?buffer?on?pad?%s:%s",?GST_DEBUG_PAD_NAME?(pad));??

????ret?=?GST_FLOW_OK;??

????goto?unlock_done;??

??}??

error:??

??{??

????/?we?print?the?error,?the?element?should?post?a?reasonable?error?

??????message?for?fatal?errors?/??

????GST_DEBUG?("collect?failed,?reason?%d?(%s)",?ret,?gst_flow_get_name?(ret));??

????gst_collect_pads_clear?(pads,?data);??

????goto?unlock_done;??

??}??

}??

4.2框架上的收集条件判断

????在check函数,首先对collectpads上面的pad状态进行检查,只有当有数据的pads和总的pads数满足一定条件时候,才会执行第二重的收集条件判断。函数为gst_collect_pads_check_collected,代码分析如下:

[cpp]?viewplain?copy?

static?GstFlowReturn??

gst_collect_pads_check_collected?(GstCollectPads??pads)??

{??

??GstFlowReturn?flow_ret?=?GST_FLOW_OK;??

??GstCollectPadsFunction?func;??

??gpointer?user_data;??

??

??g_return_val_if_fail?(GST_IS_COLLECT_PADS?(pads),?GST_FLOW_ERROR);??

??

??/?获取回调数据。??/??

??GST_OBJECT_LOCK?(pads);??

??func?=?pads->priv->func;??

??user_data?=?pads->priv->user_data;??

??GST_OBJECT_UNLOCK?(pads);??

??

??g_return_val_if_fail?(pads->priv->func?!=?NULL,?GST_FLOW_NOT_SUPPORTED);??

??

??/?check?for?new?pads,?update?stats?etc..?/??

??/?主要是对等待唤醒的pad的cookie进行校验。??/??

??gst_collect_pads_check_pads?(pads);??

??

??/?所有pad都是EOS状态。直接处理剩余的所有数据。??/??

??if?(G_UNLIKELY?(pads->priv->eospads?==?pads->priv->numpads))?{??

????/?If?all?our?pads?are?EOS?just?collect?once?to?let?the?element?

??????do?its?final?EOS?handling.?/??

????GST_DEBUG_OBJECT?(pads,?"All?active?pads?(%d)?are?EOS,?calling?%s",??

????????pads->priv->numpads,?GST_DEBUG_FUNCPTR_NAME?(func));??

??

????if?(G_UNLIKELY?(g_atomic_int_compare_and_exchange?(&pads->priv->seeking,??

????????????????TRUE,?FALSE)))?{??

??????GST_INFO_OBJECT?(pads,?"finished?seeking");??

????}??

????do?{??

??????flow_ret?=?func?(pads,?user_data);??

????}?while?(flow_ret?==?GST_FLOW_OK);??

??}?else?{??

????/?有pad处于非EOS状态。??/??

????gboolean?collected?=?FALSE;??

??

????/?We?call?the?collected?function?as?long?as?our?condition?matches.?/??

????/?只有满足(有数据的有效pad数?+?无效pad数?>=?总的pad数)时,才可以进入下一步的?

??????条件判断,这个判断是框架级别的判断,总是存在,其余重载的判断函数(func)都在这个循环中处理。?

??????如果函数不执行,则buffer一定不会被消费,在外层会走入线程挂起等待唤醒的流程。??/??

????while?(((pads->priv->queuedpads?+?pads->priv->eospads)?>=??

????????????pads->priv->numpads))?{??

??????GST_DEBUG_OBJECT?(pads,??

??????????"All?active?pads?(%d?+?%d?>=?%d)?have?data,?"?"calling?%s",??

??????????pads->priv->queuedpads,?pads->priv->eospads,?pads->priv->numpads,??

??????????GST_DEBUG_FUNCPTR_NAME?(func));??

??

??????if?(G_UNLIKELY?(g_atomic_int_compare_and_exchange?(&pads->priv->seeking,??

??????????????????TRUE,?FALSE)))?{??

????????GST_INFO_OBJECT?(pads,?"finished?seeking");??

??????}??

????????

??????/?具体数据的收集条件判断。??/??

??????flow_ret?=?func?(pads,?user_data);??

??????collected?=?TRUE;??

??

??????/?数据处理异常或者已经没有有数据的pad了,中断循环。??/??

??????/?break?on?error?/??

??????if?(flow_ret?!=?GST_FLOW_OK)??

????????break;??

??????/?Don''t?keep?looping?after?telling?the?element?EOS?or?flushing?/??

??????if?(pads->priv->queuedpads?==?0)??

????????break;??

????}??

????if?(!collected)??

??????GST_DEBUG_OBJECT?(pads,?"Not?all?active?pads?(%d)?have?data,?continuing",??

??????????pads->priv->numpads);??

??}??

??return?flow_ret;??

}??

4.3默认的第二重收集条件判断

????第二重收集条件的函数是可以进行重载的,可以使用gst_collect_pads_set_function进行设置,这里只分析默认的函数gst_collect_pads_default_collected。

????代码分析如下:

[cpp]?viewplain?copy?

/?

??Default?collect?callback?triggered?when?#GstCollectPads?gathered?all?data.?

??

??Called?with?STREAM_LOCK.?

?/??

static?GstFlowReturn??

gst_collect_pads_default_collected?(GstCollectPads??pads,?gpointer?user_data)??

{??

??GstCollectData?best?=?NULL;??

??GstBuffer?buffer;??

??GstFlowReturn?ret?=?GST_FLOW_OK;??

??GstCollectPadsBufferFunction?func;??

??gpointer?buffer_user_data;??

??

??g_return_val_if_fail?(GST_IS_COLLECT_PADS?(pads),?GST_FLOW_ERROR);??

??

??/?获取回调数据。??/??

??GST_OBJECT_LOCK?(pads);??

??func?=?pads->priv->buffer_func;??

??buffer_user_data?=?pads->priv->buffer_user_data;??

??GST_OBJECT_UNLOCK?(pads);??

??

??g_return_val_if_fail?(func?!=?NULL,?GST_FLOW_NOT_SUPPORTED);??

??

??/?Find?the?oldest?pad?at?all?cost?/??

??/?寻找最合适的pad,并计算最早的数据和时间戳。?

????最后返回gst_collect_pads_recalculate_waiting的返回值,?

????TRUE表示从非等待状态变为等待状态。?

????在默认场景下,只有使用了set_wait为FALSE时候才会标记为non-waiting状态。?

????因此如果在默认框架下主动设置了non-waiting状态,需要留意时间比较函数。?

????否则这里会进入一个?

????FLOW_OK?->?数据没有POP?->?pad_num没有变化?->?gst_collect_pads_check_collected主循环中死循环的问题。??/??

??if?(gst_collect_pads_recalculate_full?(pads))?{??

????/?waiting?was?switched?on,?

??????so?give?another?thread?a?chance?to?deliver?a?possibly?

??????older?buffer;?don''t?charge?on?yet?with?the?current?oldest?/??

????ret?=?GST_FLOW_OK;??

????goto?done;??

??}??

??

??best?=?pads->priv->earliest_data;??

??

??/?No?data?collected?means?EOS.?/??

??/?在waiting状态下,但是没有最新的数据包,因此认为这个pad已经进入EOS状态了。无法接收数据。?

????注意,这里设置non-waiting以后并修改了时间比较函数后,其他地方调用默认函数,也会导致一个问题:?

????由于non-waiting增加了queuedpad,因此如果总的有两个pad,且两个pad都设置了non-waiting后,在函数?

????gst_collect_pads_check_collected中条件判断总是成立,且queuedpad在non-waiting状态下无法自减,?

????第一次进入时候就会把所有的pad的数据直接处理,及时处理完所有数据后,依旧走到这里进行判断,?

????但是这时候buffer已经为空,导致collectpad认为这个pad的数据已经进入EOS状态,处理异常。??/??

??if?(G_UNLIKELY?(best?==?NULL))?{??

????ret?=?func?(pads,?best,?NULL,?buffer_user_data);??

????if?(ret?==?GST_FLOW_OK)??

??????ret?=?GST_FLOW_EOS;??

????goto?done;??

??}??

??

??/?make?sure?that?the?pad?we?take?a?buffer?from?is?waiting;?

????otherwise?popping?a?buffer?will?seem?not?to?have?happened?

????and?collectpads?can?get?into?a?busy?loop?/??

??gst_collect_pads_set_waiting?(pads,?best,?TRUE);??

??

??/?Send?buffer?/??

??/?使用pop弹出buffer,并将buffer发送给buffer_func进行处理。??/??

??buffer?=?gst_collect_pads_pop?(pads,?best);??

??ret?=?func?(pads,?best,?buffer,?buffer_user_data);??

??

??/?maybe?non-waiting?was?forced?to?waiting?above?due?to?

????newsegment?events?coming?too?sparsely,?

????so?re-check?to?restore?state?to?avoid?hanging/waiting?/??

??gst_collect_pads_recalculate_full?(pads);??

??

done:??

??return?ret;??

}??

????注意,这里如果对某些函数进行重载或者设置了非等待状态,有两个潜在的异常流程。

4.3.1异常流程1:

????当使用默认的时间比较函数,且设置了非等待状态的pad有数据到来时,在函数gst_collect_pads_recalculate_waiting,当earliest_data检测到本PAD时,这时候时间戳应该是相等的,但是这时候如果处于非等待状态,无论是否加锁最后都会返回TRUE,这时候gst_collect_pads_default_collected函数中的第一个判断总会直接返回GST_FLOW_OK,但是实际并没有弹出任何buffer,但是gst_collect_pads_check_collected的循环条件并没有改变,导致这个线程会一直在这里循环。如果其他pad没有数据进入,则这个pad会进入死循环。

4.3.2异常流程2:

????当所有的pads都设置了non-waiting状态,则在框架的收集条件判断函数gst_collect_pads_check_collected中的pads数量比较循环总是成立,且所有pads数据弹出时都不会减少当前的queuedpad数,因此当有一个buffer弹出后,会持续弹出所有buffer,当buffer为空时,循环条件依旧成立,在处理空buffer时,认为这个pad已经进入了EOS状态,从而导致异常。异常流程如下图:



4.4寻找最优的可用buffer和pad

????这个函数流程比较简单,就是遍历collectpads中的所有pad,并和earliest_time进行比较,寻找最早的时间点的buffer。

????这里涉及到时间比较的函数,这里的默认时间比较函数比较简单,就是单纯判断时间点的大小,相等返回0,第一个时间点大于第二个返回1,小于返回-1。

[cpp]?viewplain?copy?

/?

??gst_collect_pads_find_best_pad:?

??@pads:?the?collectpads?to?use?

??@data:?returns?the?collectdata?for?earliest?data?

??@time:?returns?the?earliest?available?buffertime?

??

??Find?the?oldest/best?pad,?i.e.?pad?holding?the?oldest?buffer?and?

??and?return?the?corresponding?#GstCollectData?and?buffertime.?

??

??This?function?should?be?called?with?STREAM_LOCK?held,?

??such?as?in?the?callback.?

?/??

static?void??

gst_collect_pads_find_best_pad?(GstCollectPads??pads,??

????GstCollectData??data,?GstClockTime??time)??

{??

??GSList?collected;??

??GstCollectData?best?=?NULL;??

??GstClockTime?best_time?=?GST_CLOCK_TIME_NONE;??

??

??g_return_if_fail?(data?!=?NULL);??

??g_return_if_fail?(time?!=?NULL);??

??

??/?遍历所有pads,对所有pads中的数据与当前的earliest_time进行比较,?

????寻找时间最靠前的buffer及其对应的pad。??/??

??for?(collected?=?pads->data;?collected;?collected?=?g_slist_next?(collected))?{??

????GstBufferwww.baiyuewang.net?buffer;??

????GstCollectData?data?=?(GstCollectData?)?collected->data;??

????GstClockTime?timestamp;??

??

????buffer?=?gst_collect_pads_peek?(pads,?data);??

????/?if?we?have?a?buffer?check?if?it?is?better?then?the?current?best?one?/??

????if?(buffer?!=?NULL)?{??

??????timestamp?=?GST_BUFFER_DTS_OR_PTS?(buffer);??

??????gst_buffer_unref?(buffer);??

??????if?(best?==?NULL?||?pads->priv->compare_func?(pads,?data,?timestamp,??

??????????????best,?best_time,?pads->priv->compare_user_data)?
????????best?=?data;??

????????best_time?=?timestamp;??

??????}??

????}??

??}??

??

??/?set?earliest?time?/??

??data?=?best;??

??time?=?best_time;??

??

??GST_DEBUG_OBJECT?(pads,?"best?pad?%s,?best?time?%"?GST_TIME_FORMAT,??

??????best???GST_PAD_NAME?(((GstCollectData?)?best)->pad)?:?"(nil)",??

??????GST_TIME_ARGS?(best_time));??

}??

3.5重新计算等待状态函数

????函数gst_collect_pads_recalculate_waiting会根据earliest_time和所有pad上的数据进行比较,计算collectpad是否需要重新进入等待状态,返回TRUE表示从非等待状态进入等待状态。

????这里如果设置了non-waiting状态,则要小心4.3中出现的异常。

[cpp]?viewplain?copy?

/?General?overview:?

??-?only?pad?with?a?buffer?can?determine?earliest_data?(and?earliest_time)?

??-?only?segment?info?determines?(non-)waiting?state?

??-???perhaps?use?_stream_time?for?comparison?

????(which?muxers?might?have?use?as?well??)?

?/??

??

/?

??Function?to?recalculate?the?waiting?state?of?all?pads.?

??

??Must?be?called?with?STREAM_LOCK.?

??

??Returns?%TRUE?if?a?pad?was?set?to?waiting?

??(from?non-waiting?state).?

?/??

static?gboolean??

gst_collect_pads_recalculate_waiting?(GstCollectPads??pads)??

{??

??GSList?collected;??

??gboolean?result?=?FALSE;??

??

??/?If?earliest?time?is?not?known,?there?is?nothing?to?do.?/??

??/?没有数据可以比较。??/??

??if?(pads->priv->earliest_data?==?NULL)??

????return?FALSE;??

??

??/?遍历所有pads。??/??

??for?(collected?=?pads->data;?collected;?collected?=?g_slist_next?(collected))?{??

????GstCollectData?data?=?(GstCollectData?)?collected->data;??

????int?cmp_res;??

????GstClockTime?comp_time;??

??

????/?check?if?pad?has?a?segment?/??

????/?检查本pad上对应的时间信息。??/??

????if?(data->segment.format?==?GST_FORMAT_UNDEFINED)?{??

??????GST_WARNING_OBJECT?(pads,??

??????????"GstCollectPads?has?no?time?segment,?assuming?0?based.");??

??????gst_segment_init?(&data->segment,?GST_FORMAT_TIME);??

??????GST_COLLECT_PADS_STATE_SET?(data,?GST_COLLECT_PADS_STATE_NEW_SEGMENT);??

????}??

??

????/?check?segment?format?/??

????if?(data->segment.format?!=?GST_FORMAT_TIME)?{??

??????GST_ERROR_OBJECT?(pads,?"GstCollectPads?can?handle?only?time?segments.");??

??????continue;??

????}??

??

????/?check?if?the?waiting?state?should?be?changed?/??

????/?将earliest_time和当前pad上的时间信息进行比较。?

??????当cmp_res为1,表示本pad的时间比earliest_time晚,这时候数据可以消费,不需要等待。?

??????将返回FALSE,在函数gst_collect_pads_default_collected执行buffer_func消费buffer。?

??????否则表示本pad时间比earliest_time早或者相等,如果这时候是在非等待状态,则要设置成等待状态,?

??????同时返回TRUE,并在gst_collect_pads_default_collected不处理buffer,返回GST_FLOW_OK,重新计算best。?

??????这里要注意设置了non-waiting后的第一个包,第一个包的时间有可能是相同的,即0:00?==?0:00?

?????/??

????comp_time?=?data->segment.position;??

????cmp_res?=?pads->priv->compare_func?(pads,?data,?comp_time,??

????????pads->priv->earliest_data,?pads->priv->earliest_time,??

????????pads->priv->compare_user_data);??

????if?(cmp_res?>?0)??

??????/?stop?waiting?/??

??????gst_collect_pads_set_waiting?(pads,?data,?FALSE);??

????else?{??

??????if?(!GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_WAITING))?{??

????????/?start?waiting?/??

????????gst_collect_pads_set_waiting?(pads,?data,?TRUE);??

????????result?=?TRUE;??

??????}??

????}??

??}??

??

??return?result;??

}??

4.6锁和等待状态

????collectpad提供了接口gst_collect_pads_set_waiting可以给其他组件设置某个pad为等待或者非等待状态。其设置与锁GST_COLLECT_PADS_STATE_LOCKED标志位有关系。

????默认情况下的pad(注意,这里的pad为单独的一个pad,并非整个collectpad,这些状态为单个pad私有,并不是collectpad的属性)均为等待状态,而锁的初始化则根据element调用collect添加pad的函数gst_collect_pads_add_pad的最后一个参数决定。

[cpp]?viewplain?copy?

/?

??gst_collect_pads_set_waiting:?

??@pads:?the?collectpads?

??@data:?the?data?to?use?

??@waiting:?boolean?indicating?whether?this?pad?should?operate?

????????????in?waiting?or?non-waiting?mode?

??

??Sets?a?pad?to?waiting?or?non-waiting?mode,?if?at?least?this?pad?

??has?not?been?created?with?locked?waiting?state,?

??in?which?case?nothing?happens.?

??

??This?function?should?be?called?with?@pads?STREAM_LOCK?held,?such?as?

??in?the?callback.?

??

??MT?safe.?

?/??

void??

gst_collect_pads_set_waiting?(GstCollectPads??pads,?GstCollectData??data,??

????gboolean?waiting)??

{??

??g_return_if_fail?(pads?!=?NULL);??

??g_return_if_fail?(GST_IS_COLLECT_PADS?(pads));??

??g_return_if_fail?(data?!=?NULL);??

??

??GST_DEBUG_OBJECT?(pads,?"Setting?pad?%s?to?waiting?%d,?locked?%d",??

??????GST_PAD_NAME?(data->pad),?waiting,??

??????GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_LOCKED));??

??

??/?Do?something?only?on?a?change?and?if?not?locked?/??

??/?修改等待状态标志位需要在没有上锁的情况下处理,?

????可以通过GST_COLLECT_PADS_STATE_SET?(data,?GST_COLLECT_PADS_STATE_LOCKED);方式加解锁。?

????如果设置为非等待,则会把对应的queuedpad自增,当所有pad都处于非等待状态,则框架收集条件总是满足。?

????可能存在4.3.2的问题。?

???/??

??if?(!GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_LOCKED)?&&??

??????(GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_WAITING)?!=??

??????????!?!waiting))?{??

????/?Set?waiting?state?for?this?pad?/??

????if?(waiting)??

??????GST_COLLECT_PADS_STATE_SET?(data,?GST_COLLECT_PADS_STATE_WAITING);??

????else??

??????GST_COLLECT_PADS_STATE_UNSET?(data,?GST_COLLECT_PADS_STATE_WAITING);??

????/?Update?number?of?queued?pads?if?needed?/??

????if?(!data->buffer?&&??

????????!GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_EOS))?{??

??????if?(waiting)??

????????pads->priv->queuedpads--;??

??????else??

????????pads->priv->queuedpads++;??

????}??

??

????/?signal?waiters?because?something?changed?/??

????GST_COLLECT_PADS_EVT_BROADCAST?(pads);??

??}??

}??





献花(0)
+1
(本文系thedust79首藏)