Skip to content

Commit 1eb0c2a

Browse files
committed
Add message IDs to header.
These are the message IDs for all topics and supertopics. Also rename all "frame" to "message" to avoid inconsistent naming.
1 parent b20e930 commit 1eb0c2a

4 files changed

Lines changed: 36 additions & 38 deletions

File tree

catkit_core/LocalMessageBroker.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ Message LocalMessageBroker::PrepareMessageImpl(std::string_view topic, size_t pa
511511
header->producer_pid = GetProcessId();
512512
header->producer_timestamp = 0;
513513

514-
header->partial_frame_id = 0;
514+
header->partial_message_id = 0;
515515
header->start_byte = 0;
516516
header->end_byte = payload_size;
517517

@@ -548,15 +548,15 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final)
548548
std::uint64_t first_id = topic_header->first_frame_id.load(std::memory_order_relaxed);
549549

550550
std::uint64_t frame_id;
551-
if (message.m_Header->partial_frame_id == 0)
551+
if (message.m_Header->partial_message_id == 0)
552552
{
553553
// Get a frame ID.
554554
frame_id = topic_header->ReserveNextMessageId();
555555
}
556556
else
557557
{
558558
frame_id = message.GetFrameId();
559-
message.m_Header->partial_frame_id++;
559+
message.m_Header->partial_message_id++;
560560
}
561561

562562
// Check if we need to remove an old frame from the topic.
@@ -809,9 +809,7 @@ TopicHeader *LocalMessageBroker::GetTopicHeader(std::string_view topic)
809809
// The topic header doesn't exist, so create it.
810810
TopicHeader temp_topic_header;
811811

812-
temp_topic_header.next_frame_id = 0;
813-
temp_topic_header.first_frame_id = 0;
814-
temp_topic_header.last_frame_id = 0;
812+
temp_topic_header.availability = 0;
815813
temp_topic_header.frame_rate = 0.0;
816814

817815
topic_header = (TopicHeader *) m_TopicHeaders->Insert(topic, &temp_topic_header);

catkit_core/LocalMessageBroker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#define LOCAL_MESSAGE_BROKER_H
33

44
#include "HashMap.h"
5-
#include "Event.h".
5+
#include "Event.h"
66
#include "HybridPoolAllocator.h"
77
#include "PoolAllocator.h"
88
#include "SharedMemory.h"

catkit_core/MessageBroker.cpp

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
#include "LocalMessageBroker.h"
66

7-
Message::Message(MessageHeader *header, void *payload, std::uint64_t frame_id)
8-
: m_Header(header), m_Payload(payload), m_FrameId(frame_id)
7+
Message::Message(MessageHeader *header, void *payload)
8+
: m_Header(header), m_Payload(payload)
99
{
1010
}
1111

@@ -19,14 +19,14 @@ const Uuid &Message::GetPayloadId() const
1919
return m_Header->payload_id;
2020
}
2121

22-
std::uint64_t Message::GetFrameId() const
22+
std::uint64_t Message::GetMessageId() const
2323
{
24-
return m_FrameId;
24+
return m_Header->message_ids[0];
2525
}
2626

27-
std::uint16_t Message::GetPartialFrameId() const
27+
std::uint16_t Message::GetPartialMessageId() const
2828
{
29-
return m_Header->partial_frame_id;
29+
return m_Header->partial_message_id;
3030
}
3131

3232
const Uuid &Message::GetTraceId() const
@@ -143,33 +143,33 @@ void Message::SetEndByte(std::uint64_t end_byte)
143143
m_Header->end_byte = end_byte;
144144
}
145145

146-
MessageSubscription::MessageSubscription(std::shared_ptr<MessageBroker> message_broker, std::string_view topic, std::uint64_t preferred_next_frame_id, MessageSubscriptionMode mode)
147-
: m_MessageBroker(message_broker), m_Topic(topic), m_PreferredNextFrameId(preferred_next_frame_id), m_SubscriptionMode(mode)
146+
MessageSubscription::MessageSubscription(std::shared_ptr<MessageBroker> message_broker, std::string_view topic, std::uint64_t preferred_next_message_id, MessageSubscriptionMode mode)
147+
: m_MessageBroker(message_broker), m_Topic(topic), m_PreferredNextMessageId(preferred_next_message_id), m_SubscriptionMode(mode)
148148
{
149149
}
150150

