Skip to content

Commit 1e7c121

Browse files
authored
fix: support negative RTMP extended timestamp deltas (#2093)
1 parent f54df32 commit 1e7c121

14 files changed

Lines changed: 1770 additions & 606 deletions

src/projects/modules/rtmp/chunk/rtmp_chunk_parser.cpp

Lines changed: 241 additions & 199 deletions
Large diffs are not rendered by default.

src/projects/modules/rtmp/chunk/rtmp_chunk_parser.h

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,32 @@
88
//==============================================================================
99
#pragma once
1010

11-
#include <base/info/info.h>
12-
13-
#include <deque>
14-
#include <map>
15-
#include <memory>
16-
11+
#include "../rtmp_chunk_parser_common.h"
1712
#include "rtmp_datastructure.h"
1813
#include "rtmp_define.h"
1914
#include "rtmp_mux_util.h"
2015

21-
class RtmpChunkParser
16+
class RtmpChunkParser : public RtmpChunkParserCommon<RtmpChunkHeader, RtmpMessage>
2217
{
2318
public:
24-
enum class ParseResult
25-
{
26-
Error,
27-
NeedMoreData,
28-
Parsed,
29-
};
30-
31-
enum class ParseResultForExtendedTimestamp
32-
{
33-
NeedMoreData,
34-
Extended,
35-
NotExtended,
36-
};
19+
using ParseResult = typename RtmpChunkParserCommon<RtmpChunkHeader, RtmpMessage>::ParseResult;
3720

38-
public:
3921
RtmpChunkParser(size_t chunk_size);
4022
virtual ~RtmpChunkParser();
4123

24+
/// Parses as much of a single RTMP chunk as possible from the supplied data.
25+
///
26+
/// @param data Input bytes beginning at the next unread RTMP chunk boundary.
27+
/// @param bytes_used Receives the number of bytes consumed from @p data.
28+
///
29+
/// @return `Parsed` when a chunk payload step completed, `NeedMoreData` when
30+
/// more bytes are required, or `Error` when the chunk stream is
31+
/// malformed.
32+
///
33+
/// @note When this returns `NeedMoreData`, @p bytes_used is set to `0` and
34+
/// the caller must replay the same unconsumed prefix again with more
35+
/// bytes appended. Header parsing is not resumable from the middle of
36+
/// a partially received header.
4237
ParseResult Parse(const std::shared_ptr<const ov::Data> &data, size_t *bytes_used);
4338

4439
std::shared_ptr<const RtmpMessage> GetMessage();
@@ -49,47 +44,46 @@ class RtmpChunkParser
4944
_chunk_size = chunk_size;
5045
}
5146

52-
info::NamePath GetNamePath() const;
53-
void UpdateNamePath(const info::NamePath &stream_name_path);
54-
5547
void Destroy();
5648

5749
private:
50+
/// Returns the most recent completed header for a chunk stream.
51+
///
52+
/// @param chunk_stream_id RTMP chunk stream identifier.
53+
///
54+
/// @return The preceding parsed header for @p chunk_stream_id, or `nullptr`
55+
/// if no header has been completed on that chunk stream yet.
5856
std::shared_ptr<const RtmpChunkHeader> GetPrecedingChunkHeader(const uint32_t chunk_stream_id);
5957

58+
/// Checks whether the next header belongs to an unfinished message.
59+
///
60+
/// @param chunk_stream_id RTMP chunk stream identifier for the incoming chunk.
61+
///
62+
/// @return `true` when the parser is expecting a continuation chunk for the
63+
/// same chunk stream, including interleaved pending-message cases.
64+
bool IsContinuationChunk(const uint32_t chunk_stream_id) const;
65+
66+
/// Parses the RTMP basic header.
67+
///
68+
/// @param stream Input byte stream positioned at the RTMP basic header.
69+
/// @param chunk_header Destination header object to populate.
70+
///
71+
/// @return `Parsed`, `NeedMoreData`, or `Error`.
6072
ParseResult ParseBasicHeader(ov::ByteStream &stream, RtmpChunkHeader *chunk_header);
61-
ParseResultForExtendedTimestamp ParseExtendedTimestamp(
62-
const uint32_t stream_id,
63-
ov::ByteStream &stream,
64-
RtmpChunkHeader *chunk_header,
65-
const int64_t timestamp,
66-
RtmpChunkHeader::CompletedHeader *completed_header);
67-
ParseResultForExtendedTimestamp ParseExtendedTimestampDelta(
68-
const uint32_t stream_id,
69-
ov::ByteStream &stream,
70-
RtmpChunkHeader *chunk_header,
71-
const int64_t preceding_timestamp,
72-
const int64_t timestamp_delta,
73-
RtmpChunkHeader::CompletedHeader *completed_header);
74-
ParseResult ParseMessageHeader(ov::ByteStream &stream, RtmpChunkHeader *chunk_header);
75-
ParseResult ParseHeader(ov::ByteStream &stream, RtmpChunkHeader *chunk_header);
76-
77-
int64_t CalculateRolledTimestamp(const uint32_t stream_id, const int64_t last_timestamp, int64_t parsed_timestamp);
78-
79-
private:
80-
#if DEBUG
81-
uint64_t _chunk_index = 0ULL;
82-
uint64_t _total_read_bytes = 0ULL;
83-
#endif // DEBUG
8473

85-
bool _need_to_parse_new_header = true;
86-
std::shared_ptr<RtmpMessage> _current_message;
87-
std::map<uint32_t, std::shared_ptr<RtmpMessage>> _pending_message_map;
88-
std::map<uint32_t, std::shared_ptr<const RtmpChunkHeader>> _preceding_chunk_header_map;
89-
90-
ov::Queue<std::shared_ptr<const RtmpMessage>> _message_queue{nullptr, 500};
91-
size_t _chunk_size;
74+
/// Parses the RTMP message header for an already-parsed basic header.
75+
///
76+
/// @param stream Input byte stream positioned at the RTMP message header.
77+
/// @param chunk_header Destination header object to populate.
78+
///
79+
/// @return `Parsed`, `NeedMoreData`, or `Error`.
80+
ParseResult ParseMessageHeader(ov::ByteStream &stream, RtmpChunkHeader *chunk_header);
9281

93-
mutable std::mutex _name_path_mutex;
94-
info::NamePath _name_path;
82+
/// Parses both the RTMP basic header and message header.
83+
///
84+
/// @param stream Input byte stream positioned at the start of an RTMP header.
85+
/// @param chunk_header Destination header object to populate.
86+
///
87+
/// @return `Parsed`, `NeedMoreData`, or `Error`.
88+
ParseResult ParseHeader(ov::ByteStream &stream, RtmpChunkHeader *chunk_header);
9589
};

