分享

简洁rtmp源站服务器

 WindySky 2017-08-02

 公司在做编码器,需要和rtmp服务器对接,nginx-rtmp实在是太复杂,不方便调试,所以做了一个简单rtmp服务器。

参与这个项目的几个朋友,说不仅仅想实现流媒体源站功能,还想写出漂亮的代码,成为“国人软件代码之典范”。

哈哈,目标是远大的,代码得一点一点写。

https://github.com/winlinvip/simple-rtmp-server


一路输入,多路(实例是12路输出):



支持Flash推流,直接进行转码支持HLS:



主要的定位是做RTMP/HLS流媒体核心业务,支持RTMP集群(源站/边缘,或Forward方式),支持多进程和单进程,支持vhost,不支持点播。

因为简单,所以稳定;不是那个级别的稳定,是那个级别的稳定,非常稳定,没有出错的可能性。

尽管这样,也是基于state-threads的高性能服务器,一个进程一个线程,异步socket,能成为最好的流服务器。

而且,最方便的是逻辑简单,方便调试,没有异步的回调,全是同步函数。




RTMP协议收发包完整实现,510行就全部搞定。

确实很简单,发送RTMP包使用writev,一个函数就可以搞定:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2. * send out message with encoded payload to peer. 
  3. * use the message encode method to encode to payload, 
  4. * then sendout over socket. 
  5. * @msg this method will free it whatever return value. 
  6. */  
  7. virtual int send_message(SrsMessage* msg);  

实现如下:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. int SrsProtocol::send_message(SrsMessage* msg)  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     if ((ret = msg->encode_packet()) != ERROR_SUCCESS) {  
  6.         srs_error("encode packet to message payload failed. ret=%d", ret);  
  7.         return ret;  
  8.     }  
  9.     srs_info("encode packet to message payload success");  
  10.   
  11.     // p set to current write position,  
  12.     // it's ok when payload is NULL and size is 0.  
  13.     char* p = (char*)msg->payload;  
  14.       
  15.     // always write the header event payload is empty.  
  16.     do {  
  17.         // generate the header.  
  18.         char* pheader = NULL;  
  19.         int header_size = 0;  
  20.           
  21.         if (p == (char*)msg->payload) {  
  22.             // write new chunk stream header, fmt is 0  
  23.             pheader = out_header_fmt0;  
  24.             *pheader++ = 0x00 | (msg->get_perfer_cid() & 0x3F);  
  25.               
  26.             // chunk message header, 11 bytes  
  27.             // timestamp, 3bytes, big-endian  
  28.             if (msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP) {  
  29.                 *pheader++ = 0xFF;  
  30.                 *pheader++ = 0xFF;  
  31.                 *pheader++ = 0xFF;  
  32.             } else {  
  33.                 pp = (char*)&msg->header.timestamp;   
  34.                 *pheader++ = pp[2];  
  35.                 *pheader++ = pp[1];  
  36.                 *pheader++ = pp[0];  
  37.             }  
  38.               
  39.             // message_length, 3bytes, big-endian  
  40.             pp = (char*)&msg->header.payload_length;  
  41.             *pheader++ = pp[2];  
  42.             *pheader++ = pp[1];  
  43.             *pheader++ = pp[0];  
  44.               
  45.             // message_type, 1bytes  
  46.             *pheader++ = msg->header.message_type;  
  47.               
  48.             // message_length, 3bytes, little-endian  
  49.             pp = (char*)&msg->header.stream_id;  
  50.             *pheader++ = pp[0];  
  51.             *pheader++ = pp[1];  
  52.             *pheader++ = pp[2];  
  53.             *pheader++ = pp[3];  
  54.               
  55.             // chunk extended timestamp header, 0 or 4 bytes, big-endian  
  56.             if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){  
  57.                 pp = (char*)&msg->header.timestamp;   
  58.                 *pheader++ = pp[3];  
  59.                 *pheader++ = pp[2];  
  60.                 *pheader++ = pp[1];  
  61.                 *pheader++ = pp[0];  
  62.             }  
  63.               
  64.             header_size = pheader - out_header_fmt0;  
  65.             pheader = out_header_fmt0;  
  66.         } else {  
  67.             // write no message header chunk stream, fmt is 3  
  68.             pheader = out_header_fmt3;  
  69.             *pheader++ = 0xC0 | (msg->get_perfer_cid() & 0x3F);  
  70.               
  71.             // chunk extended timestamp header, 0 or 4 bytes, big-endian  
  72.             if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){  
  73.                 pp = (char*)&msg->header.timestamp;   
  74.                 *pheader++ = pp[3];  
  75.                 *pheader++ = pp[2];  
  76.                 *pheader++ = pp[1];  
  77.                 *pheader++ = pp[0];  
  78.             }  
  79.               
  80.             header_size = pheader - out_header_fmt3;  
  81.             pheader = out_header_fmt3;  
  82.         }  
  83.           
  84.         // sendout header and payload by writev.  
  85.         // decrease the sys invoke count to get higher performance.  
  86.         int payload_size = msg->size - (p - (char*)msg->payload);  
  87.         if (payload_size > out_chunk_size) {  
  88.             payload_size = out_chunk_size;  
  89.         }  
  90.           
  91.         // send by writev  
  92.         iovec iov[2];  
  93.         iov[0].iov_base = pheader;  
  94.         iov[0].iov_len = header_size;  
  95.         iov[1].iov_base = p;  
  96.         iov[1].iov_len = payload_size;  
  97.           
  98.         ssize_t nwrite;  
  99.         if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) {  
  100.             srs_error("send with writev failed. ret=%d", ret);  
  101.             return ret;  
  102.         }  
  103.           
  104.         // consume sendout bytes when not empty packet.  
  105.         if (msg->payload && msg->size > 0) {  
  106.             p += payload_size;  
  107.         }  
  108.     } while (p < (char*)msg->payload + msg->size);  
  109.       
  110.     return ret;  
  111. }  


