gstreamer的collectpad源码分析
1.背景:
[cpp]?viewplain?copy?
/?
??GstCollectPads:?
??@data:?(element-type?GstBase.CollectData):?#GList?of?#GstCollectData?managed?
????by?this?#GstCollectPads.?
??
??Collectpads?object.?
?/??
struct?_GstCollectPads?{??
??/?基类。??/??
??GstObject??????object;??
??
??//?/?with?LOCK?and/or?STREAM_LOCK?/??
??/?所有PAD的集合。??/??
??/?
?????GstCollectData:?
?????@collect:?owner?#GstCollectPads?
?????@pad:?#GstPad?managed?by?this?data?
?????@buffer:?currently?queued?buffer.?
?????@pos:?position?in?the?buffer?
?????@segment:?last?segment?received.?
?????@dts:?the?signed?version?of?the?DTS?converted?to?running?time.?To?access?
???????????this?memeber,?use?%GST_COLLECT_PADS_DTS?macro.?(Since?1.6)?
?????
?????Structure?used?by?the?collect_pads.?
????struct?_GstCollectData?
????{?
??????/?with?STREAM_LOCK?of?@collect?/??
??????/?指向回collectpad。??/??
??????GstCollectPads????????collect;??
??????GstPad????????????????pad;??
??????GstBuffer?????????????buffer;??
??????guint??????????????????pos;??
??????GstSegment?????????????segment;??
??????
??????//??
??????/?state:?bitfield?for?easier?extension;?
????????eos,?flushing,?new_segment,?waiting?/??
??????GstCollectPadsStateFlags????state;??
??????
??????GstCollectDataPrivate?priv;??
??????
??????union?{??
????????struct?{??
??????????//??
??????????gint64?dts;??
??????????//??
????????}?abi;??
????????gpointer?_gst_reserved[GST_PADDING];??
??????}?ABI;??
????};??
???/??
??GSList????????data;??????????????????/?list?of?CollectData?items?/??
??
??//??
??GRecMutex??????stream_lock;??????????/?used?to?serialize?collection?among?several?streams?/??
??
??GstCollectPadsPrivate?priv;??
??
??gpointer?_gst_reserved[GST_PADDING];??
};??
4.代码分析:
4.1主入口函数:
????主入口函数gst_collect_pads_chain,不同pad工作于不同线程中。代码分析如下:
[cpp]?viewplain?copy?
/?For?each?buffer?we?receive?we?check?if?our?collected?condition?is?reached?
??and?if?so?we?call?the?collected?function.?When?this?is?done?we?check?if?
??data?has?been?unqueued.?If?data?is?still?queued?we?wait?holding?the?stream?
??lock?to?make?sure?no?EOS?event?can?happen?while?we?are?ready?to?be?
??collected??
?/??
static?GstFlowReturn??
gst_collect_pads_chain?(GstPad??pad,?GstObject??parent,?GstBuffer??buffer)??
{??
??GstCollectData?data;??
??GstCollectPads?pads;??
??GstFlowReturn?ret;??
??GstBuffer?buffer_p;??
??guint32?cookie;??
??
??GST_DEBUG?("Got?buffer?for?pad?%s:%s",?GST_DEBUG_PAD_NAME?(pad));??
??
??/?some?magic?to?get?the?managing?collect_pads?/??
??GST_OBJECT_LOCK?(pad);??
??data?=?(GstCollectData?)?gst_pad_get_element_private?(pad);??
??if?(G_UNLIKELY?(data?==?NULL))??
????goto?no_data;??
??ref_data?(data);??
??GST_OBJECT_UNLOCK?(pad);??
??
??pads?=?data->collect;??
??
??GST_COLLECT_PADS_STREAM_LOCK?(pads);??
??/?状态判断。??/??
??/?if?not?started,?bail?out?/??
??if?(G_UNLIKELY?(!pads->priv->started))??
????goto?not_started;??
??/?check?if?this?pad?is?flushing?/??
??if?(G_UNLIKELY?(GST_COLLECT_PADS_STATE_IS_SET?(data,??
??????????????GST_COLLECT_PADS_STATE_FLUSHING)))??
????goto?flushing;??
??/?pad?was?EOS,?we?can?refuse?this?data?/??
??if?(G_UNLIKELY?(GST_COLLECT_PADS_STATE_IS_SET?(data,??
??????????????GST_COLLECT_PADS_STATE_EOS)))??
????goto?eos;??
??
??/?see?if?we?need?to?clip?/??
??/?数据前处理。??/??
??if?(pads->priv->clip_func)?{??
????GstBuffer?outbuf?=?NULL;??
????ret?=??
????????pads->priv->clip_func?(pads,?data,?buffer,?&outbuf,??
????????pads->priv->clip_user_data);??
????buffer?=?outbuf;??
??
????if?(G_UNLIKELY?(outbuf?==?NULL))??
??????goto?clipped;??
??
????if?(G_UNLIKELY?(ret?==?GST_FLOW_EOS))??
??????goto?eos;??
????else?if?(G_UNLIKELY?(ret?!=?GST_FLOW_OK))??
??????goto?error;??
??}??
??
??GST_DEBUG_OBJECT?(pads,?"Queuing?buffer?%p?for?pad?%s:%s",?buffer,??
??????GST_DEBUG_PAD_NAME?(pad));??
??
??/?One?more?pad?has?data?queued?/??
??//?如果当前collectpad处于WAITING状态会将queuedpads增加??
??if?(GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_WAITING))??
????pads->priv->queuedpads++;??
??buffer_p?=?&data->buffer;??
??gst_buffer_replace?(buffer_p,?buffer);??
??
??/?update?segment?last?position?if?in?TIME?/??
??/?更新当前pad上对应的时间信息,后续用于重新计算等待状态需要用到。??/??
??if?(G_LIKELY?(data->segment.format?==?GST_FORMAT_TIME))?{??
????GstClockTime?timestamp;??
??
????timestamp?=?GST_BUFFER_DTS_OR_PTS?(buffer);??
??
????if?(GST_CLOCK_TIME_IS_VALID?(timestamp))??
??????data->segment.position?=?timestamp;??
??}??
??
??/?While?we?have?data?queued?on?this?pad?try?to?collect?stuff?/??
??do?{??
????/?Check?if?our?collected?condition?is?matched?and?call?the?collected?
??????function?if?it?is?/??
????/?主要处理函数,判断收集条件是否满足,后续分析。??/??
????ret?=?gst_collect_pads_check_collected?(pads);??
????/?when?an?error?occurs,?we?want?to?report?this?back?to?the?caller?ASAP?
??????without?having?to?block?if?the?buffer?was?not?popped?/??
????/?数据流处理异常,进入异常处理分支。??/??
????if?(G_UNLIKELY?(ret?!=?GST_FLOW_OK))??
??????goto?error;??
??
????/?data?was?consumed,?we?can?exit?and?accept?new?data?/??
????/?当buffer在check_collected函数中被消费,会在其中减少引用次数,释放buffer。?
??????数据被处理后退出循环,等待下一次buffer到来调用chain函数。??/??
????if?(data->buffer?==?NULL)??
??????break;??
??
????/?数据未被处理,未满足数据收集条件,本pad对应线程将进行唤醒等待。??/??
????/?Having?the?_INIT?here?means?we?don''t?care?about?any?broadcast?up?to?here?
??????(most?of?which?occur?with?STREAM_LOCK?held,?so?could?not?have?happened?
??????anyway).??We?do?care?about?e.g.?a?remove?initiated?broadcast?as?of?this?
??????point.??Putting?it?here?also?makes?this?thread?ignores?any?evt?it?raised?
??????itself?(as?is?a?usual?WAIT?semantic).?
?????/??
????GST_COLLECT_PADS_EVT_INIT?(cookie);??
??
????/?pad?could?be?removed?and?re-added?/??
????unref_data?(data);??
????GST_OBJECT_LOCK?(pad);??
????if?(G_UNLIKELY?((data?=?gst_pad_get_element_private?(pad))?==?NULL))??
??????goto?pad_removed;??
????ref_data?(data);??
????GST_OBJECT_UNLOCK?(pad);??
??
????GST_DEBUG_OBJECT?(pads,?"Pad?%s:%s?has?a?buffer?queued,?waiting",??
????????GST_DEBUG_PAD_NAME?(pad));??
??
????/?wait?to?be?collected,?this?must?happen?from?another?thread?triggered?
??????by?the?_chain?function?of?another?pad.?We?release?the?lock?so?we?
??????can?get?stopped?or?flushed?aswww.wang027.com?well.?We?can?however?not?get?EOS?
??????because?we?still?hold?the?STREAM_LOCK.?
?????/??
????/?等待条件变量被唤醒。??/??
????GST_COLLECT_PADS_STREAM_UNLOCK?(pads);??
????GST_COLLECT_PADS_EVT_WAIT?(pads,?cookie);??
????GST_COLLECT_PADS_STREAM_LOCK?(pads);??
??
????GST_DEBUG_OBJECT?(pads,?"Pad?%s:%s?resuming",?GST_DEBUG_PAD_NAME?(pad));??
??
????/?唤醒后的状态判断。??/??
????/?after?a?signal,?we?could?be?stopped?/??
????if?(G_UNLIKELY?(!pads->priv->started))??
??????goto?not_started;??
????/?check?if?this?pad?is?flushing?/??
????if?(G_UNLIKELY?(GST_COLLECT_PADS_STATE_IS_SET?(data,??
????????????????GST_COLLECT_PADS_STATE_FLUSHING)))??
??????goto?flushing;??
??}??
??while?(data->buffer?!=?NULL);??
??
unlock_done:??
??GST_COLLECT_PADS_STREAM_UNLOCK?(pads);??
??/?data?is?definitely?NULL?if?pad_removed?goto?was?run.?/??
??if?(data)??
????unref_data?(data);??
??if?(buffer)??
????gst_buffer_unref?(buffer);??
??return?ret;??
??
/?异常状态处理。??/??
pad_removed:??
??{??
????GST_WARNING?("%s?got?removed?from?collectpads",?GST_OBJECT_NAME?(pad));??
????GST_OBJECT_UNLOCK?(pad);??
????ret?=?GST_FLOW_NOT_LINKED;??
????goto?unlock_done;??
??}??
??/?ERRORS?/??
no_data:??
??{??
????GST_DEBUG?("%s?got?removed?from?collectpads",?GST_OBJECT_NAME?(pad));??
????GST_OBJECT_UNLOCK?(pad);??
????gst_buffer_unref?(buffer);??
????return?GST_FLOW_NOT_LINKED;??
??}??
not_started:??
??{??
????GST_DEBUG?("not?started");??
????gst_collect_pads_clear?(pads,?data);??
????ret?=?GST_FLOW_FLUSHING;??
????goto?unlock_done;??
??}??
flushing:??
??{??
????GST_DEBUG?("pad?%s:%s?is?flushing",?GST_DEBUG_PAD_NAME?(pad));??
????gst_collect_pads_clear?(pads,?data);??
????ret?=?GST_FLOW_FLUSHING;??
????goto?unlock_done;??
??}??
eos:??
??{??
????/?we?should?not?post?an?error?for?this,?just?inform?upstream?that?
??????we?don''t?expect?anything?anymore?/??
????GST_DEBUG?("pad?%s:%s?is?eos",?GST_DEBUG_PAD_NAME?(pad));??
????ret?=?GST_FLOW_EOS;??
????goto?unlock_done;??
??}??
clipped:??
??{??
????GST_DEBUG?("clipped?buffer?on?pad?%s:%s",?GST_DEBUG_PAD_NAME?(pad));??
????ret?=?GST_FLOW_OK;??
????goto?unlock_done;??
??}??
error:??
??{??
????/?we?print?the?error,?the?element?should?post?a?reasonable?error?
??????message?for?fatal?errors?/??
????GST_DEBUG?("collect?failed,?reason?%d?(%s)",?ret,?gst_flow_get_name?(ret));??
????gst_collect_pads_clear?(pads,?data);??
????goto?unlock_done;??
??}??
}??
4.2框架上的收集条件判断
????在check函数,首先对collectpads上面的pad状态进行检查,只有当有数据的pads和总的pads数满足一定条件时候,才会执行第二重的收集条件判断。函数为gst_collect_pads_check_collected,代码分析如下:
[cpp]?viewplain?copy?
static?GstFlowReturn??
gst_collect_pads_check_collected?(GstCollectPads??pads)??
{??
??GstFlowReturn?flow_ret?=?GST_FLOW_OK;??
??GstCollectPadsFunction?func;??
??gpointer?user_data;??
??
??g_return_val_if_fail?(GST_IS_COLLECT_PADS?(pads),?GST_FLOW_ERROR);??
??
??/?获取回调数据。??/??
??GST_OBJECT_LOCK?(pads);??
??func?=?pads->priv->func;??
??user_data?=?pads->priv->user_data;??
??GST_OBJECT_UNLOCK?(pads);??
??
??g_return_val_if_fail?(pads->priv->func?!=?NULL,?GST_FLOW_NOT_SUPPORTED);??
??
??/?check?for?new?pads,?update?stats?etc..?/??
??/?主要是对等待唤醒的pad的cookie进行校验。??/??
??gst_collect_pads_check_pads?(pads);??
??
??/?所有pad都是EOS状态。直接处理剩余的所有数据。??/??
??if?(G_UNLIKELY?(pads->priv->eospads?==?pads->priv->numpads))?{??
????/?If?all?our?pads?are?EOS?just?collect?once?to?let?the?element?
??????do?its?final?EOS?handling.?/??
????GST_DEBUG_OBJECT?(pads,?"All?active?pads?(%d)?are?EOS,?calling?%s",??
????????pads->priv->numpads,?GST_DEBUG_FUNCPTR_NAME?(func));??
??
????if?(G_UNLIKELY?(g_atomic_int_compare_and_exchange?(&pads->priv->seeking,??
????????????????TRUE,?FALSE)))?{??
??????GST_INFO_OBJECT?(pads,?"finished?seeking");??
????}??
????do?{??
??????flow_ret?=?func?(pads,?user_data);??
????}?while?(flow_ret?==?GST_FLOW_OK);??
??}?else?{??
????/?有pad处于非EOS状态。??/??
????gboolean?collected?=?FALSE;??
??
????/?We?call?the?collected?function?as?long?as?our?condition?matches.?/??
????/?只有满足(有数据的有效pad数?+?无效pad数?>=?总的pad数)时,才可以进入下一步的?
??????条件判断,这个判断是框架级别的判断,总是存在,其余重载的判断函数(func)都在这个循环中处理。?
??????如果函数不执行,则buffer一定不会被消费,在外层会走入线程挂起等待唤醒的流程。??/??
????while?(((pads->priv->queuedpads?+?pads->priv->eospads)?>=??
????????????pads->priv->numpads))?{??
??????GST_DEBUG_OBJECT?(pads,??
??????????"All?active?pads?(%d?+?%d?>=?%d)?have?data,?"?"calling?%s",??
??????????pads->priv->queuedpads,?pads->priv->eospads,?pads->priv->numpads,??
??????????GST_DEBUG_FUNCPTR_NAME?(func));??
??
??????if?(G_UNLIKELY?(g_atomic_int_compare_and_exchange?(&pads->priv->seeking,??
??????????????????TRUE,?FALSE)))?{??
????????GST_INFO_OBJECT?(pads,?"finished?seeking");??
??????}??
????????
??????/?具体数据的收集条件判断。??/??
??????flow_ret?=?func?(pads,?user_data);??
??????collected?=?TRUE;??
??
??????/?数据处理异常或者已经没有有数据的pad了,中断循环。??/??
??????/?break?on?error?/??
??????if?(flow_ret?!=?GST_FLOW_OK)??
????????break;??
??????/?Don''t?keep?looping?after?telling?the?element?EOS?or?flushing?/??
??????if?(pads->priv->queuedpads?==?0)??
????????break;??
????}??
????if?(!collected)??
??????GST_DEBUG_OBJECT?(pads,?"Not?all?active?pads?(%d)?have?data,?continuing",??
??????????pads->priv->numpads);??
??}??
??return?flow_ret;??
}??
4.3默认的第二重收集条件判断
????第二重收集条件的函数是可以进行重载的,可以使用gst_collect_pads_set_function进行设置,这里只分析默认的函数gst_collect_pads_default_collected。
????代码分析如下:
[cpp]?viewplain?copy?
/?
??Default?collect?callback?triggered?when?#GstCollectPads?gathered?all?data.?
??
??Called?with?STREAM_LOCK.?
?/??
static?GstFlowReturn??
gst_collect_pads_default_collected?(GstCollectPads??pads,?gpointer?user_data)??
{??
??GstCollectData?best?=?NULL;??
??GstBuffer?buffer;??
??GstFlowReturn?ret?=?GST_FLOW_OK;??
??GstCollectPadsBufferFunction?func;??
??gpointer?buffer_user_data;??
??
??g_return_val_if_fail?(GST_IS_COLLECT_PADS?(pads),?GST_FLOW_ERROR);??
??
??/?获取回调数据。??/??
??GST_OBJECT_LOCK?(pads);??
??func?=?pads->priv->buffer_func;??
??buffer_user_data?=?pads->priv->buffer_user_data;??
??GST_OBJECT_UNLOCK?(pads);??
??
??g_return_val_if_fail?(func?!=?NULL,?GST_FLOW_NOT_SUPPORTED);??
??
??/?Find?the?oldest?pad?at?all?cost?/??
??/?寻找最合适的pad,并计算最早的数据和时间戳。?
????最后返回gst_collect_pads_recalculate_waiting的返回值,?
????TRUE表示从非等待状态变为等待状态。?
????在默认场景下,只有使用了set_wait为FALSE时候才会标记为non-waiting状态。?
????因此如果在默认框架下主动设置了non-waiting状态,需要留意时间比较函数。?
????否则这里会进入一个?
????FLOW_OK?->?数据没有POP?->?pad_num没有变化?->?gst_collect_pads_check_collected主循环中死循环的问题。??/??
??if?(gst_collect_pads_recalculate_full?(pads))?{??
????/?waiting?was?switched?on,?
??????so?give?another?thread?a?chance?to?deliver?a?possibly?
??????older?buffer;?don''t?charge?on?yet?with?the?current?oldest?/??
????ret?=?GST_FLOW_OK;??
????goto?done;??
??}??
??
??best?=?pads->priv->earliest_data;??
??
??/?No?data?collected?means?EOS.?/??
??/?在waiting状态下,但是没有最新的数据包,因此认为这个pad已经进入EOS状态了。无法接收数据。?
????注意,这里设置non-waiting以后并修改了时间比较函数后,其他地方调用默认函数,也会导致一个问题:?
????由于non-waiting增加了queuedpad,因此如果总的有两个pad,且两个pad都设置了non-waiting后,在函数?
????gst_collect_pads_check_collected中条件判断总是成立,且queuedpad在non-waiting状态下无法自减,?
????第一次进入时候就会把所有的pad的数据直接处理,及时处理完所有数据后,依旧走到这里进行判断,?
????但是这时候buffer已经为空,导致collectpad认为这个pad的数据已经进入EOS状态,处理异常。??/??
??if?(G_UNLIKELY?(best?==?NULL))?{??
????ret?=?func?(pads,?best,?NULL,?buffer_user_data);??
????if?(ret?==?GST_FLOW_OK)??
??????ret?=?GST_FLOW_EOS;??
????goto?done;??
??}??
??
??/?make?sure?that?the?pad?we?take?a?buffer?from?is?waiting;?
????otherwise?popping?a?buffer?will?seem?not?to?have?happened?
????and?collectpads?can?get?into?a?busy?loop?/??
??gst_collect_pads_set_waiting?(pads,?best,?TRUE);??
??
??/?Send?buffer?/??
??/?使用pop弹出buffer,并将buffer发送给buffer_func进行处理。??/??
??buffer?=?gst_collect_pads_pop?(pads,?best);??
??ret?=?func?(pads,?best,?buffer,?buffer_user_data);??
??
??/?maybe?non-waiting?was?forced?to?waiting?above?due?to?
????newsegment?events?coming?too?sparsely,?
????so?re-check?to?restore?state?to?avoid?hanging/waiting?/??
??gst_collect_pads_recalculate_full?(pads);??
??
done:??
??return?ret;??
}??
????注意,这里如果对某些函数进行重载或者设置了非等待状态,有两个潜在的异常流程。
4.3.1异常流程1:
????当使用默认的时间比较函数,且设置了非等待状态的pad有数据到来时,在函数gst_collect_pads_recalculate_waiting,当earliest_data检测到本PAD时,这时候时间戳应该是相等的,但是这时候如果处于非等待状态,无论是否加锁最后都会返回TRUE,这时候gst_collect_pads_default_collected函数中的第一个判断总会直接返回GST_FLOW_OK,但是实际并没有弹出任何buffer,但是gst_collect_pads_check_collected的循环条件并没有改变,导致这个线程会一直在这里循环。如果其他pad没有数据进入,则这个pad会进入死循环。
4.3.2异常流程2:
????当所有的pads都设置了non-waiting状态,则在框架的收集条件判断函数gst_collect_pads_check_collected中的pads数量比较循环总是成立,且所有pads数据弹出时都不会减少当前的queuedpad数,因此当有一个buffer弹出后,会持续弹出所有buffer,当buffer为空时,循环条件依旧成立,在处理空buffer时,认为这个pad已经进入了EOS状态,从而导致异常。异常流程如下图:
4.4寻找最优的可用buffer和pad
????这个函数流程比较简单,就是遍历collectpads中的所有pad,并和earliest_time进行比较,寻找最早的时间点的buffer。
????这里涉及到时间比较的函数,这里的默认时间比较函数比较简单,就是单纯判断时间点的大小,相等返回0,第一个时间点大于第二个返回1,小于返回-1。
[cpp]?viewplain?copy?
/?
??gst_collect_pads_find_best_pad:?
??@pads:?the?collectpads?to?use?
??@data:?returns?the?collectdata?for?earliest?data?
??@time:?returns?the?earliest?available?buffertime?
??
??Find?the?oldest/best?pad,?i.e.?pad?holding?the?oldest?buffer?and?
??and?return?the?corresponding?#GstCollectData?and?buffertime.?
??
??This?function?should?be?called?with?STREAM_LOCK?held,?
??such?as?in?the?callback.?
?/??
static?void??
gst_collect_pads_find_best_pad?(GstCollectPads??pads,??
????GstCollectData??data,?GstClockTime??time)??
{??
??GSList?collected;??
??GstCollectData?best?=?NULL;??
??GstClockTime?best_time?=?GST_CLOCK_TIME_NONE;??
??
??g_return_if_fail?(data?!=?NULL);??
??g_return_if_fail?(time?!=?NULL);??
??
??/?遍历所有pads,对所有pads中的数据与当前的earliest_time进行比较,?
????寻找时间最靠前的buffer及其对应的pad。??/??
??for?(collected?=?pads->data;?collected;?collected?=?g_slist_next?(collected))?{??
????GstBufferwww.baiyuewang.net?buffer;??
????GstCollectData?data?=?(GstCollectData?)?collected->data;??
????GstClockTime?timestamp;??
??
????buffer?=?gst_collect_pads_peek?(pads,?data);??
????/?if?we?have?a?buffer?check?if?it?is?better?then?the?current?best?one?/??
????if?(buffer?!=?NULL)?{??
??????timestamp?=?GST_BUFFER_DTS_OR_PTS?(buffer);??
??????gst_buffer_unref?(buffer);??
??????if?(best?==?NULL?||?pads->priv->compare_func?(pads,?data,?timestamp,??
??????????????best,?best_time,?pads->priv->compare_user_data)?0)?{??
????????best?=?data;??
????????best_time?=?timestamp;??
??????}??
????}??
??}??
??
??/?set?earliest?time?/??
??data?=?best;??
??time?=?best_time;??
??
??GST_DEBUG_OBJECT?(pads,?"best?pad?%s,?best?time?%"?GST_TIME_FORMAT,??
??????best???GST_PAD_NAME?(((GstCollectData?)?best)->pad)?:?"(nil)",??
??????GST_TIME_ARGS?(best_time));??
}??
3.5重新计算等待状态函数
????函数gst_collect_pads_recalculate_waiting会根据earliest_time和所有pad上的数据进行比较,计算collectpad是否需要重新进入等待状态,返回TRUE表示从非等待状态进入等待状态。
????这里如果设置了non-waiting状态,则要小心4.3中出现的异常。
[cpp]?viewplain?copy?
/?General?overview:?
??-?only?pad?with?a?buffer?can?determine?earliest_data?(and?earliest_time)?
??-?only?segment?info?determines?(non-)waiting?state?
??-???perhaps?use?_stream_time?for?comparison?
????(which?muxers?might?have?use?as?well??)?
?/??
??
/?
??Function?to?recalculate?the?waiting?state?of?all?pads.?
??
??Must?be?called?with?STREAM_LOCK.?
??
??Returns?%TRUE?if?a?pad?was?set?to?waiting?
??(from?non-waiting?state).?
?/??
static?gboolean??
gst_collect_pads_recalculate_waiting?(GstCollectPads??pads)??
{??
??GSList?collected;??
??gboolean?result?=?FALSE;??
??
??/?If?earliest?time?is?not?known,?there?is?nothing?to?do.?/??
??/?没有数据可以比较。??/??
??if?(pads->priv->earliest_data?==?NULL)??
????return?FALSE;??
??
??/?遍历所有pads。??/??
??for?(collected?=?pads->data;?collected;?collected?=?g_slist_next?(collected))?{??
????GstCollectData?data?=?(GstCollectData?)?collected->data;??
????int?cmp_res;??
????GstClockTime?comp_time;??
??
????/?check?if?pad?has?a?segment?/??
????/?检查本pad上对应的时间信息。??/??
????if?(data->segment.format?==?GST_FORMAT_UNDEFINED)?{??
??????GST_WARNING_OBJECT?(pads,??
??????????"GstCollectPads?has?no?time?segment,?assuming?0?based.");??
??????gst_segment_init?(&data->segment,?GST_FORMAT_TIME);??
??????GST_COLLECT_PADS_STATE_SET?(data,?GST_COLLECT_PADS_STATE_NEW_SEGMENT);??
????}??
??
????/?check?segment?format?/??
????if?(data->segment.format?!=?GST_FORMAT_TIME)?{??
??????GST_ERROR_OBJECT?(pads,?"GstCollectPads?can?handle?only?time?segments.");??
??????continue;??
????}??
??
????/?check?if?the?waiting?state?should?be?changed?/??
????/?将earliest_time和当前pad上的时间信息进行比较。?
??????当cmp_res为1,表示本pad的时间比earliest_time晚,这时候数据可以消费,不需要等待。?
??????将返回FALSE,在函数gst_collect_pads_default_collected执行buffer_func消费buffer。?
??????否则表示本pad时间比earliest_time早或者相等,如果这时候是在非等待状态,则要设置成等待状态,?
??????同时返回TRUE,并在gst_collect_pads_default_collected不处理buffer,返回GST_FLOW_OK,重新计算best。?
??????这里要注意设置了non-waiting后的第一个包,第一个包的时间有可能是相同的,即0:00?==?0:00?
?????/??
????comp_time?=?data->segment.position;??
????cmp_res?=?pads->priv->compare_func?(pads,?data,?comp_time,??
????????pads->priv->earliest_data,?pads->priv->earliest_time,??
????????pads->priv->compare_user_data);??
????if?(cmp_res?>?0)??
??????/?stop?waiting?/??
??????gst_collect_pads_set_waiting?(pads,?data,?FALSE);??
????else?{??
??????if?(!GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_WAITING))?{??
????????/?start?waiting?/??
????????gst_collect_pads_set_waiting?(pads,?data,?TRUE);??
????????result?=?TRUE;??
??????}??
????}??
??}??
??
??return?result;??
}??
4.6锁和等待状态
????collectpad提供了接口gst_collect_pads_set_waiting可以给其他组件设置某个pad为等待或者非等待状态。其设置与锁GST_COLLECT_PADS_STATE_LOCKED标志位有关系。
????默认情况下的pad(注意,这里的pad为单独的一个pad,并非整个collectpad,这些状态为单个pad私有,并不是collectpad的属性)均为等待状态,而锁的初始化则根据element调用collect添加pad的函数gst_collect_pads_add_pad的最后一个参数决定。
[cpp]?viewplain?copy?
/?
??gst_collect_pads_set_waiting:?
??@pads:?the?collectpads?
??@data:?the?data?to?use?
??@waiting:?boolean?indicating?whether?this?pad?should?operate?
????????????in?waiting?or?non-waiting?mode?
??
??Sets?a?pad?to?waiting?or?non-waiting?mode,?if?at?least?this?pad?
??has?not?been?created?with?locked?waiting?state,?
??in?which?case?nothing?happens.?
??
??This?function?should?be?called?with?@pads?STREAM_LOCK?held,?such?as?
??in?the?callback.?
??
??MT?safe.?
?/??
void??
gst_collect_pads_set_waiting?(GstCollectPads??pads,?GstCollectData??data,??
????gboolean?waiting)??
{??
??g_return_if_fail?(pads?!=?NULL);??
??g_return_if_fail?(GST_IS_COLLECT_PADS?(pads));??
??g_return_if_fail?(data?!=?NULL);??
??
??GST_DEBUG_OBJECT?(pads,?"Setting?pad?%s?to?waiting?%d,?locked?%d",??
??????GST_PAD_NAME?(data->pad),?waiting,??
??????GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_LOCKED));??
??
??/?Do?something?only?on?a?change?and?if?not?locked?/??
??/?修改等待状态标志位需要在没有上锁的情况下处理,?
????可以通过GST_COLLECT_PADS_STATE_SET?(data,?GST_COLLECT_PADS_STATE_LOCKED);方式加解锁。?
????如果设置为非等待,则会把对应的queuedpad自增,当所有pad都处于非等待状态,则框架收集条件总是满足。?
????可能存在4.3.2的问题。?
???/??
??if?(!GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_LOCKED)?&&??
??????(GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_WAITING)?!=??
??????????!?!waiting))?{??
????/?Set?waiting?state?for?this?pad?/??
????if?(waiting)??
??????GST_COLLECT_PADS_STATE_SET?(data,?GST_COLLECT_PADS_STATE_WAITING);??
????else??
??????GST_COLLECT_PADS_STATE_UNSET?(data,?GST_COLLECT_PADS_STATE_WAITING);??
????/?Update?number?of?queued?pads?if?needed?/??
????if?(!data->buffer?&&??
????????!GST_COLLECT_PADS_STATE_IS_SET?(data,?GST_COLLECT_PADS_STATE_EOS))?{??
??????if?(waiting)??
????????pads->priv->queuedpads--;??
??????else??
????????pads->priv->queuedpads++;??
????}??
??
????/?signal?waiters?because?something?changed?/??
????GST_COLLECT_PADS_EVT_BROADCAST?(pads);??
??}??
}??
|
|