Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 1 addition & 55 deletions catkit_core/HashMap.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "HashMap.h"
#include "Util.h"

#include <algorithm>
#include <cstring>
Expand All @@ -7,61 +8,6 @@
//#define DEBUG_PRINT(a) std::cout << a << std::endl
#define DEBUG_PRINT(a)

// MurmurHash3 32-bit version
uint32_t murmurhash3(std::string_view key, uint32_t seed = 0)
{
const uint8_t *data = reinterpret_cast<const uint8_t *>(key.data());
size_t len = key.size();

uint32_t h = seed;
const uint32_t c1 = 0xcc9e2d51;
const uint32_t c2 = 0x1b873593;

// Partition in blocks of 4 bytes.
const size_t nblocks = len / 4;
const uint32_t* blocks = reinterpret_cast<const uint32_t *>(data);
for (size_t i = 0; i < nblocks; i++)
{
uint32_t k = blocks[i];
k *= c1;
k = (k << 15) | (k >> 17);
k *= c2;

h ^= k;
h = (h << 13) | (h >> 19);
h = h * 5 + 0xe6546b64;
}

// Process leftover bytes.
const uint8_t* tail = data + nblocks * 4;
uint32_t k1 = 0;

switch (len & 3)
{
case 3:
k1 ^= tail[2] << 16;
case 2:
k1 ^= tail[1] << 8;
case 1:
k1 ^= tail[0];
k1 *= c1;
k1 = (k1 << 15) | (k1 >> 17);
k1 *= c2;
h ^= k1;
case 0:
; // Do nothing.
}

h ^= len;
h ^= (h >> 16);
h *= 0x85ebca6b;
h ^= (h >> 13);
h *= 0xc2b2ae35;
h ^= (h >> 16);

return h;
}