收RTMP包麻烦一点,需要五个函数(只有recv_message是public的):

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2. * recv a message with raw/undecoded payload from peer. 
  3. * the payload is not decoded, use srs_rtmp_expect_message<T> if requires  
  4. * specifies message. 
  5. * @pmsg, user must free it. NULL if not success. 
  6. * @remark, only when success, user can use and must free the pmsg. 
  7. */  
  8. virtual int recv_message(SrsMessage** pmsg);  
  9. /** 
  10. * try to recv interlaced message from peer, 
  11. * return error if error occur and nerver set the pmsg, 
  12. * return success and pmsg set to NULL if no entire message got, 
  13. * return success and pmsg set to entire message if got one. 
  14. */  
  15. virtual int recv_interlaced_message(SrsMessage** pmsg);  
  16. /** 
  17. * read the chunk basic header(fmt, cid) from chunk stream. 
  18. * user can discovery a SrsChunkStream by cid. 
  19. * @bh_size return the chunk basic header size, to remove the used bytes when finished. 
  20. */  
  21. virtual int read_basic_header(char& fmt, int& cid, int& bh_size);  
  22. /** 
  23. * read the chunk message header(timestamp, payload_length, message_type, stream_id)  
  24. * from chunk stream and save to SrsChunkStream. 
  25. * @mh_size return the chunk message header size, to remove the used bytes when finished. 
  26. */  
  27. virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);  
  28. /** 
  29. * read the chunk payload, remove the used bytes in buffer, 
  30. * if got entire message, set the pmsg. 
  31. * @payload_size read size in this roundtrip, generally a chunk size or left message size. 
  32. */  
  33. virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);  

