How to pull an HTTP-FLV stream from an SRS server

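How HTTP-FLV is implemented

HTTP defines the Content-Length header, which gives the length of the response body. When a response carries this header, the client reads exactly that many bytes and then treats the transfer as complete.

When a response does not carry this header, the client keeps receiving data until the socket between server and client is closed (or, with HTTP/1.1 chunked transfer encoding, until the terminating zero-length chunk arrives).

HTTP-FLV live streaming relies on this behavior: the server omits Content-Length from its response, sends the HTTP headers, and then keeps sending FLV data, so the client keeps receiving for as long as the stream lasts.

The response SRS returns to the request is: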

HTTP/1.1 200 OK
Connection: Keep-Alive
Content-Type: video/x-flv
Server: SRS/3.0.141(OuXuli)
Transfer-Encoding: chunked

Note: the Wireshark display filter is: http or tcp.port==8081.
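The Transfer-Encoding: chunked header in the response above is what lets the body run without a known length. As a rough sketch of the framing (the function names make_chunk and make_live_flv_response_head are made up for this illustration and are not SRS APIs), each piece of FLV data is wrapped as a hexadecimal length line, the payload, and a trailing CRLF:

```cpp
#include <cstdio>
#include <string>

// Frame one payload as an HTTP/1.1 chunk: "<hex length>\r\n<payload>\r\n".
// A live stream keeps emitting chunks and does not send the terminating
// zero-length chunk ("0\r\n\r\n") while the stream is still running.
std::string make_chunk(const std::string& payload)
{
    char size_line[32];
    std::snprintf(size_line, sizeof(size_line), "%zx\r\n", payload.size());
    return std::string(size_line) + payload + "\r\n";
}

// Hypothetical response head for an endless FLV stream:
// no Content-Length, chunked body.
std::string make_live_flv_response_head()
{
    return "HTTP/1.1 200 OK\r\n"
           "Content-Type: video/x-flv\r\n"
           "Transfer-Encoding: chunked\r\n"
           "\r\n";
}
```

A server built this way would write the response head once and then call make_chunk for every batch of FLV bytes it wants to push to the player.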

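Configuring the HTTP and HTTP-FLV services in SRS

There are two parts:

  1. Configure the HTTP service
  2. Configure the HTTP-FLV service

The configuration file is as follows: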

listen              1935;
max_connections     1000;
#srs_log_tank        file;
#srs_log_file        ./objs/srs.log;
daemon              off;
srs_log_tank        console;
http_api {
    enabled         on;
    listen          1985;
}
http_server {
    enabled         on;
    listen          8081; # HTTP listening port of the built-in HTTP server; on a cloud server, make sure this port is open
    dir             ./objs/nginx/html;
}
stats {
    network         0;
    disk            sda sdb xvda xvdb;
}
vhost __defaultVhost__ {
    hls {
        enabled         on;
        hls_path ./objs/nginx/html;
        hls_fragment 10;
        hls_window 60;
    }
    http_remux {
        enabled     on;
        mount       [vhost]/[app]/[stream].flv;
        hstrs       on;
    }
}

Verifying the configuration

Push a stream from the client:

ffmpeg -re -i source.200kbps.768x320.flv -vcodec copy -acodec copy -f flv -y rtmp://8.141.75.248/live/livestream

Pull the stream from the client:

ffplay http://8.141.75.248:8081/live/livestream.flv
ffplay rtmp://8.141.75.248/live/livestream

http_remux configuration

SRS uses the value of mount, for example [vhost]/[app]/[stream].flv, to decide which format to serve.

SrsLiveEntry::SrsLiveEntry(std::string m)
{
    mount = m;
    
    stream = NULL;
    cache = NULL;
    
    req = NULL;
    source = NULL;
    
    std::string ext = srs_path_filext(m); // extension taken from mount, e.g. ".flv" for [vhost]/[app]/[stream].flv
    _is_flv = (ext == ".flv");
    _is_ts = (ext == ".ts");
    _is_mp3 = (ext == ".mp3");
    _is_aac = (ext == ".aac");
}

Besides FLV, HTTP streaming also supports TS, AAC, and MP3. The http_remux configuration is shown below.

http_remux {
	enabled on;
	mount [vhost]/[app]/[stream].flv; # .flv, .ts, .aac and .mp3 are supported
	hstrs on;
}
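To make the extension check concrete, here is a small standalone sketch of the same idea. The names LiveFormat, path_ext and classify_mount are invented for this example; SRS itself uses srs_path_filext and the _is_flv/_is_ts/_is_mp3/_is_aac flags shown in the constructor.

```cpp
#include <string>

enum class LiveFormat { Unknown, Flv, Ts, Mp3, Aac };

// Return the extension including the dot, or "" when the last path
// component has no dot.
std::string path_ext(const std::string& path)
{
    std::string::size_type dot = path.rfind('.');
    std::string::size_type slash = path.rfind('/');
    if (dot == std::string::npos) return "";
    if (slash != std::string::npos && dot < slash) return "";
    return path.substr(dot);
}

// Map the mount's extension to the format that would be served.
LiveFormat classify_mount(const std::string& mount)
{
    const std::string ext = path_ext(mount);
    if (ext == ".flv") return LiveFormat::Flv;
    if (ext == ".ts")  return LiveFormat::Ts;
    if (ext == ".mp3") return LiveFormat::Mp3;
    if (ext == ".aac") return LiveFormat::Aac;
    return LiveFormat::Unknown;
}
```

Changing the mount in http_remux to end in .ts or .aac would, under this model, simply select a different branch.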

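vhost configuration

A vhost is the unit of application configuration: it isolates customers from one another and lets each use a different configuration.

The main use cases for vhosts are:

  1. One delivery network serving multiple customers: for example a CDN, where N customers share one streaming system. How do you tell customers apart for billing, monitoring, and so on? By app name? Most of them would probably just call it live. It is better to distinguish them by their own domain names.
  2. Different application configurations: for example, a stream pushed by FMLE is h264+mp3; the audio can be transcoded and published to another vhost that delivers HLS, so the vhost that ingests h264+mp3 does not need to slice HLS.

See the wiki: https://github.com/ossrs/srs/wiki/v3_CN_RtmpUrlVhost

Call stack when loading the configuration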

#0 SrsHttpStreamServer::initialize_flv_entry (this=0xa11fd0, vhost="__defaultVhost__")
at src/app/srs_app_http_stream.cpp:1163
#1 0x00000000005028d3 in SrsHttpStreamServer::initialize_flv_streaming (this=0xa11fd0)
at src/app/srs_app_http_stream.cpp:1154
#2 0x0000000000500a2a in SrsHttpStreamServer::initialize (this=0xa11fd0) at
src/app/srs_app_http_stream.cpp:873
#3 0x0000000000561eb7 in SrsHttpServer::initialize (this=0xa11e00) at
src/app/srs_app_http_conn.cpp:279
#4 0x00000000004c84c0 in SrsServer::initialize (this=0xa11ea0, ch=0x0) at
src/app/srs_app_server.cpp:757
#5 0x00000000005bcb57 in run (svr=0xa11ea0) at src/main/srs_main_server.cpp:395
#6 0x00000000005bb769 in do_main (argc=3, argv=0x7fffffffe4f8) at
src/main/srs_main_server.cpp:184
#7 0x00000000005bb8ad in main (argc=3, argv=0x7fffffffe4f8) at
src/main/srs_main_server.cpp:192

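Basic information

SrsLiveStream::do_serve_http sends the stream data to the client.

Each HTTP client connection corresponds to one SrsHttpConn, similar to SrsRtmpConn for RTMP.

Each SrsHttpConn also has its own SrsConsumer. SrsConsumer is shared by RTMP and HTTP-FLV playback and acts as an intermediate buffer for the data.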
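The relationship between a source and its per-connection consumers can be pictured with a toy model. MiniSource and MiniConsumer below are hypothetical stand-ins, not the real SrsSource/SrsConsumer classes, and messages are plain strings instead of shared media packets:

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <vector>

// One consumer per player connection; it buffers messages until the
// connection's send loop drains them.
struct MiniConsumer {
    std::deque<std::string> queue;

    // Take at most `max` queued messages, oldest first.
    std::vector<std::string> dump_packets(std::size_t max)
    {
        std::vector<std::string> out;
        while (!queue.empty() && out.size() < max) {
            out.push_back(queue.front());
            queue.pop_front();
        }
        return out;
    }
};

// The source fans every published message out to all consumers,
// regardless of whether the player behind it speaks RTMP or HTTP-FLV.
struct MiniSource {
    std::vector<std::shared_ptr<MiniConsumer>> consumers;

    std::shared_ptr<MiniConsumer> create_consumer()
    {
        consumers.push_back(std::make_shared<MiniConsumer>());
        return consumers.back();
    }

    void on_message(const std::string& msg)
    {
        for (auto& c : consumers) c->queue.push_back(msg);
    }
};
```

The point of the sketch is only that the queueing layer is protocol-agnostic; the protocol-specific work happens when each connection drains its own consumer.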

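Related classes

  1. SrsBufferCache: the cache for the HTTP live stream encoder
  2. SrsFlvStreamEncoder: remuxes RTMP into an HTTP FLV stream
  3. SrsTsStreamEncoder: remuxes RTMP into an HTTP TS stream
  4. SrsAacStreamEncoder: remuxes the AAC audio carried in RTMP into an HTTP AAC stream
  5. SrsMp3StreamEncoder: remuxes the MP3 audio carried in RTMP into an HTTP MP3 stream
  6. SrsBufferWriter: writes the stream directly into the HTTP response
  7. SrsLiveStream: the HTTP live stream; remuxes RTMP into HTTP-FLV or another format, and is the actual handler
  8. SrsLiveEntry: the live entry, used to serve the HTTP live stream
  9. SrsHttpStreamServer: the HTTP live streaming service, serving FLV/TS/MP3/AAC streams
  10. SrsHttpResponseWriter: sends data to the client; underneath it calls SrsStSocket to send
  11. SrsHttpServeMux: the HTTP request multiplexer, which records each path and its handler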

RTMP publishing

When a stream is published, a handler is created for its URL; when a stream is played, the handler is looked up by URL.

Call stack for RTMP publishing:

(gdb) bt
#0  SrsLiveStream::SrsLiveStream (this=0xb159f0, s=0xadd3c0, r=0xadde30, c=0xaded50) at src/app/srs_app_http_stream.cpp:514
#1  0x0000000000501f3b in SrsHttpStreamServer::http_mount (this=0xa11db0, s=0xadd3c0, r=0xadde30) at src/app/srs_app_http_stream.cpp:912
#2  0x000000000056358d in SrsHttpServer::http_mount (this=0xa12220, s=0xadd3c0, r=0xadde30) at src/app/srs_app_http_conn.cpp:308
#3  0x00000000004ce06a in SrsServer::on_publish (this=0xa10370, s=0xadd3c0, r=0xadde30) at src/app/srs_app_server.cpp:1610
#4  0x00000000004e775e in SrsSource::on_publish (this=0xadd3c0) at src/app/srs_app_source.cpp:2463
#5  0x00000000004d96ca in SrsRtmpConn::acquire_publish (this=0xac0f10, source=0xadd3c0) at src/app/srs_app_rtmp_conn.cpp:940
#6  0x00000000004d874c in SrsRtmpConn::publishing (this=0xac0f10, source=0xadd3c0) at src/app/srs_app_rtmp_conn.cpp:822
#7  0x00000000004d5ee7 in SrsRtmpConn::stream_service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:534
#8  0x00000000004d4ddf in SrsRtmpConn::service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:388
#9  0x00000000004d3ba7 in SrsRtmpConn::do_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:209
#10 0x00000000004d1d99 in SrsConnection::cycle (this=0xac0f88) at src/app/srs_app_conn.cpp:171
#11 0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xac11f0) at src/app/srs_app_st.cpp:198
#12 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xac11f0) at src/app/srs_app_st.cpp:213
#13 0x00000000005bed1a in _st_thread_main () at sched.c:337
#14 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x700000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)

The corresponding code is in SrsHttpStreamServer::http_mount:

srs_error_t SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
{
    srs_error_t err = srs_success;
    
    // the id to identify stream.
    std::string sid = r->get_stream_url(); // e.g. for rtmp://8.141.75.248:1935/live/livestream the stream URL is /live/livestream
    SrsLiveEntry* entry = NULL; // SrsLiveEntry: the live entry that serves the HTTP live stream
    
    // create stream from template when not found.
    if (sflvs.find(sid) == sflvs.end()) { // no entry exists for this sid yet
        if (tflvs.find(r->vhost) == tflvs.end()) { // look up the template for this vhost; return if there is none
            return err;
        }
        
        SrsLiveEntry* tmpl = tflvs[r->vhost];
        
        std::string mount = tmpl->mount; // mount from the config: [vhost]/[app]/[stream].flv
        
        // replace the vhost variable: [vhost]/[app]/[stream].flv becomes __defaultVhost__/live/livestream.flv
        mount = srs_string_replace(mount, "[vhost]", r->vhost);
        mount = srs_string_replace(mount, "[app]", r->app);
        mount = srs_string_replace(mount, "[stream]", r->stream);
        
        // remove the default vhost mount: __defaultVhost__/live/livestream.flv becomes /live/livestream.flv
        mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
        
        entry = new SrsLiveEntry(mount); // create the SrsLiveEntry and record its type, e.g. flv or ts

        entry->source = s; // point to the source
        entry->req = r->copy()->as_http();
        entry->cache = new SrsBufferCache(s, r);
        entry->stream = new SrsLiveStream(s, r, entry->cache); // create the SrsLiveStream (HTTP live stream, remuxes RTMP to HTTP-FLV or another format); it is the actual handler
        
        // TODO: FIXME: maybe refine the logic of http remux service.
        // if user push streams followed:
        //     rtmp://test.com/live/stream1
        //     rtmp://test.com/live/stream2
        // and they will using the same template, such as: [vhost]/[app]/[stream].flv
        // so, need to free last request object, otherwise, it will cause memory leak.
        srs_freep(tmpl->req);
        
        tmpl->source = s;
        tmpl->req = r->copy()->as_http();
        
        sflvs[sid] = entry; // one entry per sid; the entry holds the SrsLiveStream
        
        // mount the http flv stream.
        // we must register the handler, then start the thread,
        // for the thread will cause thread switch context.
        // @see https://github.com/ossrs/srs/issues/404
        // The handler (SrsLiveStream) is registered as a route under the given mount.
        if ((err = mux.handle(mount, entry->stream)) != srs_success) { // mount the HTTP-FLV stream
            return srs_error_wrap(err, "http: mount flv stream for vhost=%s failed", sid.c_str());
        }
        
        // start http stream cache thread
        if ((err = entry->cache->start()) != srs_success) {
            return srs_error_wrap(err, "http: start stream cache failed");
        }
        srs_trace("http: mount flv stream for sid=%s, mount=%s", sid.c_str(), mount.c_str());
    } else {
        // The entry exists, we reuse it and update the request of stream and cache.
        entry = sflvs[sid];
        entry->stream->update_auth(s, r);
        entry->cache->update_auth(s, r);
    }
    
    if (entry->stream) {
        entry->stream->entry->enabled = true;
        return err;
    }
    
    return err;
}

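The main steps are:

  1. Create the SrsLiveEntry and record its type, e.g. flv or ts.
  2. Create the SrsLiveStream, the HTTP live stream that remuxes RTMP into HTTP-FLV or another format.
  3. Register the handler (SrsLiveStream) under the given mount.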
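The mount template expansion inside http_mount can be tried in isolation. The sketch below mirrors the three replacements and the removal of the default vhost prefix; replace_all and expand_mount are names chosen for this example, standing in for srs_string_replace and the inline logic above:

```cpp
#include <string>

// Replace every occurrence of `from` in `s` with `to`.
std::string replace_all(std::string s, const std::string& from, const std::string& to)
{
    if (from.empty()) return s;
    std::string::size_type pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.size(), to);
        pos += to.size();
    }
    return s;
}

// Expand [vhost]/[app]/[stream] in the template, then drop the default
// vhost prefix so the mount becomes a plain absolute path.
std::string expand_mount(const std::string& tmpl, const std::string& vhost,
                         const std::string& app, const std::string& stream)
{
    std::string mount = replace_all(tmpl, "[vhost]", vhost);
    mount = replace_all(mount, "[app]", app);
    mount = replace_all(mount, "[stream]", stream);
    mount = replace_all(mount, "__defaultVhost__/", "/");
    return mount;
}
```

For the default vhost this yields /live/livestream.flv, which is exactly the path the player requests; for a named vhost the host name stays in front of the path.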

Creating the SrsLiveEntry and recording its type, e.g. flv or ts

See the SrsLiveEntry::SrsLiveEntry constructor:

SrsLiveEntry::SrsLiveEntry(std::string m)
{
    mount = m;
    
    stream = NULL;
    cache = NULL;
    
    req = NULL;
    source = NULL;
    
    std::string ext = srs_path_filext(m); // determine the type from the extension
    _is_flv = (ext == ".flv");
    _is_ts = (ext == ".ts");
    _is_mp3 = (ext == ".mp3");
    _is_aac = (ext == ".aac");
}

Creating the SrsLiveStream, the HTTP live stream that remuxes RTMP into HTTP-FLV or another format

See the SrsLiveStream::SrsLiveStream constructor:

SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsBufferCache* c)
{
    source = s;
    cache = c;
    req = r->copy()->as_http();
}

Registering the handler (SrsLiveStream) under the given mount

See SrsHttpServeMux::handle:

srs_error_t SrsHttpServeMux::handle(std::string pattern, ISrsHttpHandler* handler) // handler is the SrsLiveStream object (one is bound to each source)
{   // pattern: /live/livestream.flv
    srs_assert(handler);
    
    if (pattern.empty()) {
        return srs_error_new(ERROR_HTTP_PATTERN_EMPTY, "empty pattern");
    }
    
    if (entries.find(pattern) != entries.end()) {
        SrsHttpMuxEntry* exists = entries[pattern];
        if (exists->explicit_match) {
            return srs_error_new(ERROR_HTTP_PATTERN_DUPLICATED, "pattern=%s exists", pattern.c_str());
        }
    }
    
    std::string vhost = pattern;
    if (pattern.at(0) != '/') {
        if (pattern.find("/") != string::npos) {
            vhost = pattern.substr(0, pattern.find("/"));
        }
        vhosts[vhost] = handler;
    }
    
    if (true) {
        SrsHttpMuxEntry* entry = new SrsHttpMuxEntry(); // create the SrsHttpMuxEntry: the mux's entry that holds the matcher information, i.e. the pattern and its handler
        entry->explicit_match = true;
        entry->handler = handler;   // the SrsLiveStream
        entry->pattern = pattern;   // pattern: /live/livestream.flv
        entry->handler->entry = entry;
        
        if (entries.find(pattern) != entries.end()) {
            SrsHttpMuxEntry* exists = entries[pattern];
            srs_freep(exists);
        }
        entries[pattern] = entry; // add to the entries map: key is the pattern, value is the entry, which holds the handler (the SrsLiveStream)
    }
    
    // Helpful behavior:
    // If pattern is /tree/, insert an implicit permanent redirect for /tree.
    // It can be overridden by an explicit registration.
    if (pattern != "/" && !pattern.empty() && pattern.at(pattern.length() - 1) == '/') { // not taken for /live/livestream.flv
        std::string rpattern = pattern.substr(0, pattern.length() - 1);
        SrsHttpMuxEntry* entry = NULL;
        
        // free the exists implicit entry
        if (entries.find(rpattern) != entries.end()) {
            entry = entries[rpattern];
        }
        
        // create implicit redirect.
        if (!entry || !entry->explicit_match) {
            srs_freep(entry);
            
            entry = new SrsHttpMuxEntry();
            entry->explicit_match = false;
            entry->handler = new SrsHttpRedirectHandler(pattern, SRS_CONSTS_HTTP_Found);
            entry->pattern = pattern;
            entry->handler->entry = entry;
            
            entries[rpattern] = entry;
        }
    }
    
    return srs_success;
}

Audio/video messages pushed by the client are received and stored in SrsRtmpConn::process_publish_message.

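Pulling the HTTP-FLV stream

When a playback client requests an HTTP-FLV stream, the request carries the URI, as a Wireshark capture of the request shows.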

The main entry point is SrsHttpServeMux::serve_http:

srs_error_t SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
    srs_error_t err = srs_success;
    
    ISrsHttpHandler* h = NULL;
    if ((err = find_handler(r, &h)) != srs_success) { // find the handler that matches the URI
        return srs_error_wrap(err, "find handler");
    }
    
    srs_assert(h);
    if ((err = h->serve_http(w, r)) != srs_success) { //SrsLiveStream::serve_http 
        return srs_error_wrap(err, "serve http");
    }
    
    return err;
}

Call stack:

(gdb) bt
#0  SrsLiveStream::do_serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:552
#1  0x00000000004fef4a in SrsLiveStream::serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:544
#2  0x000000000049d2af in SrsHttpServeMux::serve_http (this=0xa11dc0, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:711
#3  0x0000000000563518 in SrsHttpServer::serve_http (this=0xa12220, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:300
#4  0x000000000049e0fe in SrsHttpCorsMux::serve_http (this=0xb4ca40, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:859
#5  0x000000000056251e in SrsHttpConn::process_request (this=0xb57740, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:161
#6  0x0000000000562180 in SrsHttpConn::do_cycle (this=0xb57740) at src/app/srs_app_http_conn.cpp:133
#7  0x00000000004d1d99 in SrsConnection::cycle (this=0xb57740) at src/app/srs_app_conn.cpp:171
#8  0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xb577e0) at src/app/srs_app_st.cpp:198
#9  0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xb577e0) at src/app/srs_app_st.cpp:213
#10 0x00000000005bed1a in _st_thread_main () at sched.c:337
#11 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x900000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)

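The main steps in SRS are:

  1. SRS matches the URI to find the corresponding handler (SrsLiveStream).
  2. The handler sends the HTTP-FLV stream to the client.

SRS matches the URI to find the corresponding handler (SrsLiveStream)

See SrsHttpServeMux::find_handler: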

srs_error_t SrsHttpServeMux::find_handler(ISrsHttpMessage* r, ISrsHttpHandler** ph)
{
    srs_error_t err = srs_success;
    
    // TODO: FIXME: support the path . and ..
    if (r->url().find("..") != std::string::npos) {
        return srs_error_new(ERROR_HTTP_URL_NOT_CLEAN, "url %s not canonical", r->url().c_str());
    }
    
    if ((err = match(r, ph)) != srs_success) { // match the URI to its handler (SrsLiveStream)
        return srs_error_wrap(err, "http match");
    }
    
    // always hijack.
    if (!hijackers.empty()) {
        // notify all hijackers unless matching failed.
        std::vector<ISrsHttpMatchHijacker*>::iterator it;
        for (it = hijackers.begin(); it != hijackers.end(); ++it) {
            ISrsHttpMatchHijacker* hijacker = *it;
            if ((err = hijacker->hijack(r, ph)) != srs_success) { // enters SrsHttpStreamServer::hijack
                return srs_error_wrap(err, "http hijack");
            }
        }
    }
    
    static ISrsHttpHandler* h404 = new SrsHttpNotFoundHandler();
    if (*ph == NULL) {
        *ph = h404;
    }
    
    return err;
}

The URI is matched to its handler (SrsLiveStream) in SrsHttpServeMux::match:

srs_error_t SrsHttpServeMux::match(ISrsHttpMessage* r, ISrsHttpHandler** ph)
{
    std::string path = r->path(); // the URI from the HTTP request: /live/livestream.flv
    
    // Host-specific pattern takes precedence over generic ones
    if (!vhosts.empty() && vhosts.find(r->host()) != vhosts.end()) {
        path = r->host() + path;
    }
    
    int nb_matched = 0;
    ISrsHttpHandler* h = NULL;
    
    std::map<std::string, SrsHttpMuxEntry*>::iterator it;
    for (it = entries.begin(); it != entries.end(); ++it) { // iterate over every pattern and entry
        std::string pattern = it->first;
        SrsHttpMuxEntry* entry = it->second;
        
        if (!entry->enabled) {
            continue;
        }
        
        if (!path_match(pattern, path)) {
            continue;
        }
        
        if (!h || (int)pattern.length() > nb_matched) { // keep the handler of the longest matching pattern
            nb_matched = (int)pattern.length();
            h = entry->handler;
        }
    }
    
    *ph = h;
    
    return srs_success;
}
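The longest-pattern rule in match can be exercised with a stripped-down multiplexer. MiniMux below is a hypothetical model: handlers are reduced to non-negative integer ids, and its path_match is an assumption modeled on the usual ServeMux convention (a pattern ending in '/' matches any path under it, any other pattern must match exactly), since the body of SRS's own path_match is not shown above.

```cpp
#include <map>
#include <string>

struct MiniMux {
    std::map<std::string, int> entries; // pattern -> handler id (ids are >= 0)

    // Assumed semantics: "/dir/" matches by prefix, anything else exactly.
    static bool path_match(const std::string& pattern, const std::string& path)
    {
        if (pattern.empty()) return false;
        if (pattern.back() != '/') return pattern == path;
        return path.compare(0, pattern.size(), pattern) == 0;
    }

    // Return the handler id of the longest matching pattern,
    // or -1 when nothing matches (the caller would answer 404).
    int match(const std::string& path) const
    {
        int best = -1;
        std::string::size_type best_len = 0;
        for (const auto& kv : entries) {
            if (!path_match(kv.first, path)) continue;
            if (best < 0 || kv.first.size() > best_len) {
                best_len = kv.first.size();
                best = kv.second;
            }
        }
        return best;
    }
};
```

With "/" registered for static files and "/live/livestream.flv" registered for the live stream, a request for the stream resolves to the more specific entry even though "/" also matches.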

The call stack is as follows:

(gdb) bt
#0  SrsHttpServeMux::match (this=0xa11dc0, r=0xb634a0, ph=0xb61b68) at src/protocol/srs_http_stack.cpp:766
#1  0x000000000049d40a in SrsHttpServeMux::find_handler (this=0xa11dc0, r=0xb634a0, ph=0xb61b68) at src/protocol/srs_http_stack.cpp:727
#2  0x000000000056349f in SrsHttpServer::serve_http (this=0xa12220, w=0xb61d90, r=0xb634a0) at src/app/srs_app_http_conn.cpp:296
#3  0x000000000049e0fe in SrsHttpCorsMux::serve_http (this=0xb47b40, w=0xb61d90, r=0xb634a0) at src/protocol/srs_http_stack.cpp:859
#4  0x000000000056251e in SrsHttpConn::process_request (this=0xb50d50, w=0xb61d90, r=0xb634a0) at src/app/srs_app_http_conn.cpp:161
#5  0x0000000000562180 in SrsHttpConn::do_cycle (this=0xb50d50) at src/app/srs_app_http_conn.cpp:133
#6  0x00000000004d1d99 in SrsConnection::cycle (this=0xb50d50) at src/app/srs_app_conn.cpp:171
#7  0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xb50c20) at src/app/srs_app_st.cpp:198
#8  0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xb50c20) at src/app/srs_app_st.cpp:213
#9  0x00000000005bed1a in _st_thread_main () at sched.c:337
#10 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x900000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)

The client pulls HTTP-FLV

See SrsLiveStream::do_serve_http.

Call stack:

(gdb) bt
#0  SrsLiveStream::do_serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:620
#1  0x00000000004fef4a in SrsLiveStream::serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:544
#2  0x000000000049d2af in SrsHttpServeMux::serve_http (this=0xa11dc0, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:711
#3  0x0000000000563518 in SrsHttpServer::serve_http (this=0xa12220, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:300
#4  0x000000000049e0fe in SrsHttpCorsMux::serve_http (this=0xb4ca40, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:859
#5  0x000000000056251e in SrsHttpConn::process_request (this=0xb57740, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:161
#6  0x0000000000562180 in SrsHttpConn::do_cycle (this=0xb57740) at src/app/srs_app_http_conn.cpp:133
#7  0x00000000004d1d99 in SrsConnection::cycle (this=0xb57740) at src/app/srs_app_conn.cpp:171
#8  0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xb577e0) at src/app/srs_app_st.cpp:198
#9  0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xb577e0) at src/app/srs_app_st.cpp:213
#10 0x00000000005bed1a in _st_thread_main () at sched.c:337
#11 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x900000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)

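The function is shown below. Its main tasks are:

  1. Choose an ISrsBufferEncoder by the suffix of the pattern; for .flv it is SrsFlvStreamEncoder.
  2. Create a consumer and take messages from the queue that stores the audio/video data.
  3. Wrap the messages into tags and send them to the playback client.

srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)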
{
    srs_error_t err = srs_success;
    
    string enc_desc;
    ISrsBufferEncoder* enc = NULL;
    
    srs_assert(entry);
    if (srs_string_ends_with(entry->pattern, ".flv")) { // choose the encoder by suffix
        w->header()->set_content_type("video/x-flv"); // set the Content-Type
        enc_desc = "FLV";
        enc = new SrsFlvStreamEncoder();
    } else if (srs_string_ends_with(entry->pattern, ".aac")) {
        w->header()->set_content_type("audio/x-aac");
        enc_desc = "AAC";
        enc = new SrsAacStreamEncoder();
    } else if (srs_string_ends_with(entry->pattern, ".mp3")) {
        w->header()->set_content_type("audio/mpeg");
        enc_desc = "MP3";
        enc = new SrsMp3StreamEncoder();
    } else if (srs_string_ends_with(entry->pattern, ".ts")) {
        w->header()->set_content_type("video/MP2T");
        enc_desc = "TS";
        enc = new SrsTsStreamEncoder();
    } else {
        return srs_error_new(ERROR_HTTP_LIVE_STREAM_EXT, "invalid pattern=%s", entry->pattern.c_str());
    }
    SrsAutoFree(ISrsBufferEncoder, enc);

    // Enter chunked mode, because we didn't set the content-length.
    w->write_header(SRS_CONSTS_HTTP_OK);
    
    // create consumer of source, ignore gop cache, use the audio gop cache.
    SrsConsumer* consumer = NULL;
    if ((err = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != srs_success) {
        return srs_error_wrap(err, "create consumer");
    }
    SrsAutoFree(SrsConsumer, consumer);
    srs_verbose("http: consumer created success.");
    
    SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream();
    SrsAutoFree(SrsPithyPrint, pprint);
    
    SrsMessageArray msgs(SRS_PERF_MW_MSGS);

    // Use receive thread to accept the close event to avoid FD leak.
    // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
    SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
    SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection());
    
    // update the statistic when source disconveried.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((err = stat->on_client(srs_int2str(_srs_context->get_id()), req, hc, SrsRtmpConnPlay)) != srs_success) {
        return srs_error_wrap(err, "stat on client");
    }
    
    // the memory writer.
    SrsBufferWriter writer(w);
    if ((err = enc->initialize(&writer, cache)) != srs_success) {
        return srs_error_wrap(err, "init encoder");
    }
    
    // if gop cache enabled for encoder, dump to consumer.
    if (enc->has_cache()) {
        if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) { // not reached for FLV: SrsFlvStreamEncoder has no GOP cache enabled
            return srs_error_wrap(err, "encoder dump cache");
        }
    }
    
    SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc); // non-NULL only when enc is a SrsFlvStreamEncoder
    
    // Set the socket options for transport.
    bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
    if (tcp_nodelay) {
        if ((err = hc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
            return srs_error_wrap(err, "set tcp nodelay");
        }
    }
    
    srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
    if ((err = hc->set_socket_buffer(mw_sleep)) != srs_success) {
        return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
    }

    SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); // receive coroutine bound to the HTTP client connection
    SrsAutoFree(SrsHttpRecvThread, trd);
    
    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "start recv thread");
    }
    
    srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d",
        entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep), // e.g. FLV /live/livestream.flv, encoder=FLV, nodelay=0, mw_sleep=350ms, cache=0, msgs=128
        enc->has_cache(), msgs.max);

    // TODO: free and erase the disabled entry after all related connections is closed.
    // TODO: FXIME: Support timeout for player, quit infinite-loop.
    while (entry->enabled) {
        // Whether client closed the FD.
        if ((err = trd->pull()) != srs_success) { // returns an error once the receive coroutine has failed, e.g. the client closed the connection
            return srs_error_wrap(err, "recv thread");
        }

        pprint->elapse();

        // get messages from consumer.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        int count = 0;
        if ((err = consumer->dump_packets(&msgs, count)) != srs_success) { // read messages from the consumer
            return srs_error_wrap(err, "consumer dump packets");
        }
        
        if (count <= 0) {
            // Directly use sleep, donot use consumer wait, because we couldn't awake consumer.
            srs_usleep(mw_sleep);
            // ignore when nothing got.
            continue;
        }
        
        if (pprint->can_print()) {
            srs_trace("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d",
                count, pprint->age(), SRS_PERF_MW_MIN_MSGS, srsu2msi(mw_sleep));
        }
        
        // sendout all messages.
        if (ffe) {
            err = ffe->write_tags(msgs.msgs, count); // for FLV, ffe is the SrsFlvStreamEncoder
        } else {
            err = streaming_send_messages(enc, msgs.msgs, count);
        }

        // free the messages.
        for (int i = 0; i < count; i++) {
            SrsSharedPtrMessage* msg = msgs.msgs[i];
            srs_freep(msg);
        }
        
        // check send error code.
        if (err != srs_success) {
            return srs_error_wrap(err, "send messages");
        }
    }

    // Here, the entry is disabled by encoder un-publishing or reloading,
    // so we must return a io.EOF error to disconnect the client, or the client will never quit.
    return srs_error_new(ERROR_HTTP_STREAM_EOF, "Stream EOF");
}

1. Choose an ISrsBufferEncoder by the suffix of the pattern; for .flv it is SrsFlvStreamEncoder

    if (srs_string_ends_with(entry->pattern, ".flv")) { // choose the encoder by suffix
        w->header()->set_content_type("video/x-flv"); // set the Content-Type
        enc_desc = "FLV";
        enc = new SrsFlvStreamEncoder();
    }
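The four branches in do_serve_http amount to a lookup from suffix to encoder and Content-Type. A compact, standalone version of that lookup might look like the following; EncoderChoice, ends_with and choose_encoder are names made up for this sketch, while the suffixes and MIME types are the ones used in the function above:

```cpp
#include <string>

struct EncoderChoice {
    std::string desc;         // label used in logs, e.g. "FLV"
    std::string content_type; // value for the Content-Type header
};

bool ends_with(const std::string& s, const std::string& suffix)
{
    return s.size() >= suffix.size() &&
           s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// Fill `out` for a known suffix and return true; return false for an
// unsupported pattern (the real code returns ERROR_HTTP_LIVE_STREAM_EXT).
bool choose_encoder(const std::string& pattern, EncoderChoice& out)
{
    static const struct { const char* suffix; const char* desc; const char* type; } table[] = {
        { ".flv", "FLV", "video/x-flv" },
        { ".aac", "AAC", "audio/x-aac" },
        { ".mp3", "MP3", "audio/mpeg" },
        { ".ts",  "TS",  "video/MP2T" },
    };
    for (const auto& row : table) {
        if (ends_with(pattern, row.suffix)) {
            out.desc = row.desc;
            out.content_type = row.type;
            return true;
        }
    }
    return false;
}
```

A table keeps the suffix, log label and MIME type side by side, which makes it easy to see that the pattern suffix is the only input to the decision.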

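2. Create the consumer and take messages from the queue that stores the audio/video data

See SrsSource::create_consumer and SrsConsumer::dump_packets; both functions were covered in the analysis of pulling an RTMP stream from the SRS server.

3. Wrap the messages into tags and send them to the playback client

See SrsFlvStreamEncoder::write_tags, which does two things:

  1. Send the FLV header to the playback client
  2. Send the FLV body to the playback client

srs_error_t SrsFlvStreamEncoder::write_tags(SrsSharedPtrMessage** msgs, int count)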
{
    srs_error_t err = srs_success;

    // For https://github.com/ossrs/srs/issues/939
    if (!header_written) { // write the FLV header first
        bool has_video = false;
        bool has_audio = false;

        for (int i = 0; i < count && (!has_video || !has_audio); i++) {
            SrsSharedPtrMessage* msg = msgs[i];
            if (msg->is_video()) {
                has_video = true; // a video message is present
            } else if (msg->is_audio()) {
                has_audio = true; // an audio message is present
            }
        }

        // Drop data if no A+V: return when the batch has neither audio nor video.
        if (!has_video && !has_audio) {
            return err;
        }

        if ((err = write_header(has_video, has_audio))  != srs_success) { // write the header first, then the tags
            return srs_error_wrap(err, "write header");
        }
    }

    return enc->write_tags(msgs, count); // write the tags
}

Sending the FLV header to the playback client

The FLV header is sent to the playback client first.

srs_error_t SrsFlvStreamEncoder::write_header(bool has_video, bool has_audio)
{
    srs_error_t err = srs_success;

    if (!header_written) {
        header_written = true;

        if ((err = enc->write_header(has_video, has_audio))  != srs_success) { // write the FLV header and the first previousTagSize
            return srs_error_wrap(err, "write header");
        }

        srs_trace("FLV: write header audio=%d, video=%d", has_audio, has_video);
    }

    return err;
}

The function that actually does the work is SrsFlvTransmuxer::write_header:

srs_error_t SrsFlvTransmuxer::write_header(bool has_video, bool has_audio) // the FLV header
{
    srs_error_t err = srs_success;

    uint8_t av_flag = 0;
    av_flag += (has_audio? 4:0);
    av_flag += (has_video? 1:0);

    // 9bytes header and 4bytes first previous-tag-size
    char flv_header[] = { // build the 9-byte FLV header; see the FLV format analysis
        'F', 'L', 'V', // Signatures "FLV"
        (char)0x01, // File version (for example, 0x01 for FLV version 1)
        (char)av_flag, // 4, audio; 1, video; 5 audio+video.
        (char)0x00, (char)0x00, (char)0x00, (char)0x09 // DataOffset UI32 The length of this header in bytes
    };
    
    // flv specification should set the audio and video flag,
    // actually in practise, application generally ignore this flag,
    // so we generally set the audio/video to 0.
    
    // write 9bytes header.
    if ((err = write_header(flv_header)) != srs_success) { // on the wire: the FLV header plus the first previousTagSize
        return srs_error_wrap(err, "write header");
    }
    
    return err;
}

On the wire, the FLV header is written together with the first previousTagSize:

srs_error_t SrsFlvTransmuxer::write_header(char flv_header[9])
{
    srs_error_t err = srs_success;
    
    // write data.
    if ((err = writer->write(flv_header, 9, NULL)) != srs_success) {
        return srs_error_wrap(err, "write flv header failed");
    }
    
    // previous tag size.
    char pts[] = { (char)0x00, (char)0x00, (char)0x00, (char)0x00 };
    if ((err = writer->write(pts, 4, NULL)) != srs_success) {
        return srs_error_wrap(err, "write pts");
    }
    
    return err;
}
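Put together, the two write_header functions emit 13 bytes before any tag: the 9-byte FLV header followed by a zero previousTagSize. As an illustration only (make_flv_file_header is a name invented here, not an SRS function), the same bytes can be produced like this:

```cpp
#include <cstdint>
#include <vector>

// Build the 9-byte FLV header plus the 4-byte PreviousTagSize0 (always 0).
// Flag bits: 0x04 = audio present, 0x01 = video present.
std::vector<uint8_t> make_flv_file_header(bool has_video, bool has_audio)
{
    uint8_t flags = 0;
    if (has_audio) flags |= 0x04;
    if (has_video) flags |= 0x01;

    return {
        'F', 'L', 'V',          // signature
        0x01,                   // version
        flags,                  // 4 = audio, 1 = video, 5 = audio + video
        0x00, 0x00, 0x00, 0x09, // DataOffset: header length, big-endian
        0x00, 0x00, 0x00, 0x00  // PreviousTagSize0
    };
}
```

These 13 bytes are what a capture of the first body bytes of the response should show, ahead of the first script, audio or video tag.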

[Figure: Wireshark capture]

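Sending the FLV body to the playback client

The FLV body is a sequence of 4-byte previousTagSize fields and FLV tags.

Each FLV tag in turn consists of an 11-byte tag header followed by the tag data.

So the FLV body can be viewed as three kinds of parts: previousTagSize, tag header, and tag data.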
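One way to check this layout is to walk a received body and verify that every previousTagSize equals 11 plus the data size of the tag before it. The sketch below assumes the buffer starts right after the 13 bytes of FLV header and PreviousTagSize0, so it sees repeated groups of tag header, tag data and previousTagSize; count_flv_tags and the read helpers are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Big-endian readers for the 24-bit and 32-bit fields used by FLV.
static uint32_t read_u24(const uint8_t* p)
{
    return (uint32_t(p[0]) << 16) | (uint32_t(p[1]) << 8) | uint32_t(p[2]);
}

static uint32_t read_u32(const uint8_t* p)
{
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
           (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// Count complete tags in `body`. Returns -1 if a previousTagSize does not
// equal 11 + DataSize of the tag it follows. A truncated tail is ignored.
int count_flv_tags(const std::vector<uint8_t>& body)
{
    std::size_t pos = 0;
    int tags = 0;
    while (body.size() - pos >= 11) {
        uint32_t data_size = read_u24(&body[pos + 1]); // DataSize follows TagType
        std::size_t total = 11 + std::size_t(data_size) + 4;
        if (body.size() - pos < total) break;          // incomplete tag, wait for more data
        uint32_t prev = read_u32(&body[pos + 11 + data_size]);
        if (prev != 11 + data_size) return -1;
        pos += total;
        tags++;
    }
    return tags;
}
```

A player-side parser works along the same lines, except that it would hand each tag's data to a decoder instead of just counting.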

The code of SrsFlvTransmuxer::write_tags is as follows:

srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count)
{
    srs_error_t err = srs_success;
    
    // realloc the iovss.
    int nb_iovss = 3 * count; // each message uses 3 consecutive iovecs: tag header, tag data, previousTagSize
    iovec* iovss = iovss_cache;
    if (nb_iovss_cache < nb_iovss) {
        srs_freepa(iovss_cache);
        
        nb_iovss_cache = nb_iovss;
        iovss = iovss_cache = new iovec[nb_iovss];
    }
    
    // realloc the tag headers. Each tag header is 11 bytes.
    char* cache = tag_headers;
    if (nb_tag_headers < count) {
        srs_freepa(tag_headers);
        
        nb_tag_headers = count;
        cache = tag_headers = new char[SRS_FLV_TAG_HEADER_SIZE * count];
    }
    
    // realloc the pts. Each previousTagSize is 4 bytes.
    char* pts = ppts;
    if (nb_ppts < count) {
        srs_freepa(ppts);
        
        nb_ppts = count;
        pts = ppts = new char[SRS_FLV_PREVIOUS_TAG_SIZE * count];
    }
    
    // the cache is ok, write each messages.
    iovec* iovs = iovss;
    for (int i = 0; i < count; i++) {
        SrsSharedPtrMessage* msg = msgs[i];
        
        // cache all flv header.
        if (msg->is_audio()) {  // build the audio tag header
            cache_audio(msg->timestamp, msg->payload, msg->size, cache);
        } else if (msg->is_video()) { // build the video tag header
            cache_video(msg->timestamp, msg->payload, msg->size, cache);
        } else { // build the metadata (script data) tag header
            cache_metadata(SrsFrameTypeScript, msg->payload, msg->size, cache);
        }
        
        // cache all pts.
        cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts);
        
        // all ioves.
        iovs[0].iov_base = cache;                       // tag header
        iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE;
        iovs[1].iov_base = msg->payload;                // tag body
        iovs[1].iov_len = msg->size;
        iovs[2].iov_base = pts;                         // previousTagSize
        iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE;
        
        // move to the next message.
        cache += SRS_FLV_TAG_HEADER_SIZE;
        pts += SRS_FLV_PREVIOUS_TAG_SIZE;
        iovs += 3;
    }
    
    if ((err = writer->writev(iovss, nb_iovss, NULL)) != srs_success) {
        return srs_error_wrap(err, "write flv tags failed");
    }
    
    return err;
}

The tag header differs by tag type, so each one is built separately.

Building the audio tag header:

void SrsFlvTransmuxer::cache_audio(int64_t timestamp, char* data, int size, char* cache)
{
    srs_assert(data);
    
    timestamp &= 0x7fffffff;
    
    // 11bytes tag header
    /*char tag_header[] = {
     (char)SrsFrameTypeAudio, // TagType UB [5], 8 = audio
     (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
     (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
     (char)0x00, // TimestampExtended UI8
     (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
     };*/
    
    SrsBuffer* tag_stream = new SrsBuffer(cache, 11);
    SrsAutoFree(SrsBuffer, tag_stream);
    
    // write data size.
    tag_stream->write_1bytes(SrsFrameTypeAudio);
    tag_stream->write_3bytes(size);
    tag_stream->write_3bytes((int32_t)timestamp);
    // default to little-endian
    tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
    tag_stream->write_3bytes(0x00);
}

Building the video tag header:

void SrsFlvTransmuxer::cache_video(int64_t timestamp, char* data, int size, char* cache)
{
    srs_assert(data);
    
    timestamp &= 0x7fffffff;
    
    // 11bytes tag header
    /*char tag_header[] = {
     (char)SrsFrameTypeVideo, // TagType UB [5], 9 = video
     (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
     (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
     (char)0x00, // TimestampExtended UI8
     (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
     };*/
    
    SrsBuffer* tag_stream = new SrsBuffer(cache, 11);
    SrsAutoFree(SrsBuffer, tag_stream);
    
    // write data size.
    tag_stream->write_1bytes(SrsFrameTypeVideo);
    tag_stream->write_3bytes(size);
    tag_stream->write_3bytes((int32_t)timestamp);
    // default to little-endian
    tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
    tag_stream->write_3bytes(0x00);
}

Building the metadata tag header:

void SrsFlvTransmuxer::cache_metadata(char type, char* data, int size, char* cache)
{
    srs_assert(data);
    
    // 11 bytes tag header
    /*char tag_header[] = {
     (char)type, // TagType UB [5], 18 = script data
     (char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
     (char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
     (char)0x00, // TimestampExtended UI8
     (char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
     };*/
    
    SrsBuffer* tag_stream = new SrsBuffer(cache, 11);
    SrsAutoFree(SrsBuffer, tag_stream);
    
    // write data size.
    tag_stream->write_1bytes(type);
    tag_stream->write_3bytes(size);
    tag_stream->write_3bytes(0x00);
    tag_stream->write_1bytes(0x00);
    tag_stream->write_3bytes(0x00);
}
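The three cache_* functions write the same 11-byte layout and differ only in the tag type and timestamp. A self-contained sketch of that layout, using plain arrays instead of SrsBuffer, is shown below; make_tag_header and make_previous_tag_size are illustrative names. All multi-byte FLV fields are big-endian, and the high byte of the 32-bit timestamp goes into the separate TimestampExtended field.

```cpp
#include <array>
#include <cstdint>

// Build an 11-byte FLV tag header.
// tag_type: 8 = audio, 9 = video, 18 = script data.
std::array<uint8_t, 11> make_tag_header(uint8_t tag_type, uint32_t data_size, int64_t timestamp_ms)
{
    uint32_t ts = uint32_t(timestamp_ms & 0x7fffffff);
    return {{
        tag_type,
        uint8_t(data_size >> 16), uint8_t(data_size >> 8), uint8_t(data_size), // DataSize, UI24
        uint8_t(ts >> 16), uint8_t(ts >> 8), uint8_t(ts),                      // Timestamp, low 24 bits
        uint8_t(ts >> 24),                                                     // TimestampExtended
        0x00, 0x00, 0x00                                                       // StreamID, always 0
    }};
}

// The previousTagSize that follows a tag: 11-byte header plus data, as UI32.
std::array<uint8_t, 4> make_previous_tag_size(uint32_t data_size)
{
    uint32_t n = 11 + data_size;
    return {{ uint8_t(n >> 24), uint8_t(n >> 16), uint8_t(n >> 8), uint8_t(n) }};
}
```

For a metadata tag the caller would pass type 18 and a timestamp of 0, which reproduces what cache_metadata writes.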

This completes the process of a playback client pulling a stream from the SRS server.

Original: https://www.yuque.com/wahaha-0yfyj/mnfloz/hur2oe