151151
std::optional<Message> MessageSubscription::GetNextMessage(double timeout_in_seconds, EventWaitMethod wait_type, void (*error_check)())
152152
{
153-
auto message = m_MessageBroker->GetNextMessage(m_Topic, m_PreferredNextFrameId, m_SubscriptionMode, timeout_in_seconds, wait_type, error_check);
153+
auto message = m_MessageBroker->GetNextMessage(m_Topic, m_PreferredNextMessageId, m_SubscriptionMode, timeout_in_seconds, wait_type, error_check);
154154

155155
if (!message.has_value())
156156
return message;
157157

158-
// We are going to return a message. Update our frame id for the next call.
159-
m_PreferredNextFrameId = message->GetFrameId() + 1;
158+
// We are going to return a message. Update our message id for the next call.
159+
m_PreferredNextMessageId = message->GetMessageId() + 1;
160160

161161
return message;
162162
}
163163

164164
std::optional<Message> MessageSubscription::TryGetNextMessage()
165165
{
166-
auto message = m_MessageBroker->TryGetNextMessage(m_Topic, m_PreferredNextFrameId, m_SubscriptionMode);
166+
auto message = m_MessageBroker->TryGetNextMessage(m_Topic, m_PreferredNextMessageId, m_SubscriptionMode);
167167

168168
if (!message.has_value())
169169
return message;
170170

171-
// We are going to return a message. Update our frame id for the next call.
172-
m_PreferredNextFrameId = message->GetFrameId() + 1;
171+
// We are going to return a message. Update our message id for the next call.
172+
m_PreferredNextMessageId = message->GetMessageId() + 1;
173173

174174
return message;
175175
}
@@ -181,7 +181,7 @@ std::optional<size_t> MessageBroker::GetCurrentMessageId(std::string_view topic)
181181
if (!message.has_value())
182182
return std::nullopt;
183183

184-
return message.value().GetFrameId();
184+
return message.value().GetMessageId();
185185
}
186186

