Skip to content

Commit

Permalink
fix ossrs#250, support push MPEGTS over UDP to SRS. 2.0.111
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 31, 2015
1 parent 4246be9 commit 16afe7d
Show file tree
Hide file tree
Showing 7 changed files with 505 additions and 249 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ Supported operating systems and hardware:
).
1. Support HLS(h.264+mp3) streaming, read
[#301](https://github.com/winlinvip/simple-rtmp-server/issues/301).
1. [dev] Support push MPEG-TS over UDP to SRS, read
1. Support push MPEG-TS over UDP to SRS, read
[#250](https://github.com/winlinvip/simple-rtmp-server/issues/250).
1. [no-plan] Support <500ms latency, FRSC(Fast RTMP-compatible Stream Channel tech).
1. [no-plan] Support RTMP 302 redirect [#92](https://github.com/winlinvip/simple-rtmp-server/issues/92).
Expand Down Expand Up @@ -525,6 +525,7 @@ Supported operating systems and hardware:

### SRS 2.0 history

* v2.0, 2015-01-31, for [#250](https://github.com/winlinvip/simple-rtmp-server/issues/250), support push MPEGTS over UDP to SRS. 2.0.111
* v2.0, 2015-01-29, build libfdk-aac in ffmpeg. 2.0.108
* v2.0, 2015-01-25, for [#301](https://github.com/winlinvip/simple-rtmp-server/issues/301), hls support h.264+mp3, ok for vlc. 2.0.107
* v2.0, 2015-01-25, for [#301](https://github.com/winlinvip/simple-rtmp-server/issues/301), http ts stream support h.264+mp3. 2.0.106
Expand Down
188 changes: 137 additions & 51 deletions trunk/src/app/srs_app_mpegts_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,20 @@ int SrsMpegtsQueue::push(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;

if (msgs.find(msg->timestamp) != msgs.end()) {
srs_warn("mpegts: free the msg for dts exists, dts=%"PRId64, msg->timestamp);
srs_freep(msg);
return ret;
// TODO: FIXME: use right way.
for (int i = 0; i < 10; i++) {
if (msgs.find(msg->timestamp) == msgs.end()) {
break;
}

// adjust the ts, add 1ms.
msg->timestamp += 1;

if (i >= 5) {
srs_warn("mpegts: free the msg for dts exists, dts=%"PRId64, msg->timestamp);
srs_freep(msg);
return ret;
}
}

if (msg->is_audio()) {
Expand Down Expand Up @@ -114,6 +124,8 @@ SrsSharedPtrMessage* SrsMpegtsQueue::dequeue()
if (msg->is_video()) {
nb_videos--;
}

return msg;
}

return NULL;
Expand All @@ -131,6 +143,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
stfd = NULL;
stream_id = 0;
avc = new SrsRawH264Stream();
aac = new SrsRawAacStream();
h264_sps_changed = false;
h264_pps_changed = false;
h264_sps_pps_sent = false;
Expand All @@ -145,6 +158,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
srs_freep(stream);
srs_freep(context);
srs_freep(avc);
srs_freep(aac);
srs_freep(queue);
}

Expand Down Expand Up @@ -309,6 +323,9 @@ int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg)
if (msg->channel->stream == SrsTsStreamVideoH264) {
return on_ts_video(msg, &avs);
}
if (msg->channel->stream == SrsTsStreamAudioAAC) {
return on_ts_audio(msg, &avs);
}

// TODO: FIXME: implements it.
return ret;
Expand All @@ -326,6 +343,10 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
// ts tbn to flv tbn.
u_int32_t dts = msg->dts / 90;
u_int32_t pts = msg->dts / 90;

// the whole ts pes video packet must be a flv frame packet.
char* ibpframe = avs->data() + avs->pos();
int ibpframe_size = avs->size() - avs->pos();

// send each frame.
while (!avs->empty()) {
Expand All @@ -342,59 +363,50 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
continue;
}

// it may be return error, but we must process all packets.
if ((ret = write_h264_raw_frame(frame, frame_size, dts, pts)) != ERROR_SUCCESS) {
if (ret == ERROR_H264_DROP_BEFORE_SPS_PPS) {
// for sps
if (avc->is_sps(frame, frame_size)) {
std::string sps;
if ((ret = avc->sps_demux(frame, frame_size, sps)) != ERROR_SUCCESS) {
return ret;
}

if (h264_sps == sps) {
continue;
}
return ret;
}

// for video, drop others with same pts/dts.
break;
}

return ret;
}

int SrsMpegtsOverUdp::write_h264_raw_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts)
{
int ret = ERROR_SUCCESS;

// for sps
if (avc->is_sps(frame, frame_size)) {
std::string sps;
if ((ret = avc->sps_demux(frame, frame_size, sps)) != ERROR_SUCCESS) {
return ret;
}
h264_sps_changed = true;
h264_sps = sps;

if (h264_sps == sps) {
return ret;
if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
return ret;
}
continue;
}
h264_sps_changed = true;
h264_sps = sps;

return write_h264_sps_pps(dts, pts);
}

// for pps
if (avc->is_pps(frame, frame_size)) {
std::string pps;
if ((ret = avc->pps_demux(frame, frame_size, pps)) != ERROR_SUCCESS) {
return ret;
}
// for pps
if (avc->is_pps(frame, frame_size)) {
std::string pps;
if ((ret = avc->pps_demux(frame, frame_size, pps)) != ERROR_SUCCESS) {
return ret;
}

if (h264_pps == pps) {
return ret;
}
h264_pps_changed = true;
h264_pps = pps;
if (h264_pps == pps) {
continue;
}
h264_pps_changed = true;
h264_pps = pps;

return write_h264_sps_pps(dts, pts);
if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) {
return ret;
}
continue;
}

break;
}

// ibp frame.
return write_h264_ipb_frame(frame, frame_size, dts, pts);
srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", ibpframe_size, dts);
return write_h264_ipb_frame(ibpframe, ibpframe_size, dts, pts);
}

int SrsMpegtsOverUdp::write_h264_sps_pps(u_int32_t dts, u_int32_t pts)
Expand All @@ -421,14 +433,18 @@ int SrsMpegtsOverUdp::write_h264_sps_pps(u_int32_t dts, u_int32_t pts)
return ret;
}

// the timestamp in rtmp message header is dts.
u_int32_t timestamp = dts;
if ((ret = rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv)) != ERROR_SUCCESS) {
return ret;
}

// reset sps and pps.
h264_sps_changed = false;
h264_pps_changed = false;
h264_sps_pps_sent = true;

// the timestamp in rtmp message header is dts.
u_int32_t timestamp = dts;
return rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv);

