@@ -53,6 +53,13 @@ static ngx_command_t ngx_rtmp_live_commands[] = {
5353 offsetof(ngx_rtmp_live_app_conf_t , buflen ),
5454 NULL },
5555
56+ { ngx_string ("sync" ),
57+ NGX_RTMP_MAIN_CONF |NGX_RTMP_SRV_CONF |NGX_RTMP_APP_CONF |NGX_CONF_TAKE1 ,
58+ ngx_conf_set_msec_slot ,
59+ NGX_RTMP_APP_CONF_OFFSET ,
60+ offsetof(ngx_rtmp_live_app_conf_t , sync ),
61+ NULL },
62+
5663 ngx_null_command
5764};
5865
@@ -99,6 +106,7 @@ ngx_rtmp_live_create_app_conf(ngx_conf_t *cf)
99106 lacf -> meta = NGX_CONF_UNSET ;
100107 lacf -> nbuckets = NGX_CONF_UNSET ;
101108 lacf -> buflen = NGX_CONF_UNSET ;
109+ lacf -> sync = NGX_CONF_UNSET ;
102110
103111 return lacf ;
104112}
@@ -114,6 +122,7 @@ ngx_rtmp_live_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
114122 ngx_conf_merge_value (conf -> meta , prev -> meta , 1 );
115123 ngx_conf_merge_value (conf -> nbuckets , prev -> nbuckets , 1024 );
116124 ngx_conf_merge_msec_value (conf -> buflen , prev -> buflen , 0 );
125+ ngx_conf_merge_msec_value (conf -> sync , prev -> sync , 0 );
117126
118127 conf -> pool = ngx_create_pool (4096 , & cf -> cycle -> new_log );
119128 if (conf -> pool == NULL ) {
@@ -292,16 +301,18 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
292301{
293302 ngx_rtmp_live_ctx_t * ctx , * pctx ;
294303 ngx_rtmp_codec_ctx_t * codec_ctx ;
295- ngx_chain_t * out , * peer_out , * header_out ,
304+ ngx_chain_t * out , * peer_out , * header_out ,
296305 * pheader_out , * meta ;
297306 ngx_rtmp_core_srv_conf_t * cscf ;
298307 ngx_rtmp_live_app_conf_t * lacf ;
299308 ngx_rtmp_session_t * ss ;
300309 ngx_rtmp_header_t ch , lh ;
301310 ngx_uint_t prio , peer_prio ;
302311 ngx_uint_t peers , dropped_peers ;
303- size_t header_offset ;
312+ size_t header_offset , last_offset ;
304313 ngx_uint_t header_version , meta_version ;
314+ ngx_int_t diff_timestamp ;
315+ uint32_t * last ;
305316
306317 lacf = ngx_rtmp_get_module_app_conf (s , ngx_rtmp_live_module );
307318 if (lacf == NULL ) {
@@ -345,14 +356,16 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
345356 ch .csid = NGX_RTMP_LIVE_CSID_VIDEO ;
346357 lh .timestamp = ctx -> last_video ;
347358 ctx -> last_video = ch .timestamp ;
359+ last_offset = offsetof(ngx_rtmp_live_ctx_t , last_video );
348360 } else {
349- /* audio priority is the same as video key frame's */
350- prio = NGX_RTMP_VIDEO_KEY_FRAME ;
361+ prio = 0 ;
351362 ch .csid = NGX_RTMP_LIVE_CSID_AUDIO ;
352363 lh .timestamp = ctx -> last_audio ;
353364 ctx -> last_audio = ch .timestamp ;
365+ last_offset = offsetof(ngx_rtmp_live_ctx_t , last_audio );
354366 }
355367 lh .csid = ch .csid ;
368+ diff_timestamp = ch .timestamp - lh .timestamp ;
356369
357370 out = ngx_rtmp_append_shared_bufs (cscf , NULL , in );
358371 ngx_rtmp_prepare_message (s , & ch , & lh , out );
@@ -396,25 +409,39 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
396409 }
397410 ++ peers ;
398411 ss = pctx -> session ;
412+ last = (uint32_t * )((u_char * )pctx + last_offset );
413+ ch .timestamp = s -> epoch + h -> timestamp - ss -> epoch ;
399414
400415 /* send absolute frame */
401416 if ((pctx -> msg_mask & (1 << h -> type )) == 0 ) {
402- ch .timestamp = ngx_current_msec - ss -> epoch ;
417+
418+ /* packet from the past for the peer */
419+ if (s -> epoch + h -> timestamp < ss -> epoch ) {
420+ ngx_log_debug3 (NGX_LOG_DEBUG_RTMP , ss -> connection -> log , 0 ,
421+ "live: av: %s packet from the past %uD < %uD" ,
422+ h -> type == NGX_RTMP_MSG_VIDEO ? "video" : "audio" ,
423+ (uint32_t )(s -> epoch + h -> timestamp ), (uint32_t )ss -> epoch );
424+ continue ;
425+ }
426+
403427 ngx_log_debug2 (NGX_LOG_DEBUG_RTMP , ss -> connection -> log , 0 ,
404428 "live: av: abs %s timestamp=%uD" ,
405429 h -> type == NGX_RTMP_MSG_VIDEO ? "video" : "audio" ,
406430 ch .timestamp );
431+
407432 /* send codec header as abs frame if any */
408433 peer_out = ngx_rtmp_append_shared_bufs (cscf , NULL ,
409434 header_out ? header_out : in );
410435 ngx_rtmp_prepare_message (s , & ch , NULL , peer_out );
411- pctx -> msg_mask |= (1 << h -> type );
412- if (ngx_rtmp_send_message (ss , peer_out , prio ) == NGX_OK
413- && header_out )
414- {
415- * (ngx_uint_t * )((u_char * )pctx + header_offset )
416- = header_version ;
436+ if (ngx_rtmp_send_message (ss , peer_out , prio ) == NGX_OK ) {
437+ pctx -> msg_mask |= (1 << h -> type );
438+ if (header_out ) {
439+ * (ngx_uint_t * )((u_char * )pctx + header_offset )
440+ = header_version ;
441+ * last = ch .timestamp ;
442+ }
417443 }
444+
418445 ngx_rtmp_free_shared_chain (cscf , peer_out );
419446 continue ;
420447 }
@@ -445,7 +472,27 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
445472 if (ngx_rtmp_send_message (ss , out , peer_prio ) != NGX_OK ) {
446473 ++ pctx -> dropped ;
447474 ++ dropped_peers ;
475+ continue ;
476+ }
477+
478+ * last += diff_timestamp ;
479+
480+ if (lacf -> sync == 0 || * last + lacf -> sync >= ch .timestamp ) {
481+ continue ;
482+ }
483+
484+ /* send absolute frame to sync stream */
485+ ngx_log_debug2 (NGX_LOG_DEBUG_RTMP , ss -> connection -> log , 0 ,
486+ "live: av: sync %s: %i" ,
487+ h -> type == NGX_RTMP_MSG_VIDEO ? "video" : "audio" ,
488+ (ngx_int_t ) (ch .timestamp - * last ));
489+
490+ peer_out = ngx_rtmp_alloc_shared_buf (cscf );
491+ ngx_rtmp_prepare_message (s , & ch , NULL , peer_out );
492+ if (ngx_rtmp_send_message (ss , peer_out , 0 ) == NGX_OK ) {
493+ * last = ch .timestamp ;
448494 }
495+ ngx_rtmp_free_shared_chain (cscf , peer_out );
449496 }
450497 ngx_rtmp_free_shared_chain (cscf , out );
451498
0 commit comments