src/projects/modules/rtmp/chunk/rtmp_datastructure.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ enum class RtmpMessageHeaderType : uint8_t
2323
#pragma pack(push, 1)
2424
struct RtmpChunkHeader
2525
{
26-
static constexpr const size_t EXTENDED_TIMESTAMP_SIZE = 4;
27-
2826
RtmpChunkHeader()
2927
{
3028
::memset(&message_header, 0, sizeof(message_header));
@@ -39,7 +37,7 @@ struct RtmpChunkHeader
3937

4038
// Indicates whether the timestamp is extended
4139
bool is_extended_timestamp = false;
42-
bool is_timestamp_delta = false;
40+
bool is_timestamp_delta = false;
4341

4442
// Total chunk header size (basic header + message header + extended timestamp)
4543
uint32_t GetTotalHeaderLength() const
@@ -48,16 +46,16 @@ struct RtmpChunkHeader
4846
}
4947

5048
#if DEBUG
51-
uint64_t chunk_index = 0ULL;
49+
uint64_t chunk_index = 0ULL;
5250

53-
uint64_t from_byte_offset = 0ULL;
51+
uint64_t from_byte_offset = 0ULL;
5452
mutable uint64_t message_total_bytes = 0ULL;
5553
#endif // DEBUG
5654

5755
// 1 or 2 or 3
58-
uint8_t basic_header_length = 0U;
56+
uint8_t basic_header_length = 0U;
5957
uint32_t message_header_length = 0U;
60-
uint32_t message_length = 0U;
58+
uint32_t message_length = 0U;
6159

6260
// uint32_t expected_type_3_header{};
6361
// The payload size that including type 3 messages
@@ -75,16 +73,16 @@ struct RtmpChunkHeader
7573

7674
// Extended Timestamp/Timestamp delta
7775
uint32_t extended_timestamp = 0U;
78-
static_assert(sizeof(RtmpChunkHeader::extended_timestamp) == EXTENDED_TIMESTAMP_SIZE, "Extended timestamp size must be 4 bytes");
76+
static_assert(sizeof(RtmpChunkHeader::extended_timestamp) == RTMP_EXTEND_TIMESTAMP_SIZE, "Extended timestamp size must be 4 bytes");
7977

8078
struct CompletedHeader
8179
{
8280
// Accumulated timestamp
83-
int64_t timestamp = 0LL;
84-
uint32_t timestamp_delta = 0U;
81+
int64_t timestamp = 0LL;
82+
uint32_t timestamp_delta = 0U;
8583

8684
RtmpMessageTypeID type_id = RtmpMessageTypeID::Unknown;
87-
uint32_t stream_id = 0U;
85+
uint32_t stream_id = 0U;
8886
} completed;
8987

9088
// Message Header
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//==============================================================================
2+
//
3+
// OvenMediaEngine
4+
//
5+
// Created by Hyunjun Jang
6+
// Copyright (c) 2026 AirenSoft. All rights reserved.
7+
//
8+
//==============================================================================
9+
#pragma once
10+
11+
#include <base/info/info.h>
12+
#include <base/ovlibrary/queue.h>
13+
14+
template <typename Theader, typename Tmessage>
15+
class RtmpChunkParserCommon
16+
{
17+
public:
18+
enum class ParseResult
19+
{
20+
Error,
21+
NeedMoreData,
22+
Parsed,
23+
};
24+
25+
public:
26+
info::NamePath GetNamePath() const
27+
{
28+
std::lock_guard lock_guard(_name_path_mutex);
29+
return _name_path;
30+
}
31+
32+
virtual void UpdateNamePath(const info::NamePath &stream_name_path)
33+
{
34+
std::lock_guard lock_guard(_name_path_mutex);
35+
_name_path = stream_name_path;
36+
_message_queue.SetAlias(ov::String::FormatString("RTMP queue for %s", _name_path.CStr()));
37+
}
38+
39+
protected:
40+
bool _need_to_parse_new_header = true;
41+
std::shared_ptr<Tmessage> _current_message;
42+
std::map<uint32_t, std::shared_ptr<Tmessage>> _pending_message_map;
43+
44+
#if DEBUG
45+
uint64_t _chunk_index = 0ULL;
46+
uint64_t _total_read_bytes = 0ULL;
47+
#endif // DEBUG
48+
49+
std::map<uint32_t, std::shared_ptr<const Theader>> _preceding_chunk_header_map;
50+
ov::Queue<std::shared_ptr<const Tmessage>> _message_queue{nullptr, 500};
51+
size_t _chunk_size = 0;
52+
53+
private:
54+
mutable std::mutex _name_path_mutex;
55+
info::NamePath _name_path;
56+
};

0 commit comments

Comments
 (0)