return ret;
}

int SrsMpegtsOverUdp::write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts)
Expand Down Expand Up @@ -459,6 +475,72 @@ int SrsMpegtsOverUdp::write_h264_ipb_frame(char* frame, int frame_size, u_int32_
return rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv);
}

int SrsMpegtsOverUdp::on_ts_audio(SrsTsMessage* msg, SrsStream* avs)
{
int ret = ERROR_SUCCESS;

// ensure rtmp connected.
if ((ret = connect()) != ERROR_SUCCESS) {
return ret;
}

// ts tbn to flv tbn.
u_int32_t dts = msg->dts / 90;

// send each frame.
while (!avs->empty()) {
char* frame = NULL;
int frame_size = 0;
SrsRawAacStreamCodec codec;
if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) {
return ret;
}

// ignore invalid frame,
// * atleast 1bytes for aac to decode the data.
if (frame_size <= 0) {
continue;
}
srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts);

// generate sh.
if (aac_specific_config.empty()) {
std::string sh;
if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) {
return ret;
}
aac_specific_config = sh;

codec.aac_packet_type = 0;

if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != ERROR_SUCCESS) {
return ret;
}
}

// audio raw data.
codec.aac_packet_type = 1;
if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) {
return ret;
}
}

return ret;
}

int SrsMpegtsOverUdp::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts)
{
int ret = ERROR_SUCCESS;

char* data = NULL;
int size = 0;
if ((ret = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != ERROR_SUCCESS) {
return ret;
}

return rtmp_write_packet(SrsCodecFlvTagAudio, dts, data, size);
}

int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size)
{
int ret = ERROR_SUCCESS;
Expand All @@ -482,6 +564,10 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da
if ((msg = queue->dequeue()) == NULL) {
break;
}

// TODO: FIXME: use pithy print.
srs_info("mpegts: send msg %s dts=%"PRId64", size=%d",
msg->is_audio()? "A":msg->is_video()? "V":"N", msg->timestamp, msg->size);

// send out encoded msg.
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
Expand Down
10 changes: 9 additions & 1 deletion trunk/src/app/srs_app_mpegts_udp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class SrsStSocket;
class SrsRequest;
class SrsRawH264Stream;
class SrsSharedPtrMessage;
class SrsRawAacStream;
class SrsRawAacStreamCodec;

#include <srs_app_st.hpp>
#include <srs_kernel_ts.hpp>
Expand Down Expand Up @@ -114,6 +116,10 @@ class SrsMpegtsOverUdp : virtual public ISrsTsHandler
std::string h264_pps;
bool h264_pps_changed;
bool h264_sps_pps_sent;
private:
SrsRawAacStream* aac;
std::string aac_specific_config;
private:
SrsMpegtsQueue* queue;
public:
SrsMpegtsOverUdp(SrsConfDirective* c);
Expand All @@ -126,9 +132,11 @@ class SrsMpegtsOverUdp : virtual public ISrsTsHandler
virtual int on_ts_message(SrsTsMessage* msg);
private:
virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs);
virtual int write_h264_raw_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts);
virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts);
virtual int write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts);
virtual int on_ts_audio(SrsTsMessage* msg, SrsStream* avs);
virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts);
private:
virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size);
private:
// connect to rtmp output url.
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 110
#define VERSION_REVISION 111

// server info.
#define RTMP_SIG_SRS_KEY "SRS"
Expand Down
Loading

0 comments on commit 16afe7d

Please sign in to comment.