实现如下:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. int SrsProtocol::recv_message(SrsMessage** pmsg)  
  2. {  
  3.     *pmsg = NULL;  
  4.       
  5.     int ret = ERROR_SUCCESS;  
  6.       
  7.     while (true) {  
  8.         SrsMessage* msg = NULL;  
  9.           
  10.         if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {  
  11.             srs_error("recv interlaced message failed. ret=%d", ret);  
  12.             return ret;  
  13.         }  
  14.         srs_verbose("entire msg received");  
  15.           
  16.         if (!msg) {  
  17.             continue;  
  18.         }  
  19.           
  20.         if (msg->size <= 0 || msg->header.payload_length <= 0) {  
  21.             srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).",  
  22.                 msg->header.message_type, msg->header.payload_length,  
  23.                 msg->header.timestamp, msg->header.stream_id);  
  24.             delete msg;  
  25.             continue;  
  26.         }  
  27.           
  28.         srs_verbose("get a msg with raw/undecoded payload");  
  29.         *pmsg = msg;  
  30.         break;  
  31.     }  
  32.       
  33.     return ret;  
  34. }  
  35.   
  36. int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)  
  37. {  
  38.     int ret = ERROR_SUCCESS;  
  39.       
  40.     // chunk stream basic header.  
  41.     char fmt = 0;  
  42.     int cid = 0;  
  43.     int bh_size = 0;  
  44.     if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) {  
  45.         srs_error("read basic header failed. ret=%d", ret);  
  46.         return ret;  
  47.     }  
  48.     srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size);  
  49.       
  50.     // get the cached chunk stream.  
  51.     SrsChunkStream* chunk = NULL;  
  52.       
  53.     if (chunk_streams.find(cid) == chunk_streams.end()) {  
  54.         chunk = chunk_streams[cid] = new SrsChunkStream(cid);  
  55.         srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);  
  56.     } else {  
  57.         chunk = chunk_streams[cid];  
  58.         srs_info("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",  
  59.             chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,  
  60.             chunk->header.timestamp, chunk->header.stream_id);  
  61.     }  
  62.   
  63.     // chunk stream message header  
  64.     int mh_size = 0;  
  65.     if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) {  
  66.         srs_error("read message header failed. ret=%d", ret);  
  67.         return ret;  
  68.     }  
  69.     srs_info("read message header success. "  
  70.             "fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",   
  71.             fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type,   
  72.             chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);  
  73.       
  74.     // read msg payload from chunk stream.  
  75.     SrsMessage* msg = NULL;  
  76.     int payload_size = 0;  
  77.     if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) {  
  78.         srs_error("read message payload failed. ret=%d", ret);  
  79.         return ret;  
  80.     }  
  81.       
  82.     // not got an entire RTMP message, try next chunk.  
  83.     if (!msg) {  
  84.         srs_info("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",  
  85.                 payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,  
  86.                 chunk->header.timestamp, chunk->header.stream_id);  
  87.         return ret;  
  88.     }  
  89.       
  90.     *pmsg = msg;  
  91.     srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",  
  92.             payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,  
  93.             chunk->header.timestamp, chunk->header.stream_id);  
  94.               
  95.     return ret;  
  96. }  
  97.   
  98. int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)  
  99. {  
  100.     int ret = ERROR_SUCCESS;  
  101.       
  102.     int required_size = 1;  
  103.     if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {  
  104.         srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);  
  105.         return ret;  
  106.     }  
  107.       
  108.     char* p = buffer->bytes();  
  109.       
  110.     fmt = (*p >> 6) & 0x03;  
  111.     cid = *p & 0x3f;  
  112.     bh_size = 1;  
  113.       
  114.     if (cid > 1) {  
  115.         srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);  
  116.         return ret;  
  117.     }  
  118.   
  119.     if (cid == 0) {  
  120.         required_size = 2;  
  121.         if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {  
  122.             srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);  
  123.             return ret;  
  124.         }  
  125.           
  126.         cid = 64;  
  127.         cid += *(++p);  
  128.         bh_size = 2;  
  129.         srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);  
  130.     } else if (cid == 1) {  
  131.         required_size = 3;  
  132.         if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {  
  133.             srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);  
  134.             return ret;  
  135.         }  
  136.           
  137.         cid = 64;  
  138.         cid += *(++p);  
  139.         cid += *(++p) * 256;  
  140.         bh_size = 3;  
  141.         srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);  
  142.     } else {  
  143.         srs_error("invalid path, impossible basic header.");  
  144.         srs_assert(false);  
  145.     }  
  146.       
  147.     return ret;  
  148. }  
  149.   
  150. int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size)  
  151. {  
  152.     int ret = ERROR_SUCCESS;  
  153.       
  154.     // when not exists cached msg, means get an new message,  
  155.     // the fmt must be type0 which means new message.  
  156.     if (!chunk->msg && fmt != RTMP_FMT_TYPE0) {  
  157.         ret = ERROR_RTMP_CHUNK_START;  
  158.         srs_error("chunk stream start, "  
  159.             "fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);  
  160.         return ret;  
  161.     }  
  162.   
  163.     // when exists cache msg, means got an partial message,  
  164.     // the fmt must not be type0 which means new message.  
  165.     if (chunk->msg && fmt == RTMP_FMT_TYPE0) {  
  166.         ret = ERROR_RTMP_CHUNK_START;  
  167.         srs_error("chunk stream exists, "  
  168.             "fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);  
  169.         return ret;  
  170.     }  
  171.       
  172.     // create msg when new chunk stream start  
  173.     if (!chunk->msg) {  
  174.         srs_assert(fmt == RTMP_FMT_TYPE0);  
  175.         chunk->msg = new SrsMessage();  
  176.         srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);  
  177.     }  
  178.   
  179.     // read message header from socket to buffer.  
  180.     static char mh_sizes[] = {11, 7, 1, 0};  
  181.     mh_size = mh_sizes[(int)fmt];  
  182.     srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);  
  183.       
  184.     int required_size = bh_size + mh_size;  
  185.     if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {  
  186.         srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);  
  187.         return ret;  
  188.     }  
  189.     char* p = buffer->bytes() + bh_size;  
  190.       
  191.     // parse the message header.  
  192.     // see also: ngx_rtmp_recv  
  193.     if (fmt <= RTMP_FMT_TYPE2) {  
  194.         int32_t timestamp_delta;  
  195.         char* pp = (char*)×tamp_delta;  
  196.         pp[2] = *p++;  
  197.         pp[1] = *p++;  
  198.         pp[0] = *p++;  
  199.         pp[3] = 0;  
  200.           
  201.         if (fmt == RTMP_FMT_TYPE0) {  
  202.             // 6.1.2.1. Type 0  
  203.             // For a type-0 chunk, the absolute timestamp of the message is sent  
  204.             // here.  
  205.             chunk->header.timestamp = timestamp_delta;  
  206.         } else {  
  207.             // 6.1.2.2. Type 1  
  208.             // 6.1.2.3. Type 2  
  209.             // For a type-1 or type-2 chunk, the difference between the previous  
  210.             // chunk's timestamp and the current chunk's timestamp is sent here.  
  211.             chunk->header.timestamp += timestamp_delta;  
  212.         }  
  213.           
  214.         // fmt: 0  
  215.         // timestamp: 3 bytes  
  216.         // If the timestamp is greater than or equal to 16777215  
  217.         // (hexadecimal 0x00ffffff), this value MUST be 16777215, and the  
  218.         // ‘extended timestamp header’ MUST be present. Otherwise, this value  
  219.         // SHOULD be the entire timestamp.  
  220.         //  
  221.         // fmt: 1 or 2  
  222.         // timestamp delta: 3 bytes  
  223.         // If the delta is greater than or equal to 16777215 (hexadecimal  
  224.         // 0x00ffffff), this value MUST be 16777215, and the ‘extended  
  225.         // timestamp header’ MUST be present. Otherwise, this value SHOULD be  
  226.         // the entire delta.  
  227.         chunk->extended_timestamp = (timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);  
  228.         if (chunk->extended_timestamp) {  
  229.             chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP;  
  230.         }  
  231.           
  232.         if (fmt <= RTMP_FMT_TYPE1) {  
  233.             pp = (char*)&chunk->header.payload_length;  
  234.             pp[2] = *p++;  
  235.             pp[1] = *p++;  
  236.             pp[0] = *p++;  
  237.             pp[3] = 0;  
  238.               
  239.             chunk->header.message_type = *p++;  
  240.               
  241.             if (fmt == 0) {  
  242.                 pp = (char*)&chunk->header.stream_id;  
  243.                 pp[0] = *p++;  
  244.                 pp[1] = *p++;  
  245.                 pp[2] = *p++;  
  246.                 pp[3] = *p++;  
  247.                 srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d, sid=%d",   
  248.                     fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,   
  249.                     chunk->header.message_type, chunk->header.stream_id);  
  250.             } else {  
  251.                 srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d",   
  252.                     fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,   
  253.                     chunk->header.message_type);  
  254.             }  
  255.         } else {  
  256.             srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d",   
  257.                 fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);  
  258.         }  
  259.     } else {  
  260.         srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d",   
  261.             fmt, mh_size, chunk->extended_timestamp);  
  262.     }  
  263.       
  264.     if (chunk->extended_timestamp) {  
  265.         mh_size += 4;  
  266.         required_size = bh_size + mh_size;  
  267.         srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);  
  268.         if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {  
  269.             srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);  
  270.             return ret;  
  271.         }  
  272.   
  273.         char* pp = (char*)&chunk->header.timestamp;  
  274.         pp[3] = *p++;  
  275.         pp[2] = *p++;  
  276.         pp[1] = *p++;  
  277.         pp[0] = *p++;  
  278.         srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp);  
  279.     }  
  280.       
  281.     // valid message  
  282.     if (chunk->header.payload_length < 0) {  
  283.         ret = ERROR_RTMP_MSG_INVLIAD_SIZE;  
  284.         srs_error("RTMP message size must not be negative. size=%d, ret=%d",   
  285.             chunk->header.payload_length, ret);  
  286.         return ret;  
  287.     }  
  288.       
  289.     // copy header to msg  
  290.     chunk->msg->header = chunk->header;  
  291.       
  292.     return ret;  
  293. }  
  294.   
  295. int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg)  
  296. {  
  297.     int ret = ERROR_SUCCESS;  
  298.       
  299.     // empty message  
  300.     if (chunk->header.payload_length == 0) {  
  301.         // need erase the header in buffer.  
  302.         buffer->erase(bh_size + mh_size);  
  303.           
  304.         srs_trace("get an empty RTMP "  
  305.                 "message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type,   
  306.                 chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);  
  307.           
  308.         *pmsg = chunk->msg;  
  309.         chunk->msg = NULL;  
  310.                   
  311.         return ret;  
  312.     }  
  313.     srs_assert(chunk->header.payload_length > 0);  
  314.       
  315.     // the chunk payload size.  
  316.     payload_size = chunk->header.payload_length - chunk->msg->size;  
  317.     if (payload_size > in_chunk_size) {  
  318.         payload_size = in_chunk_size;  
  319.     }  
  320.     srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d",   
  321.         payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size);  
  322.   
  323.     // create msg payload if not initialized  
  324.     if (!chunk->msg->payload) {  
  325.         chunk->msg->payload = new int8_t[chunk->header.payload_length];  
  326.         memset(chunk->msg->payload, 0, chunk->header.payload_length);  
  327.         srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length);  
  328.     }  
  329.       
  330.     // read payload to buffer  
  331.     int required_size = bh_size + mh_size + payload_size;  
  332.     if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {  
  333.         srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);  
  334.         return ret;  
  335.     }  
  336.     memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size);  
  337.     buffer->erase(bh_size + mh_size + payload_size);  
  338.     chunk->msg->size += payload_size;  
  339.       
  340.     srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);  
  341.       
  342.     // got entire RTMP message?  
  343.     if (chunk->header.payload_length == chunk->msg->size) {  
  344.         *pmsg = chunk->msg;  
  345.         chunk->msg = NULL;  
  346.         srs_verbose("get entire RTMP message(type=%d, size=%d, time=%d, sid=%d)",   
  347.                 chunk->header.message_type, chunk->header.payload_length,   
  348.                 chunk->header.timestamp, chunk->header.stream_id);  
  349.         return ret;  
  350.     }  
  351.       
  352.     srs_verbose("get partial RTMP message(type=%d, size=%d, time=%d, sid=%d), partial size=%d",   
  353.             chunk->header.message_type, chunk->header.payload_length,   
  354.             chunk->header.timestamp, chunk->header.stream_id,  
  355.             chunk->msg->size);  
  356.       
  357.     return ret;  
  358. }  

NGINX-RTMP对应的代码无数地方无数行,复杂得无与伦比。


http://blog.csdn.NET/win_lin/article/details/12844375

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多