公司在做编码器,需要和rtmp服务器对接,nginx-rtmp实在是太复杂,不方便调试,所以做了一个简单rtmp服务器。
参与这个项目的几个朋友,说不仅仅想实现流媒体源站功能,还想写出漂亮的代码,成为“国人软件代码之典范”。
哈哈,目标是远大的,代码得一点一点写。
https://github.com/winlinvip/simple-rtmp-server
一路输入,多路(实例是12路输出):
支持Flash推流,直接进行转码支持HLS:
主要的定位是做RTMP/HLS流媒体核心业务,支持RTMP集群(源站/边缘,或Forward方式),支持多进程和单进程,支持vhost,不支持点播。
因为简单,所以稳定;不是那个级别的稳定,是那个级别的稳定,非常稳定,没有出错的可能性。
尽管这样,也是基于state-threads的高性能服务器,一个进程一个线程,异步socket,能成为最好的流服务器。
而且,最方便的是逻辑简单,方便调试,没有异步的回调,全是同步函数。
RTMP协议收发包完整实现,510行就全部搞定。
确实很简单,发送RTMP包使用writev,一个函数就可以搞定:
-
/**
-
* send out message with encoded payload to peer.
-
* use the message encode method to encode to payload,
-
* then sendout over socket.
-
* @msg this method will free it whatever return value.
-
*/
-
virtual int send_message(SrsMessage* msg);
实现如下:
-
int SrsProtocol::send_message(SrsMessage* msg)
-
{
-
int ret = ERROR_SUCCESS;
-
-
if ((ret = msg->encode_packet()) != ERROR_SUCCESS) {
-
srs_error("encode packet to message payload failed. ret=%d", ret);
-
return ret;
-
}
-
srs_info("encode packet to message payload success");
-
-
// p set to current write position,
-
// it's ok when payload is NULL and size is 0.
-
char* p = (char*)msg->payload;
-
-
// always write the header event payload is empty.
-
do {
-
// generate the header.
-
char* pheader = NULL;
-
int header_size = 0;
-
-
if (p == (char*)msg->payload) {
-
// write new chunk stream header, fmt is 0
-
pheader = out_header_fmt0;
-
*pheader++ = 0x00 | (msg->get_perfer_cid() & 0x3F);
-
-
// chunk message header, 11 bytes
-
// timestamp, 3bytes, big-endian
-
if (msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP) {
-
*pheader++ = 0xFF;
-
*pheader++ = 0xFF;
-
*pheader++ = 0xFF;
-
} else {
-
pp = (char*)&msg->header.timestamp;
-
*pheader++ = pp[2];
-
*pheader++ = pp[1];
-
*pheader++ = pp[0];
-
}
-
-
// message_length, 3bytes, big-endian
-
pp = (char*)&msg->header.payload_length;
-
*pheader++ = pp[2];
-
*pheader++ = pp[1];
-
*pheader++ = pp[0];
-
-
// message_type, 1bytes
-
*pheader++ = msg->header.message_type;
-
-
// message_length, 3bytes, little-endian
-
pp = (char*)&msg->header.stream_id;
-
*pheader++ = pp[0];
-
*pheader++ = pp[1];
-
*pheader++ = pp[2];
-
*pheader++ = pp[3];
-
-
// chunk extended timestamp header, 0 or 4 bytes, big-endian
-
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
-
pp = (char*)&msg->header.timestamp;
-
*pheader++ = pp[3];
-
*pheader++ = pp[2];
-
*pheader++ = pp[1];
-
*pheader++ = pp[0];
-
}
-
-
header_size = pheader - out_header_fmt0;
-
pheader = out_header_fmt0;
-
} else {
-
// write no message header chunk stream, fmt is 3
-
pheader = out_header_fmt3;
-
*pheader++ = 0xC0 | (msg->get_perfer_cid() & 0x3F);
-
-
// chunk extended timestamp header, 0 or 4 bytes, big-endian
-
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
-
pp = (char*)&msg->header.timestamp;
-
*pheader++ = pp[3];
-
*pheader++ = pp[2];
-
*pheader++ = pp[1];
-
*pheader++ = pp[0];
-
}
-
-
header_size = pheader - out_header_fmt3;
-
pheader = out_header_fmt3;
-
}
-
-
// sendout header and payload by writev.
-
// decrease the sys invoke count to get higher performance.
-
int payload_size = msg->size - (p - (char*)msg->payload);
-
if (payload_size > out_chunk_size) {
-
payload_size = out_chunk_size;
-
}
-
-
// send by writev
-
iovec iov[2];
-
iov[0].iov_base = pheader;
-
iov[0].iov_len = header_size;
-
iov[1].iov_base = p;
-
iov[1].iov_len = payload_size;
-
-
ssize_t nwrite;
-
if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) {
-
srs_error("send with writev failed. ret=%d", ret);
-
return ret;
-
}
-
-
// consume sendout bytes when not empty packet.
-
if (msg->payload && msg->size > 0) {
-
p += payload_size;
-
}
-
} while (p < (char*)msg->payload + msg->size);
-
-
return ret;
-
}
收RTMP包麻烦一点,需要五个函数(只有recv_message是public的):
-
/**
-
* recv a message with raw/undecoded payload from peer.
-
* the payload is not decoded, use srs_rtmp_expect_message<T> if requires
-
* specifies message.
-
* @pmsg, user must free it. NULL if not success.
-
* @remark, only when success, user can use and must free the pmsg.
-
*/
-
virtual int recv_message(SrsMessage** pmsg);
-
/**
-
* try to recv interlaced message from peer,
-
* return error if error occur and nerver set the pmsg,
-
* return success and pmsg set to NULL if no entire message got,
-
* return success and pmsg set to entire message if got one.
-
*/
-
virtual int recv_interlaced_message(SrsMessage** pmsg);
-
/**
-
* read the chunk basic header(fmt, cid) from chunk stream.
-
* user can discovery a SrsChunkStream by cid.
-
* @bh_size return the chunk basic header size, to remove the used bytes when finished.
-
*/
-
virtual int read_basic_header(char& fmt, int& cid, int& bh_size);
-
/**
-
* read the chunk message header(timestamp, payload_length, message_type, stream_id)
-
* from chunk stream and save to SrsChunkStream.
-
* @mh_size return the chunk message header size, to remove the used bytes when finished.
-
*/
-
virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
-
/**
-
* read the chunk payload, remove the used bytes in buffer,
-
* if got entire message, set the pmsg.
-
* @payload_size read size in this roundtrip, generally a chunk size or left message size.
-
*/
-
virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);
实现如下:
-
int SrsProtocol::recv_message(SrsMessage** pmsg)
-
{
-
*pmsg = NULL;
-
-
int ret = ERROR_SUCCESS;
-
-
while (true) {
-
SrsMessage* msg = NULL;
-
-
if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
-
srs_error("recv interlaced message failed. ret=%d", ret);
-
return ret;
-
}
-
srs_verbose("entire msg received");
-
-
if (!msg) {
-
continue;
-
}
-
-
if (msg->size <= 0 || msg->header.payload_length <= 0) {
-
srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).",
-
msg->header.message_type, msg->header.payload_length,
-
msg->header.timestamp, msg->header.stream_id);
-
delete msg;
-
continue;
-
}
-
-
srs_verbose("get a msg with raw/undecoded payload");
-
*pmsg = msg;
-
break;
-
}
-
-
return ret;
-
}
-
-
int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
-
{
-
int ret = ERROR_SUCCESS;
-
-
// chunk stream basic header.
-
char fmt = 0;
-
int cid = 0;
-
int bh_size = 0;
-
if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) {
-
srs_error("read basic header failed. ret=%d", ret);
-
return ret;
-
}
-
srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size);
-
-
// get the cached chunk stream.
-
SrsChunkStream* chunk = NULL;
-
-
if (chunk_streams.find(cid) == chunk_streams.end()) {
-
chunk = chunk_streams[cid] = new SrsChunkStream(cid);
-
srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
-
} else {
-
chunk = chunk_streams[cid];
-
srs_info("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
-
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,
-
chunk->header.timestamp, chunk->header.stream_id);
-
}
-
-
// chunk stream message header
-
int mh_size = 0;
-
if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) {
-
srs_error("read message header failed. ret=%d", ret);
-
return ret;
-
}
-
srs_info("read message header success. "
-
"fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
-
fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type,
-
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
-
-
// read msg payload from chunk stream.
-
SrsMessage* msg = NULL;
-
int payload_size = 0;
-
if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) {
-
srs_error("read message payload failed. ret=%d", ret);
-
return ret;
-
}
-
-
// not got an entire RTMP message, try next chunk.
-
if (!msg) {
-
srs_info("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
-
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
-
chunk->header.timestamp, chunk->header.stream_id);
-
return ret;
-
}
-
-
*pmsg = msg;
-
srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
-
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
-
chunk->header.timestamp, chunk->header.stream_id);
-
-
return ret;
-
}
-
-
int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
-
{
-
int ret = ERROR_SUCCESS;
-
-
int required_size = 1;
-
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
-
srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
-
return ret;
-
}
-
-
char* p = buffer->bytes();
-
-
fmt = (*p >> 6) & 0x03;
-
cid = *p & 0x3f;
-
bh_size = 1;
-
-
if (cid > 1) {
-
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
-
return ret;
-
}
-
-
if (cid == 0) {
-
required_size = 2;
-
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
-
srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
-
return ret;
-
}
-
-
cid = 64;
-
cid += *(++p);
-
bh_size = 2;
-
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
-
} else if (cid == 1) {
-
required_size = 3;
-
if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
-
srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
-
return ret;
-
}
-
-
cid = 64;
-
cid += *(++p);
-
cid += *(++p) * 256;
-
bh_size = 3;
-
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
-
} else {
-
srs_error("invalid path, impossible basic header.");
-
srs_assert(false);
-
}
-
-
return ret;
-
}
-
-
int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size)
-
{
-
int ret = ERROR_SUCCESS;
-
-
// when not exists cached msg, means get an new message,
-
// the fmt must be type0 which means new message.
-
if (!chunk->msg && fmt != RTMP_FMT_TYPE0) {
-
ret = ERROR_RTMP_CHUNK_START;
-
srs_error("chunk stream start, "
-
"fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
-
return ret;
-
}
-
-
// when exists cache msg, means got an partial message,
-
// the fmt must not be type0 which means new message.
-
if (chunk->msg && fmt == RTMP_FMT_TYPE0) {
-
ret = ERROR_RTMP_CHUNK_START;
-
srs_error("chunk stream exists, "
-
"fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
-
return ret;
-
}
-
-
// create msg when new chunk stream start
-
if (!chunk->msg) {
-
srs_assert(fmt == RTMP_FMT_TYPE0);
-
chunk->msg = new SrsMessage();
-
srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
-
}
-
-
// read message header from socket to buffer.
-
static char mh_sizes[] = {11, 7, 1, 0};
-
mh_size = mh_sizes[(int)fmt];
-
srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
-
-
int required_size = bh_size + mh_size;
-
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
-
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
-
return ret;
-
}
-
char* p = buffer->bytes() + bh_size;
-
-
// parse the message header.
-
// see also: ngx_rtmp_recv
-
if (fmt <= RTMP_FMT_TYPE2) {
-
int32_t timestamp_delta;
-
char* pp = (char*)×tamp_delta;
-
pp[2] = *p++;
-
pp[1] = *p++;
-
pp[0] = *p++;
-
pp[3] = 0;
-
-
if (fmt == RTMP_FMT_TYPE0) {
-
// 6.1.2.1. Type 0
-
// For a type-0 chunk, the absolute timestamp of the message is sent
-
// here.
-
chunk->header.timestamp = timestamp_delta;
-
} else {
-
// 6.1.2.2. Type 1
-
// 6.1.2.3. Type 2
-
// For a type-1 or type-2 chunk, the difference between the previous
-
// chunk's timestamp and the current chunk's timestamp is sent here.
-
chunk->header.timestamp += timestamp_delta;
-
}
-
-
// fmt: 0
-
// timestamp: 3 bytes
-
// If the timestamp is greater than or equal to 16777215
-
// (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
-
// ‘extended timestamp header’ MUST be present. Otherwise, this value
-
// SHOULD be the entire timestamp.
-
//
-
// fmt: 1 or 2
-
// timestamp delta: 3 bytes
-
// If the delta is greater than or equal to 16777215 (hexadecimal
-
// 0x00ffffff), this value MUST be 16777215, and the ‘extended
-
// timestamp header’ MUST be present. Otherwise, this value SHOULD be
-
// the entire delta.
-
chunk->extended_timestamp = (timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);
-
if (chunk->extended_timestamp) {
-
chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP;
-
}
-
-
if (fmt <= RTMP_FMT_TYPE1) {
-
pp = (char*)&chunk->header.payload_length;
-
pp[2] = *p++;
-
pp[1] = *p++;
-
pp[0] = *p++;
-
pp[3] = 0;
-
-
chunk->header.message_type = *p++;
-
-
if (fmt == 0) {
-
pp = (char*)&chunk->header.stream_id;
-
pp[0] = *p++;
-
pp[1] = *p++;
-
pp[2] = *p++;
-
pp[3] = *p++;
-
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d, sid=%d",
-
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
-
chunk->header.message_type, chunk->header.stream_id);
-
} else {
-
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d",
-
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
-
chunk->header.message_type);
-
}
-
} else {
-
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d",
-
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);
-
}
-
} else {
-
srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d",
-
fmt, mh_size, chunk->extended_timestamp);
-
}
-
-
if (chunk->extended_timestamp) {
-
mh_size += 4;
-
required_size = bh_size + mh_size;
-
srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
-
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
-
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
-
return ret;
-
}
-
-
char* pp = (char*)&chunk->header.timestamp;
-
pp[3] = *p++;
-
pp[2] = *p++;
-
pp[1] = *p++;
-
pp[0] = *p++;
-
srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp);
-
}
-
-
// valid message
-
if (chunk->header.payload_length < 0) {
-
ret = ERROR_RTMP_MSG_INVLIAD_SIZE;
-
srs_error("RTMP message size must not be negative. size=%d, ret=%d",
-
chunk->header.payload_length, ret);
-
return ret;
-
}
-
-
// copy header to msg
-
chunk->msg->header = chunk->header;
-
-
return ret;
-
}
-
-
int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg)
-
{
-
int ret = ERROR_SUCCESS;
-
-
// empty message
-
if (chunk->header.payload_length == 0) {
-
// need erase the header in buffer.
-
buffer->erase(bh_size + mh_size);
-
-
srs_trace("get an empty RTMP "
-
"message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type,
-
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
-
-
*pmsg = chunk->msg;
-
chunk->msg = NULL;
-
-
return ret;
-
}
-
srs_assert(chunk->header.payload_length > 0);
-
-
// the chunk payload size.
-
payload_size = chunk->header.payload_length - chunk->msg->size;
-
if (payload_size > in_chunk_size) {
-
payload_size = in_chunk_size;
-
}
-
srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d",
-
payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size);
-
-
// create msg payload if not initialized
-
if (!chunk->msg->payload) {
-
chunk->msg->payload = new int8_t[chunk->header.payload_length];
-
memset(chunk->msg->payload, 0, chunk->header.payload_length);
-
srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length);
-
}
-
-
// read payload to buffer
-
int required_size = bh_size + mh_size + payload_size;
-
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
-
srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);
-
return ret;
-
}
-
memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size);
-
buffer->erase(bh_size + mh_size + payload_size);
-
chunk->msg->size += payload_size;
-
-
srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
-
-
// got entire RTMP message?
-
if (chunk->header.payload_length == chunk->msg->size) {
-
*pmsg = chunk->msg;
-
chunk->msg = NULL;
-
srs_verbose("get entire RTMP message(type=%d, size=%d, time=%d, sid=%d)",
-
chunk->header.message_type, chunk->header.payload_length,
-
chunk->header.timestamp, chunk->header.stream_id);
-
return ret;
-
}
-
-
srs_verbose("get partial RTMP message(type=%d, size=%d, time=%d, sid=%d), partial size=%d",
-
chunk->header.message_type, chunk->header.payload_length,
-
chunk->header.timestamp, chunk->header.stream_id,
-
chunk->msg->size);
-
-
return ret;
-
}
NGINX-RTMP对应的代码无数地方无数行,复杂得无与伦比。
http://blog.csdn.NET/win_lin/article/details/12844375
|