187187
Message MessageBroker::PrepareMessage(std::string_view topic, size_t payload_size, uint8_t memory_block_id)
@@ -251,14 +251,14 @@ Message MessageBroker::PublishArray(std::string_view topic, ArrayView array, Uui
251251
MessageSubscription MessageBroker::Subscribe(std::string_view topic, MessageSubscriptionMode mode)
252252
{
253253
auto current_message = GetCurrentMessage(topic);
254-
auto starting_frame_id = current_message.has_value() ? current_message->GetFrameId() : 0;
254+
auto starting_message_id = current_message.has_value() ? current_message->GetMessageId() : 0;
255255

256-
return Subscribe(topic, starting_frame_id, mode);
256+
return Subscribe(topic, starting_message_id, mode);
257257
}
258258

259-
MessageSubscription MessageBroker::Subscribe(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode)
259+
MessageSubscription MessageBroker::Subscribe(std::string_view topic, size_t preferred_next_message_id, MessageSubscriptionMode mode)
260260
{
261-
return MessageSubscription(shared_from_this(), topic, preferred_next_frame_id, mode);
261+
return MessageSubscription(shared_from_this(), topic, preferred_next_message_id, mode);
262262
}
263263

264264
void MessageBroker::PrintDebugInfo() const

catkit_core/MessageBroker.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
#include <optional>
1616

1717
const size_t TOPIC_MAX_KEY_SIZE = 127;
18+
const size_t TOPIC_MAX_DEPTH = 7;
1819
const size_t HOST_NAME_SIZE = 64;
1920
const size_t METADATA_MAX_STRLEN = 8;
2021
const size_t METADATA_MAX_KEYLEN = 7;
2122
const size_t MAX_NUM_METADATA_ENTRIES = 12;
2223

23-
const std::uint64_t INVALID_FRAME_ID = 0xFFFFFFFFFFFFFFFF;
24+
const std::uint64_t INVALID_MESSAGE_ID = 0xFFFFFFFFFFFFFFFF;
2425

2526
enum class MetadataType : std::uint8_t
2627
{
@@ -68,7 +69,8 @@ struct MessageHeader
6869
std::uint64_t start_byte;
6970
std::uint64_t end_byte;
7071

71-
std::uint16_t partial_frame_id;
72+
std::uint64_t message_ids[TOPIC_MAX_DEPTH];
73+
std::uint16_t partial_message_id;
7274

7375
std::uint8_t num_metadata_entries;
7476
MetadataEntry metadata_entries[MAX_NUM_METADATA_ENTRIES];
@@ -85,14 +87,14 @@ class Message
8587
friend class RemoteBrokerServer;
8688

8789
private:
88-
Message(MessageHeader *header, void *payload, std::uint64_t frame_id);
90+
Message(MessageHeader *header, void *payload);
8991

9092
public:
9193
std::string_view GetTopic() const;
9294

9395
const Uuid &GetPayloadId() const;
94-
std::uint64_t GetFrameId() const;
95-
std::uint16_t GetPartialFrameId() const;
96+
std::uint64_t GetMessageId() const;
97+
std::uint16_t GetPartialMessageId() const;
9698

9799
const Uuid &GetTraceId() const;
98100

@@ -122,8 +124,6 @@ class Message
122124
private:
123125
MessageHeader *m_Header;
124126
void *m_Payload;
125-
126-
std::uint64_t m_FrameId;
127127
};
128128

129129
enum class MessageSubscriptionMode
@@ -143,12 +143,12 @@ class MessageSubscription
143143
std::optional<Message> TryGetNextMessage();
144144

145145
private:
146-
MessageSubscription(std::shared_ptr<MessageBroker> broker, std::string_view topic, std::uint64_t preferred_next_frame_id, MessageSubscriptionMode mode);
146+
MessageSubscription(std::shared_ptr<MessageBroker> broker, std::string_view topic, std::uint64_t preferred_next_message_id, MessageSubscriptionMode mode);
147147

148148
std::shared_ptr<MessageBroker> m_MessageBroker;
149149

150150
std::string m_Topic;
151-
std::uint64_t m_PreferredNextFrameId;
151+
std::uint64_t m_PreferredNextMessageId;
152152
MessageSubscriptionMode m_SubscriptionMode;
153153
};
154154

@@ -178,13 +178,13 @@ class MessageBroker : public std::enable_shared_from_this<MessageBroker>
178178
Message PublishArray(std::string_view topic, ArrayView array, Uuid trace_id, uint8_t memory_block_id = 0);
179179

180180
MessageSubscription Subscribe(std::string_view topic, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly);
181-
MessageSubscription Subscribe(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly);
181+
MessageSubscription Subscribe(std::string_view topic, size_t preferred_next_message_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly);
182182

183183
virtual void PrintDebugInfo() const;
184184

185185
protected:
186-
virtual std::optional<Message> GetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly, double timeout_in_seconds = -1, EventWaitMethod wait_type = EventWaitMethod::Default, void (*error_check)() = nullptr) = 0;
187-
virtual std::optional<Message> TryGetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly) = 0;
186+
virtual std::optional<Message> GetNextMessage(std::string_view topic, size_t preferred_next_message_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly, double timeout_in_seconds = -1, EventWaitMethod wait_type = EventWaitMethod::Default, void (*error_check)() = nullptr) = 0;
187+
virtual std::optional<Message> TryGetNextMessage(std::string_view topic, size_t preferred_next_message_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly) = 0;
188188
};
189189

190190
#endif // MESSAGE_BROKER_H

0 commit comments

Comments
 (0)