std::string to_hex(unsigned char c)
{
const static char hex[] = "0123456789abcdef";
Expand Down
45 changes: 35 additions & 10 deletions catkit_core/LocalMessageBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ LocalMessageBroker::LocalMessageBroker(
MessageBrokerHeader *header,
std::shared_ptr<HashMap> topic_headers,
std::shared_ptr<PoolAllocator> message_header_allocator,
std::shared_ptr<Event> event,
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool,
std::vector<std::shared_ptr<HybridPoolAllocator>> allocators,
std::vector<std::shared_ptr<Memory>> memory_blocks,
std::shared_ptr<Memory> header_memory
Expand All @@ -154,7 +154,7 @@ LocalMessageBroker::LocalMessageBroker(
m_Header(header),
m_TopicHeaders(std::move(topic_headers)),
m_MessageHeaderAllocator(std::move(message_header_allocator)),
m_Event(std::move(event)),
m_EventPool(std::move(event_pool)),
m_Allocators(std::move(allocators)),
m_MemoryBlocks(memory_blocks),
m_MessageHeaders(header->message_headers)
Expand Down Expand Up @@ -184,8 +184,14 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Create(StructStream &str

auto message_header_allocator = PoolAllocator::Create(stream, MAX_NUM_MESSAGES);

std::string id = "catkit2_message_broker_" + std::to_string(header->creator_pid) + "_" + std::to_string(header->time_of_creation);
auto event = Event::Create(stream, id);
// Create event pool for topic-based notification
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool;
std::string base_id = "catkit2_message_broker_" + std::to_string(header->creator_pid) + "_" + std::to_string(header->time_of_creation);
for (size_t i = 0; i < EVENT_POOL_SIZE; ++i)
{
std::string event_id = base_id + "_" + std::to_string(i);
event_pool[i] = Event::Create(stream, event_id);
}

DEBUG_PRINT("Extracting allocators.");

Expand All @@ -210,7 +216,7 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Create(StructStream &str
header,
std::move(topic_headers),
std::move(message_header_allocator),
std::move(event),
std::move(event_pool),
allocators,
memory_blocks,
stream.GetBuffer()
Expand All @@ -225,7 +231,13 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Open(StructStream &strea

auto topic_headers = HashMap::Open(stream);
auto message_header_allocator = PoolAllocator::Open(stream);
auto event = Event::Open(stream);

// Open event pool
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool;
for (size_t i = 0; i < EVENT_POOL_SIZE; ++i)
{
event_pool[i] = Event::Open(stream);
}

std::vector<std::shared_ptr<HybridPoolAllocator>> allocators;
std::vector<std::shared_ptr<Memory>> memory_blocks;
Expand Down Expand Up @@ -253,7 +265,7 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Open(StructStream &strea
header,
std::move(topic_headers),
std::move(message_header_allocator),
std::move(event),
std::move(event_pool),
allocators,
memory_blocks,
stream.GetBuffer()
Expand Down Expand Up @@ -362,13 +374,20 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final)
auto topic = std::string_view(message.m_Header->topic);
auto allocator = GetAllocator(message.m_Header->payload_info.memory_block_id);

// Track which event indices need to be signaled (to avoid duplicates)
std::array<bool, EVENT_POOL_SIZE> signaled_events{};
signaled_events.fill(false);

// Publish the message to all subtopics.
for (const auto &subtopic : SubtopicRange(topic))
{
auto topic_header = GetTopicHeader(subtopic);

DEBUG_PRINT("Publishing to subtopic \"" << subtopic << "\".");

// Track this subtopic's event for signaling
signaled_events[topic_header->event_index] = true;

std::uint64_t first_id = topic_header->first_frame_id.load(std::memory_order_relaxed);

std::uint64_t frame_id;
Expand Down Expand Up @@ -448,7 +467,12 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final)
break;
}

m_Event->Signal();
// Signal all unique events for the subtopics
for (std::size_t i = 0; i < EVENT_POOL_SIZE; ++i)
{
if (signaled_events[i])
m_EventPool[i]->Signal();
}

// Deallocate the message header and payload.
PoolAllocator::BlockHandle message_header_index = message.m_Header - m_MessageHeaders;
Expand Down Expand Up @@ -556,8 +580,8 @@ std::optional<Message> LocalMessageBroker::GetNextMessage(std::string_view topic
if (topic_header->IsMessageAvailable(new_frame_id))
return FetchMessage(topic_header, new_frame_id);

// Otherwise, wait for it.
m_Event->Wait(timeout_in_seconds, [topic_header, new_frame_id]() { return topic_header->last_frame_id > new_frame_id; }, wait_type, error_check);
// Otherwise, wait for it on the specific event for this topic.
m_EventPool[topic_header->event_index]->Wait(timeout_in_seconds, [topic_header, new_frame_id]() { return topic_header->last_frame_id > new_frame_id; }, wait_type, error_check);

return FetchMessage(topic_header, new_frame_id);
}
Expand Down Expand Up @@ -670,6 +694,7 @@ TopicHeader *LocalMessageBroker::GetTopicHeader(std::string_view topic)
temp_topic_header.first_frame_id = 0;
temp_topic_header.last_frame_id = 0;
temp_topic_header.frame_rate = 0.0;
temp_topic_header.event_index = murmurhash3(topic) % EVENT_POOL_SIZE;

topic_header = (TopicHeader *) m_TopicHeaders->Insert(topic, &temp_topic_header);

Expand Down
8 changes: 7 additions & 1 deletion catkit_core/LocalMessageBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const size_t MAX_NUM_MESSAGES = 65536;
const size_t MAX_NUM_BLOCKS = 8192;
const size_t MEMORY_ALIGNMENT = 32;
const size_t MIN_SIZE_POOL = 1024;
const size_t EVENT_POOL_SIZE = 128;

struct TopicHeader
{
Expand All @@ -35,6 +36,9 @@ struct TopicHeader

std::array<std::uint64_t, TOPIC_MAX_NUM_MESSAGES> message_headers;

// Pre-computed event index for this topic (hash of topic name % EVENT_POOL_SIZE)
std::uint32_t event_index;

bool IsMessageAvailable(std::size_t frame_id);
bool WillMessageBeAvailable(std::size_t frame_id);
std::size_t GetOldestMessageId();
Expand Down Expand Up @@ -65,7 +69,7 @@ class LocalMessageBroker : public Shareable, public MessageBroker
MessageBrokerHeader *header,
std::shared_ptr<HashMap> topic_headers,
std::shared_ptr<PoolAllocator> message_header_allocator,
std::shared_ptr<Event> event,
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool,
std::vector<std::shared_ptr<HybridPoolAllocator>> allocators,
std::vector<std::shared_ptr<Memory>> memory_blocks,
std::shared_ptr<Memory> header_memory
Expand Down Expand Up @@ -135,6 +139,8 @@ class LocalMessageBroker : public Shareable, public MessageBroker
std::shared_ptr<PoolAllocator> m_MessageHeaderAllocator;
MessageHeader *m_MessageHeaders;

std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> m_EventPool;

std::vector<std::shared_ptr<HybridPoolAllocator>> m_Allocators;
std::vector<std::shared_ptr<Memory>> m_MemoryBlocks;
};
Expand Down
55 changes: 55 additions & 0 deletions catkit_core/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,58 @@ void Sleep(double sleep_time_in_sec, std::function<bool()> cancellation_callback
std::this_thread::sleep_for(std::chrono::duration<double>(this_sleep_time));
}
}

// MurmurHash3 32-bit version
uint32_t murmurhash3(std::string_view key, uint32_t seed)
{
const uint8_t *data = reinterpret_cast<const uint8_t *>(key.data());
size_t len = key.size();

uint32_t h = seed;
const uint32_t c1 = 0xcc9e2d51;
const uint32_t c2 = 0x1b873593;

// Partition in blocks of 4 bytes.
const size_t nblocks = len / 4;
const uint32_t* blocks = reinterpret_cast<const uint32_t *>(data);
for (size_t i = 0; i < nblocks; i++)
{
uint32_t k = blocks[i];
k *= c1;
k = (k << 15) | (k >> 17);
k *= c2;

h ^= k;
h = (h << 13) | (h >> 19);
h = h * 5 + 0xe6546b64;
}

// Process leftover bytes.
const uint8_t* tail = data + nblocks * 4;
uint32_t k1 = 0;

switch (len & 3)
{
case 3:
k1 ^= tail[2] << 16;
case 2:
k1 ^= tail[1] << 8;
case 1:
k1 ^= tail[0];
k1 *= c1;
k1 = (k1 << 15) | (k1 >> 17);
k1 *= c2;
h ^= k1;
case 0:
; // Do nothing.
}

h ^= len;
h ^= (h >> 16);
h *= 0x85ebca6b;
h ^= (h >> 13);
h *= 0xc2b2ae35;
h ^= (h >> 16);

return h;
}
5 changes: 5 additions & 0 deletions catkit_core/Util.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#ifndef UTIL_H
#define UTIL_H

#include <cstdint>
#include <string>
#include <string_view>
#include <functional>

int GetProcessId();
int GetThreadId();

// MurmurHash3 32-bit version for topic hashing
uint32_t murmurhash3(std::string_view key, uint32_t seed = 0);

template<typename ProtoClass>
std::string Serialize(const ProtoClass &obj);

Expand Down
Loading