diff --git a/catkit_core/HashMap.cpp b/catkit_core/HashMap.cpp index 7771a78a4..fb45caee3 100644 --- a/catkit_core/HashMap.cpp +++ b/catkit_core/HashMap.cpp @@ -1,4 +1,5 @@ #include "HashMap.h" +#include "Util.h" #include #include @@ -7,61 +8,6 @@ //#define DEBUG_PRINT(a) std::cout << a << std::endl #define DEBUG_PRINT(a) -// MurmurHash3 32-bit version -uint32_t murmurhash3(std::string_view key, uint32_t seed = 0) -{ - const uint8_t *data = reinterpret_cast(key.data()); - size_t len = key.size(); - - uint32_t h = seed; - const uint32_t c1 = 0xcc9e2d51; - const uint32_t c2 = 0x1b873593; - - // Partition in blocks of 4 bytes. - const size_t nblocks = len / 4; - const uint32_t* blocks = reinterpret_cast(data); - for (size_t i = 0; i < nblocks; i++) - { - uint32_t k = blocks[i]; - k *= c1; - k = (k << 15) | (k >> 17); - k *= c2; - - h ^= k; - h = (h << 13) | (h >> 19); - h = h * 5 + 0xe6546b64; - } - - // Process leftover bytes. - const uint8_t* tail = data + nblocks * 4; - uint32_t k1 = 0; - - switch (len & 3) - { - case 3: - k1 ^= tail[2] << 16; - case 2: - k1 ^= tail[1] << 8; - case 1: - k1 ^= tail[0]; - k1 *= c1; - k1 = (k1 << 15) | (k1 >> 17); - k1 *= c2; - h ^= k1; - case 0: - ; // Do nothing. - } - - h ^= len; - h ^= (h >> 16); - h *= 0x85ebca6b; - h ^= (h >> 13); - h *= 0xc2b2ae35; - h ^= (h >> 16); - - return h; -} - std::string to_hex(unsigned char c) { const static char hex[] = "0123456789abcdef"; diff --git a/catkit_core/LocalMessageBroker.cpp b/catkit_core/LocalMessageBroker.cpp index 6fab4c904..4f16a6357 100644 --- a/catkit_core/LocalMessageBroker.cpp +++ b/catkit_core/LocalMessageBroker.cpp @@ -145,7 +145,7 @@ LocalMessageBroker::LocalMessageBroker( MessageBrokerHeader *header, std::shared_ptr topic_headers, std::shared_ptr message_header_allocator, - std::shared_ptr event, + std::array, EVENT_POOL_SIZE> event_pool, std::vector> allocators, std::vector> memory_blocks, std::shared_ptr header_memory @@ -154,7 +154,7 @@ LocalMessageBroker::LocalMessageBroker( m_Header(header), m_TopicHeaders(std::move(topic_headers)), m_MessageHeaderAllocator(std::move(message_header_allocator)), - m_Event(std::move(event)), + m_EventPool(std::move(event_pool)), m_Allocators(std::move(allocators)), m_MemoryBlocks(memory_blocks), m_MessageHeaders(header->message_headers) @@ -184,8 +184,14 @@ std::shared_ptr LocalMessageBroker::Create(StructStream &str auto message_header_allocator = PoolAllocator::Create(stream, MAX_NUM_MESSAGES); - std::string id = "catkit2_message_broker_" + std::to_string(header->creator_pid) + "_" + std::to_string(header->time_of_creation); - auto event = Event::Create(stream, id); + // Create event pool for topic-based notification + std::array, EVENT_POOL_SIZE> event_pool; + std::string base_id = "catkit2_message_broker_" + std::to_string(header->creator_pid) + "_" + std::to_string(header->time_of_creation); + for (size_t i = 0; i < EVENT_POOL_SIZE; ++i) + { + std::string event_id = base_id + "_" + std::to_string(i); + event_pool[i] = Event::Create(stream, event_id); + } DEBUG_PRINT("Extracting allocators."); @@ -210,7 +216,7 @@ std::shared_ptr LocalMessageBroker::Create(StructStream &str header, std::move(topic_headers), std::move(message_header_allocator), - std::move(event), + std::move(event_pool), allocators, memory_blocks, stream.GetBuffer() @@ -225,7 +231,13 @@ std::shared_ptr LocalMessageBroker::Open(StructStream &strea auto topic_headers = HashMap::Open(stream); auto message_header_allocator = PoolAllocator::Open(stream); - auto event = Event::Open(stream); + + // Open event pool + std::array, EVENT_POOL_SIZE> event_pool; + for (size_t i = 0; i < EVENT_POOL_SIZE; ++i) + { + event_pool[i] = Event::Open(stream); + } std::vector> allocators; std::vector> memory_blocks; @@ -253,7 +265,7 @@ std::shared_ptr LocalMessageBroker::Open(StructStream &strea header, std::move(topic_headers), std::move(message_header_allocator), - std::move(event), + std::move(event_pool), allocators, memory_blocks, stream.GetBuffer() @@ -362,6 +374,10 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final) auto topic = std::string_view(message.m_Header->topic); auto allocator = GetAllocator(message.m_Header->payload_info.memory_block_id); + // Track which event indices need to be signaled (to avoid duplicates) + std::array signaled_events{}; + signaled_events.fill(false); + // Publish the message to all subtopics. for (const auto &subtopic : SubtopicRange(topic)) { @@ -369,6 +385,9 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final) DEBUG_PRINT("Publishing to subtopic \"" << subtopic << "\"."); + // Track this subtopic's event for signaling + signaled_events[topic_header->event_index] = true; + std::uint64_t first_id = topic_header->first_frame_id.load(std::memory_order_relaxed); std::uint64_t frame_id; @@ -448,7 +467,12 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final) break; } - m_Event->Signal(); + // Signal all unique events for the subtopics + for (std::size_t i = 0; i < EVENT_POOL_SIZE; ++i) + { + if (signaled_events[i]) + m_EventPool[i]->Signal(); + } // Deallocate the message header and payload. PoolAllocator::BlockHandle message_header_index = message.m_Header - m_MessageHeaders; @@ -556,8 +580,8 @@ std::optional LocalMessageBroker::GetNextMessage(std::string_view topic if (topic_header->IsMessageAvailable(new_frame_id)) return FetchMessage(topic_header, new_frame_id); - // Otherwise, wait for it. - m_Event->Wait(timeout_in_seconds, [topic_header, new_frame_id]() { return topic_header->last_frame_id > new_frame_id; }, wait_type, error_check); + // Otherwise, wait for it on the specific event for this topic. + m_EventPool[topic_header->event_index]->Wait(timeout_in_seconds, [topic_header, new_frame_id]() { return topic_header->last_frame_id > new_frame_id; }, wait_type, error_check); return FetchMessage(topic_header, new_frame_id); } @@ -670,6 +694,7 @@ TopicHeader *LocalMessageBroker::GetTopicHeader(std::string_view topic) temp_topic_header.first_frame_id = 0; temp_topic_header.last_frame_id = 0; temp_topic_header.frame_rate = 0.0; + temp_topic_header.event_index = murmurhash3(topic) % EVENT_POOL_SIZE; topic_header = (TopicHeader *) m_TopicHeaders->Insert(topic, &temp_topic_header); diff --git a/catkit_core/LocalMessageBroker.h b/catkit_core/LocalMessageBroker.h index 799c4ba43..1cd5d8bae 100644 --- a/catkit_core/LocalMessageBroker.h +++ b/catkit_core/LocalMessageBroker.h @@ -24,6 +24,7 @@ const size_t MAX_NUM_MESSAGES = 65536; const size_t MAX_NUM_BLOCKS = 8192; const size_t MEMORY_ALIGNMENT = 32; const size_t MIN_SIZE_POOL = 1024; +const size_t EVENT_POOL_SIZE = 128; struct TopicHeader { @@ -35,6 +36,9 @@ struct TopicHeader std::array message_headers; + // Pre-computed event index for this topic (hash of topic name % EVENT_POOL_SIZE) + std::uint32_t event_index; + bool IsMessageAvailable(std::size_t frame_id); bool WillMessageBeAvailable(std::size_t frame_id); std::size_t GetOldestMessageId(); @@ -65,7 +69,7 @@ class LocalMessageBroker : public Shareable, public MessageBroker MessageBrokerHeader *header, std::shared_ptr topic_headers, std::shared_ptr message_header_allocator, - std::shared_ptr event, + std::array, EVENT_POOL_SIZE> event_pool, std::vector> allocators, std::vector> memory_blocks, std::shared_ptr header_memory @@ -135,6 +139,8 @@ class LocalMessageBroker : public Shareable, public MessageBroker std::shared_ptr m_MessageHeaderAllocator; MessageHeader *m_MessageHeaders; + std::array, EVENT_POOL_SIZE> m_EventPool; + std::vector> m_Allocators; std::vector> m_MemoryBlocks; }; diff --git a/catkit_core/Util.cpp b/catkit_core/Util.cpp index e38e563d2..8548ac091 100644 --- a/catkit_core/Util.cpp +++ b/catkit_core/Util.cpp @@ -58,3 +58,58 @@ void Sleep(double sleep_time_in_sec, std::function cancellation_callback std::this_thread::sleep_for(std::chrono::duration(this_sleep_time)); } } + +// MurmurHash3 32-bit version +uint32_t murmurhash3(std::string_view key, uint32_t seed) +{ + const uint8_t *data = reinterpret_cast(key.data()); + size_t len = key.size(); + + uint32_t h = seed; + const uint32_t c1 = 0xcc9e2d51; + const uint32_t c2 = 0x1b873593; + + // Partition in blocks of 4 bytes. + const size_t nblocks = len / 4; + const uint32_t* blocks = reinterpret_cast(data); + for (size_t i = 0; i < nblocks; i++) + { + uint32_t k = blocks[i]; + k *= c1; + k = (k << 15) | (k >> 17); + k *= c2; + + h ^= k; + h = (h << 13) | (h >> 19); + h = h * 5 + 0xe6546b64; + } + + // Process leftover bytes. + const uint8_t* tail = data + nblocks * 4; + uint32_t k1 = 0; + + switch (len & 3) + { + case 3: + k1 ^= tail[2] << 16; + case 2: + k1 ^= tail[1] << 8; + case 1: + k1 ^= tail[0]; + k1 *= c1; + k1 = (k1 << 15) | (k1 >> 17); + k1 *= c2; + h ^= k1; + case 0: + ; // Do nothing. + } + + h ^= len; + h ^= (h >> 16); + h *= 0x85ebca6b; + h ^= (h >> 13); + h *= 0xc2b2ae35; + h ^= (h >> 16); + + return h; +} diff --git a/catkit_core/Util.h b/catkit_core/Util.h index 83cb71f4d..3a06d2361 100644 --- a/catkit_core/Util.h +++ b/catkit_core/Util.h @@ -1,12 +1,17 @@ #ifndef UTIL_H #define UTIL_H +#include #include +#include #include int GetProcessId(); int GetThreadId(); +// MurmurHash3 32-bit version for topic hashing +uint32_t murmurhash3(std::string_view key, uint32_t seed = 0); + template std::string Serialize(const ProtoClass &obj);