From c1194797361de38c5729c4245ecb4df79900ea60 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Fri, 27 Feb 2026 22:06:02 +0900 Subject: [PATCH 1/9] Add slot classes. --- catkit_core/Slot.cpp | 250 ++++++++++++++++++++++++++++++++++++++ catkit_core/Slot.h | 219 +++++++++++++++++++++++++++++++++ catkit_core/SlotProxy.cpp | 143 ++++++++++++++++++++++ catkit_core/SlotProxy.h | 93 ++++++++++++++ 4 files changed, 705 insertions(+) create mode 100644 catkit_core/Slot.cpp create mode 100644 catkit_core/Slot.h create mode 100644 catkit_core/SlotProxy.cpp create mode 100644 catkit_core/SlotProxy.h diff --git a/catkit_core/Slot.cpp b/catkit_core/Slot.cpp new file mode 100644 index 000000000..778598f17 --- /dev/null +++ b/catkit_core/Slot.cpp @@ -0,0 +1,250 @@ +#include "Slot.h" +#include "Log.h" + +#include +#include +#include +#include + +using namespace std::literals::string_literals; + +// Read-only constructor +Slot::Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type) + : m_ServiceId(service_id) + , m_Broker(broker) + , m_Name(name) + , m_DataType(data_type) + , m_IsReadOnly(true) +{ +} + +// Read-write constructors +Slot::Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type, SlotSetterJsonFunc setter) + : m_ServiceId(service_id) + , m_Broker(broker) + , m_Name(name) + , m_DataType(data_type) + , m_IsReadOnly(false) + , m_SetterJson(setter) + , m_SetterRaw(nullptr) + , m_SetterArray(nullptr) +{ +} + +Slot::Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type, SlotSetterRawFunc setter) + : m_ServiceId(service_id) + , m_Broker(broker) + , m_Name(name) + , m_DataType(data_type) + , m_IsReadOnly(false) + , m_SetterJson(nullptr) + , m_SetterRaw(setter) + , m_SetterArray(nullptr) +{ +} + +Slot::Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type, SlotSetterArrayFunc setter) + : m_ServiceId(service_id) + , m_Broker(broker) + , m_Name(name) + , m_DataType(data_type) + , m_IsReadOnly(false) + , m_SetterJson(nullptr) + , m_SetterRaw(nullptr) + , m_SetterArray(setter) +{ +} + +Slot::~Slot() +{ + Stop(); +} + +void Slot::CheckType(SlotDataType type) const +{ + if (type != m_DataType) + { + throw std::runtime_error("Type mismatch for slot '" + m_Name + "'"); + } +} + +Message Slot::Publish(const nlohmann::json &data) +{ + CheckType(SlotDataType::Json); + std::string json_str = data.dump(); + return m_Broker->PublishData(GetGetTopic(), json_str.data(), json_str.size()); +} + +Message Slot::Publish(std::string_view data) +{ + CheckType(SlotDataType::Raw); + return m_Broker->PublishData(GetGetTopic(), data.data(), data.size()); +} + +Message Slot::Publish(ArrayView data) +{ + CheckType(SlotDataType::Array); + return m_Broker->PublishArray(GetGetTopic(), data); +} + +bool Slot::IsReadOnly() const +{ + return m_IsReadOnly; +} + +SlotDataType Slot::GetDataType() const +{ + return m_DataType; +} + +const std::string &Slot::GetName() const +{ + return m_Name; +} + +void Slot::Start() +{ + if (m_IsReadOnly || m_SetMonitorThread) + return; + + m_ShouldStop = false; + m_SetMonitorThread = std::make_unique(&Slot::MonitorSetMessages, this); +} + +void Slot::Stop() +{ + m_ShouldStop = true; + if (m_SetMonitorThread && m_SetMonitorThread->joinable()) + { + m_SetMonitorThread->join(); + m_SetMonitorThread.reset(); + } +} + +void Slot::MonitorSetMessages() +{ + auto set_sub = m_Broker->Subscribe(GetSetTopic(), MessageSubscriptionMode::Sequential); + + // Subscribe to /cancel topic once - will be copied for each operation + auto cancel_sub = m_Broker->Subscribe(GetCancelTopic(), MessageSubscriptionMode::Sequential); + + while (!m_ShouldStop) + { + try + { + auto message = set_sub.GetNextMessage(0.1); + if (!message) + continue; + + auto payload = message->GetPayload(); + auto payload_size = payload.info.GetSizeInBytes(); + auto trace_id = message->GetTraceId(); + + // Create context with copy of cancel subscription + SlotContext context(m_Broker, m_ServiceId, m_Name, trace_id, cancel_sub); + + try + { + switch (m_DataType) + { + case SlotDataType::Json: + { + if (!payload.data || payload_size == 0) + { + LOG_ERROR("Slot " + m_Name + ": Received empty JSON payload"); + continue; + } + + auto data = (char *) payload.data; + auto json_data = nlohmann::json::parse(data, data + payload_size); + + m_SetterJson(json_data, context); + + break; + } + case SlotDataType::Raw: + { + std::string_view raw_data(static_cast(payload.data), payload_size); + + m_SetterRaw(raw_data, context); + + break; + } + case SlotDataType::Array: + { + m_SetterArray(payload, context); + + break; + } + } + } + catch (const std::exception& e) + { + std::string error_message = "Setter error: "s + e.what(); + LOG_ERROR("Slot " + m_Name + ": " + error_message); + m_Broker->PublishData(GetErrorTopic(), error_message.data(), error_message.size(), trace_id); + } + } + catch (const std::exception& e) + { + LOG_ERROR(std::string("Slot " + m_Name + ": Error in set monitor: " + e.what())); + } + } +} + +// SlotContext implementation + +SlotContext::SlotContext(std::shared_ptr broker, + const std::string &service_id, + const std::string &slot_name, + const Uuid &trace_id, + MessageSubscription cancel_sub) + : m_Broker(broker) + , m_ServiceId(service_id) + , m_SlotName(slot_name) + , m_TraceId(trace_id) + , m_GetTopic(service_id + "/" + slot_name + "/get") + , m_CancelSub(std::move(cancel_sub)) +{ +} + +bool SlotContext::IsCancelled() +{ + // Poll for cancel messages + while (auto msg = m_CancelSub.TryGetNextMessage()) + { + auto payload = msg->GetPayload(); + if (payload.info.GetSizeInBytes() != sizeof(Uuid)) + { + // Invalid cancel message format, skip + continue; + } + + // Parse trace_id from payload + Uuid cancel_trace_id; + std::memcpy(&cancel_trace_id, payload.data, sizeof(Uuid)); + + if (cancel_trace_id == m_TraceId) + { + return true; + } + } + + return false; +} + +Message SlotContext::Publish(const nlohmann::json &data) +{ + std::string json_str = data.dump(); + return m_Broker->PublishData(m_GetTopic, json_str.data(), json_str.size(), m_TraceId); +} + +Message SlotContext::Publish(std::string_view data) +{ + return m_Broker->PublishData(m_GetTopic, data.data(), data.size(), m_TraceId); +} + +Message SlotContext::Publish(ArrayView data) +{ + return m_Broker->PublishArray(m_GetTopic, data, m_TraceId); +} diff --git a/catkit_core/Slot.h b/catkit_core/Slot.h new file mode 100644 index 000000000..eee0883a1 --- /dev/null +++ b/catkit_core/Slot.h @@ -0,0 +1,219 @@ +#ifndef SLOT_H +#define SLOT_H + +#include "MessageBroker.h" +#include "Types.h" +#include "ArrayView.h" +#include "Uuid.h" + +#include + +#include +#include +#include +#include +#include +#include + +// Forward declaration +class SlotContext; + +// Type-specific setter functions using SlotContext +using SlotSetterJsonFunc = std::function; +using SlotSetterRawFunc = std::function; +using SlotSetterArrayFunc = std::function; + +/** + * SlotContext - Context provided to slot setters for publishing and cancellation. + * + * Provides: + * - Publishing to /get topic with trace ID preservation + * - Cancellation checking via /cancel topic subscription + * - Broker access for advanced operations + */ +class SlotContext +{ +public: + /** + * Create a slot context. + * + * @param broker The message broker + * @param service_id The parent service ID + * @param slot_name The slot name + * @param trace_id The trace ID from the incoming /set message + * @param cancel_sub The subscription to /cancel topic (copy of monitor's subscription) + */ + SlotContext(std::shared_ptr broker, + const std::string &service_id, + const std::string &slot_name, + const Uuid &trace_id, + MessageSubscription cancel_sub); + + /** + * Check if this operation has been cancelled. + * Polls the /cancel topic for messages matching this operation's trace ID. + * + * @return true if a /cancel message was received for this trace ID + */ + bool IsCancelled(); + + /** + * Publish a value to the slot's /get topic. + * Uses the same trace ID as the incoming /set message. + * + * @param data The JSON data to publish + * @return The published message + */ + Message Publish(const nlohmann::json &data); + + /** + * Publish raw data to the slot's /get topic. + * + * @param data The raw data to publish + * @return The published message + */ + Message Publish(std::string_view data); + + /** + * Publish array data to the slot's /get topic. + * + * @param data The array data to publish + * @return The published message + */ + Message Publish(ArrayView data); + + /** + * Get the message broker for advanced operations. + */ + std::shared_ptr GetBroker() const { return m_Broker; } + + /** + * Get the trace ID for this operation. + */ + const Uuid &GetTraceId() const { return m_TraceId; } + +private: + std::shared_ptr m_Broker; + std::string m_ServiceId; + std::string m_SlotName; + Uuid m_TraceId; + std::string m_GetTopic; + MessageSubscription m_CancelSub; +}; + +/** + * Slot - A typed communication channel between services. + * + * Supports: + * - Publishing data (service -> proxy) + * - Setting value with callback (proxy -> service -> callback) + * - Subscribing to stream (proxy <- service continuous) + */ +class Slot +{ +public: + /** + * Create a new read-only slot. + * + * @param service_id The parent service ID + * @param broker The message broker + * @param name The slot name (must be unique within service) + * @param data_type The data type (locked at creation) + */ + Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type); + + /** + * Create a new read-write slot with setter callback. + * + * @param service_id The parent service ID + * @param broker The message broker + * @param name The slot name (must be unique within service) + * @param data_type The data type (locked at creation) + * @param setter The callback function to call when value is set + */ + Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type, SlotSetterJsonFunc setter); + Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type, SlotSetterRawFunc setter); + Slot(const std::string &service_id, std::shared_ptr broker, const std::string &name, SlotDataType data_type, SlotSetterArrayFunc setter); + + /** + * Destructor. Cleans up the set monitor thread. + */ + ~Slot(); + + // Delete copy/move to ensure unique ownership + Slot(const Slot &) = delete; + Slot &operator=(const Slot &) = delete; + Slot(Slot &&) = delete; + Slot &operator=(Slot &&) = delete; + + /** + * Publish data to the slot. + * Broadcasts to all subscribers on /get topic. + * Type is validated against slot's data_type. + * + * @param data The data to publish + * @return The published message + * @throws std::runtime_error if type doesn't match slot data_type + */ + Message Publish(const nlohmann::json &data); + Message Publish(std::string_view data); + Message Publish(ArrayView data); + + /** + * Check if slot is read-only. + */ + bool IsReadOnly() const; + + /** + * Get the slot data type. + */ + SlotDataType GetDataType() const; + + /** + * Get the slot name. + */ + const std::string &GetName() const; + + /** + * Start the slot monitoring. + * Called by Service before Main(). + */ + void Start(); + + /** + * Stop the slot monitoring. + * Called by Service after Main(). + */ + void Stop(); + +private: + // Topic helpers + std::string GetGetTopic() const { return m_ServiceId + "/" + m_Name + "/get"; } + std::string GetSetTopic() const { return m_ServiceId + "/" + m_Name + "/set"; } + std::string GetCancelTopic() const { return m_ServiceId + "/" + m_Name + "/cancel"; } + std::string GetErrorTopic() const { return m_ServiceId + "/" + m_Name + "/error"; } + + // Implementation details + std::string m_ServiceId; + std::shared_ptr m_Broker; + std::string m_Name; + SlotDataType m_DataType; + bool m_IsReadOnly; + + // Setter callback (only valid if not read-only) + std::function m_SetterJson; + std::function m_SetterRaw; + std::function m_SetterArray; + + // Set monitoring thread + std::unique_ptr m_SetMonitorThread; + std::atomic m_ShouldStop{false}; + + // Helper to validate type + void CheckType(SlotDataType type) const; + + // Monitor set messages + void MonitorSetMessages(); +}; + +#endif // SLOT_H diff --git a/catkit_core/SlotProxy.cpp b/catkit_core/SlotProxy.cpp new file mode 100644 index 000000000..1464f06f8 --- /dev/null +++ b/catkit_core/SlotProxy.cpp @@ -0,0 +1,143 @@ +#include "SlotProxy.h" +#include "MessageBroker.h" + +#include + +SlotProxy::SlotProxy(std::shared_ptr broker, const std::string &service_id, const std::string &slot_name, bool read_only, SlotDataType data_type) + : m_Broker(broker) + , m_ServiceId(service_id) + , m_SlotName(slot_name) + , m_ReadOnly(read_only) + , m_DataType(data_type) + , m_GetTopic(service_id + "/" + slot_name + "/get") + , m_SetTopic(service_id + "/" + slot_name + "/set") +{ +} + +nlohmann::json SlotProxy::GetJson() const +{ + if (m_DataType != SlotDataType::Json) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Json slot"); + } + + auto msg = m_Broker->GetCurrentMessage(m_GetTopic); + + if (!msg.has_value()) + throw std::runtime_error("No data available for slot '" + m_SlotName + "'"); + + auto payload = msg->GetPayload(); + + if (!payload.data || payload.info.GetSizeInBytes() == 0) + throw std::runtime_error("Empty payload for slot '" + m_SlotName + "'"); + + char *data = (char *) payload.data; + + try + { + return nlohmann::json::parse(data, data + payload.info.GetSizeInBytes()); + } + catch (const nlohmann::json::exception& e) + { + throw std::runtime_error("JSON parse error for slot '" + m_SlotName + "': " + e.what()); + } +} + +std::string_view SlotProxy::GetRaw() const +{ + if (m_DataType != SlotDataType::Raw) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Raw slot"); + } + + auto msg = m_Broker->GetCurrentMessage(m_GetTopic); + + if (!msg.has_value()) + throw std::runtime_error("No data available for slot '" + m_SlotName + "'"); + + auto payload = msg->GetPayload(); + + return std::string_view(static_cast(payload.data), msg->GetPayloadSize()); +} + +ArrayView SlotProxy::GetArray() const +{ + if (m_DataType != SlotDataType::Array) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not an Array slot"); + } + + auto msg = m_Broker->GetCurrentMessage(m_GetTopic); + + if (!msg.has_value()) + throw std::runtime_error("No data available for slot '" + m_SlotName + "'"); + + return msg->GetPayload(); +} + +void SlotProxy::Set(const nlohmann::json &data) +{ + if (m_ReadOnly) + { + throw std::runtime_error("Slot '" + m_SlotName + "' is read-only"); + } + + if (m_DataType != SlotDataType::Json) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Json slot"); + } + + auto json_str = data.dump(); + + m_Broker->PublishData(m_SetTopic, json_str.data(), json_str.size()); +} + +void SlotProxy::Set(std::string_view data) +{ + if (m_ReadOnly) + { + throw std::runtime_error("Slot '" + m_SlotName + "' is read-only"); + } + + if (m_DataType != SlotDataType::Raw) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Raw slot"); + } + + m_Broker->PublishData(m_SetTopic, data.data(), data.size()); +} + +void SlotProxy::Set(ArrayView data) +{ + if (m_ReadOnly) + { + throw std::runtime_error("Slot '" + m_SlotName + "' is read-only"); + } + + if (m_DataType != SlotDataType::Array) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not an Array slot"); + } + + m_Broker->PublishArray(m_SetTopic, data); +} + +MessageSubscription SlotProxy::Subscribe(MessageSubscriptionMode mode) +{ + return m_Broker->Subscribe(m_GetTopic, mode); +} + +bool SlotProxy::IsReadOnly() const +{ + return m_ReadOnly; +} + +const std::string &SlotProxy::GetSlotName() const +{ + return m_SlotName; +} + +SlotDataType SlotProxy::GetDataType() const +{ + return m_DataType; +} diff --git a/catkit_core/SlotProxy.h b/catkit_core/SlotProxy.h new file mode 100644 index 000000000..e8c64a345 --- /dev/null +++ b/catkit_core/SlotProxy.h @@ -0,0 +1,93 @@ +#ifndef SLOT_PROXY_H +#define SLOT_PROXY_H + +#include "MessageBroker.h" +#include "Types.h" +#include "ArrayView.h" + +#include + +#include +#include +#include + +class TestbedProxy; + +/** + * SlotProxy - Client-side proxy for accessing a remote Slot. + * + * Provides: + * - Getting latest value from /get topic (with type validation) + * - Setting value via /set topic (with type validation) + * - Subscribing to /get topic for streaming + */ +class SlotProxy +{ +public: + /** + * Create a new slot proxy. + * + * @param broker The message broker + * @param service_id The service ID + * @param slot_name The slot name + * @param read_only Whether the slot is read-only + * @param data_type The slot data type for validation + */ + SlotProxy(std::shared_ptr broker, const std::string &service_id, const std::string &slot_name, bool read_only, SlotDataType data_type); + + /** + * Get the latest value from the slot. + * + * @return The latest published data + * @throws std::runtime_error on timeout or error + */ + nlohmann::json GetJson() const; + std::string_view GetRaw() const; + ArrayView GetArray() const; + + /** + * Set the slot value. + * Publishes to /set topic, triggers service setter callback. + * Type is validated against slot's data_type. + * + * @param data The data to set + * @throws std::runtime_error if slot is read-only or type mismatch + */ + void Set(const nlohmann::json &data); + void Set(std::string_view data); + void Set(ArrayView data); + + /** + * Subscribe to slot updates. + * + * @param mode Subscription mode (NewestOnly or Sequential) + * @return MessageSubscription for receiving updates + */ + MessageSubscription Subscribe(MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly); + + /** + * Check if slot is read-only. + */ + bool IsReadOnly() const; + + /** + * Get the slot name. + */ + const std::string &GetSlotName() const; + + /** + * Get the slot data type. + */ + SlotDataType GetDataType() const; + +private: + std::shared_ptr m_Broker; + std::string m_ServiceId; + std::string m_SlotName; + bool m_ReadOnly; + SlotDataType m_DataType; + std::string m_GetTopic; + std::string m_SetTopic; +}; + +#endif // SLOT_PROXY_H From aa0b1bef84c633c3fa6a6bd6917ab8c732d1523c Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Fri, 27 Feb 2026 22:06:27 +0900 Subject: [PATCH 2/9] Integrate slots into the Service and ServiceProxy classes. --- catkit_core/Service.cpp | 82 ++++++++++++++++++++++++++++++++++++ catkit_core/Service.h | 9 ++++ catkit_core/ServiceProxy.cpp | 55 ++++++++++++++++++++++++ catkit_core/ServiceProxy.h | 10 +++++ 4 files changed, 156 insertions(+) diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index 6b90aace8..f455db890 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -186,6 +186,12 @@ void Service::Run(void (*error_check)()) m_IsRunning = true; UpdateState(ServiceState::RUNNING); + LOG_INFO("Starting slots."); + for (auto& [name, slot] : m_Slots) + { + slot->Start(); + } + LOG_INFO("Starting service main function."); // Start the main function. @@ -202,6 +208,12 @@ void Service::Run(void (*error_check)()) crashed = true; } + + LOG_INFO("Stopping slots."); + for (auto& [name, slot] : m_Slots) + { + slot->Stop(); + } } m_IsRunning = false; @@ -253,6 +265,7 @@ void Service::CleanupAttributes() m_Properties.clear(); m_Commands.clear(); m_DataStreams.clear(); + m_Slots.clear(); } void Service::MonitorSafety() @@ -593,6 +606,66 @@ std::shared_ptr Service::ReuseDataStream(std::string stream_name, st return stream; } +std::shared_ptr Service::MakeJsonSlot(std::string slot_name) +{ + LOG_DEBUG("Making JSON slot \"" + slot_name + "\"."); + + std::shared_ptr slot = std::make_shared(GetId(), m_Testbed->GetMessageBroker(), slot_name, SlotDataType::Json); + m_Slots[slot_name] = slot; + + return slot; +} + +std::shared_ptr Service::MakeJsonSlot(std::string slot_name, SlotSetterJsonFunc setter) +{ + LOG_DEBUG("Making writable JSON slot \"" + slot_name + "\"."); + + std::shared_ptr slot = std::make_shared(GetId(), m_Testbed->GetMessageBroker(), slot_name, SlotDataType::Json, setter); + m_Slots[slot_name] = slot; + + return slot; +} + +std::shared_ptr Service::MakeRawSlot(std::string slot_name) +{ + LOG_DEBUG("Making raw slot \"" + slot_name + "\"."); + + std::shared_ptr slot = std::make_shared(GetId(), m_Testbed->GetMessageBroker(), slot_name, SlotDataType::Raw); + m_Slots[slot_name] = slot; + + return slot; +} + +std::shared_ptr Service::MakeRawSlot(std::string slot_name, SlotSetterRawFunc setter) +{ + LOG_DEBUG("Making writable raw slot \"" + slot_name + "\"."); + + std::shared_ptr slot = std::make_shared(GetId(), m_Testbed->GetMessageBroker(), slot_name, SlotDataType::Raw, setter); + m_Slots[slot_name] = slot; + + return slot; +} + +std::shared_ptr Service::MakeArraySlot(std::string slot_name) +{ + LOG_DEBUG("Making array slot \"" + slot_name + "\"."); + + std::shared_ptr slot = std::make_shared(GetId(), m_Testbed->GetMessageBroker(), slot_name, SlotDataType::Array); + m_Slots[slot_name] = slot; + + return slot; +} + +std::shared_ptr Service::MakeArraySlot(std::string slot_name, SlotSetterArrayFunc setter) +{ + LOG_DEBUG("Making writable array slot \"" + slot_name + "\"."); + + std::shared_ptr slot = std::make_shared(GetId(), m_Testbed->GetMessageBroker(), slot_name, SlotDataType::Array, setter); + m_Slots[slot_name] = slot; + + return slot; +} + std::shared_ptr Service::GetTestbed() { return m_Testbed; @@ -652,6 +725,7 @@ string Service::GetInfo() {"property_names", json::array()}, {"command_names", json::array()}, {"datastream_ids", json::object()}, + {"slots", json::object()}, {"heartbeat_stream_id", m_Heartbeat->GetStreamId()} }; @@ -664,6 +738,14 @@ string Service::GetInfo() for (auto& [key, value] : m_DataStreams) reply["datastream_ids"][key] = value->GetStreamId(); + for (auto& [key, value] : m_Slots) + { + reply["slots"][key] = { + {"data_type", value->GetDataType()}, + {"read_only", value->IsReadOnly()} + }; + } + return reply.dump(); } diff --git a/catkit_core/Service.h b/catkit_core/Service.h index 0aab3d13d..5719aadf9 100644 --- a/catkit_core/Service.h +++ b/catkit_core/Service.h @@ -22,6 +22,7 @@ #include "ProcessStats.h" #include "Types.h" #include "MessageBroker.h" +#include "Slot.h" const double SERVICE_LIVELINESS = 5; @@ -60,6 +61,13 @@ class Service std::shared_ptr MakeDataStream(std::string stream_name, DataType type, std::vector dimensions, size_t num_frames_in_buffer); std::shared_ptr ReuseDataStream(std::string stream_name, std::string stream_id); + std::shared_ptr MakeJsonSlot(std::string slot_name); + std::shared_ptr MakeJsonSlot(std::string slot_name, SlotSetterJsonFunc setter); + std::shared_ptr MakeRawSlot(std::string slot_name); + std::shared_ptr MakeRawSlot(std::string slot_name, SlotSetterRawFunc setter); + std::shared_ptr MakeArraySlot(std::string slot_name); + std::shared_ptr MakeArraySlot(std::string slot_name, SlotSetterArrayFunc setter); + std::shared_ptr GetTestbed(); void CleanupAttributes(); @@ -105,6 +113,7 @@ class Service std::map> m_Commands; std::map> m_DataStreams; + std::map> m_Slots; LogConsole m_LoggerConsole; LogForwarder m_LoggerPublish; diff --git a/catkit_core/ServiceProxy.cpp b/catkit_core/ServiceProxy.cpp index e73c709b8..bdc65923f 100644 --- a/catkit_core/ServiceProxy.cpp +++ b/catkit_core/ServiceProxy.cpp @@ -218,6 +218,33 @@ std::shared_ptr ServiceProxy::GetDataStream(const std::string &name, return m_DataStreams[name]; } +std::shared_ptr ServiceProxy::GetSlot(const std::string &name, void (*error_check)()) +{ + // Start the service if it has not already been started. + Start(TIMEOUT_TO_START, error_check); + + // Check if the name is a valid slot name. + auto slot_info = m_SlotInfo.find(name); + if (slot_info == m_SlotInfo.end()) + throw std::runtime_error("This is not a valid slot name."); + + auto slot = m_Slots.find(name); + + // Check if we already created this slot proxy. + if (slot == m_Slots.end()) + { + // Create it now using info from service. + m_Slots[name] = std::make_shared( + m_Testbed->GetMessageBroker(), + m_ServiceId, + name, + slot_info->second.read_only, + slot_info->second.data_type); + } + + return m_Slots[name]; +} + std::shared_ptr ServiceProxy::GetHeartbeat() { return m_Heartbeat; @@ -379,6 +406,19 @@ void ServiceProxy::Connect() for (auto& [key, value] : info["datastream_ids"].items()) m_DataStreamIds[key] = value; + // Parse slot info + m_SlotInfo.clear(); + if (info.contains("slots")) + { + for (auto& [key, value] : info["slots"].items()) + { + m_SlotInfo[key] = { + value["data_type"].get(), + value["read_only"].get() + }; + } + } + m_Heartbeat = DataStream::Open(info["heartbeat_stream_id"].get()); m_TimeLastConnect = frame.m_TimeStamp; @@ -391,7 +431,9 @@ void ServiceProxy::Disconnect() m_PropertyNames.clear(); m_CommandNames.clear(); m_DataStreamIds.clear(); + m_SlotInfo.clear(); m_DataStreams.clear(); + m_Slots.clear(); m_Heartbeat = nullptr; } @@ -425,6 +467,19 @@ std::vector ServiceProxy::GetDataStreamNames(void (*error_check)()) return names; } +std::vector ServiceProxy::GetSlotNames(void (*error_check)()) +{ + // Start the service if it has not already been started. + Start(TIMEOUT_TO_START, error_check); + + std::vector names; + + for (auto const &item : m_SlotInfo) + names.push_back(item.first); + + return names; +} + nlohmann::json ServiceProxy::GetConfig() { return m_Testbed->GetConfig()["services"][m_ServiceId]; diff --git a/catkit_core/ServiceProxy.h b/catkit_core/ServiceProxy.h index a7c7154f7..64b6f8cfa 100644 --- a/catkit_core/ServiceProxy.h +++ b/catkit_core/ServiceProxy.h @@ -5,6 +5,7 @@ #include "DataStream.h" #include "ServiceState.h" #include "Client.h" +#include "SlotProxy.h" #include #include @@ -27,6 +28,8 @@ class ServiceProxy Value ExecuteCommand(const std::string &name, const Dict &arguments, void (*error_check)() = nullptr); std::shared_ptr GetDataStream(const std::string &name, void (*error_check)() = nullptr); + std::shared_ptr GetSlot(const std::string &name, void (*error_check)() = nullptr); + std::vector GetSlotNames(void (*error_check)() = nullptr); std::shared_ptr GetHeartbeat(); @@ -60,7 +63,14 @@ class ServiceProxy std::vector m_CommandNames; std::map m_DataStreamIds; + struct SlotInfo { + SlotDataType data_type; + bool read_only; + }; + std::map m_SlotInfo; + std::map> m_DataStreams; + std::map> m_Slots; std::shared_ptr m_Heartbeat; std::shared_ptr m_State; From 36493184ac49074913abfa7068fe1f737b134450 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Fri, 27 Feb 2026 22:06:48 +0900 Subject: [PATCH 3/9] Add slots to Python bindings. --- catkit2/bindings.cpp | 216 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 215 insertions(+), 1 deletion(-) diff --git a/catkit2/bindings.cpp b/catkit2/bindings.cpp index 46c014043..58b00776c 100644 --- a/catkit2/bindings.cpp +++ b/catkit2/bindings.cpp @@ -36,6 +36,8 @@ #include "Uuid.h" #include "ArrayView.h" #include "ProcessStats.h" +#include "Slot.h" +#include "SlotProxy.h" #include "testbed.pb.h" @@ -607,6 +609,45 @@ PYBIND11_MODULE(catkit_bindings, m) return service.MakeDataStream(stream_name, dtype, dimensions, num_frames_in_buffer); }) .def("reuse_data_stream", &Service::ReuseDataStream) + .def("make_json_slot", [](Service &service, std::string slot_name, py::object setter) + { + if (setter.is_none()) + { + return service.MakeJsonSlot(slot_name); + } + return service.MakeJsonSlot(slot_name, + [setter](const nlohmann::json &value, SlotContext &ctx) + { + py::gil_scoped_acquire acquire; + py::object result = setter(value, ctx); + }); + }, py::arg("slot_name"), py::arg("setter") = py::none()) + .def("make_raw_slot", [](Service &service, std::string slot_name, py::object setter) + { + if (setter.is_none()) + { + return service.MakeRawSlot(slot_name); + } + return service.MakeRawSlot(slot_name, + [setter](std::string_view value, SlotContext &ctx) + { + py::gil_scoped_acquire acquire; + py::object result = setter(py::bytes(value.data(), value.size()), ctx); + }); + }, py::arg("slot_name"), py::arg("setter") = py::none()) + .def("make_array_slot", [](Service &service, std::string slot_name, py::object setter) + { + if (setter.is_none()) + { + return service.MakeArraySlot(slot_name); + } + return service.MakeArraySlot(slot_name, + [setter](ArrayView value, SlotContext &ctx) + { + py::gil_scoped_acquire acquire; + py::object result = setter(ToPython(value), ctx); + }); + }, py::arg("slot_name"), py::arg("setter") = py::none()) .def("cleanup_attributes", &Service::CleanupAttributes); py::enum_(m, "ServiceState") @@ -664,9 +705,17 @@ PYBIND11_MODULE(catkit_bindings, m) { return service.GetDataStreamNames(error_check_python); }) + .def_property_readonly("slot_names", [](ServiceProxy &service) + { + return service.GetSlotNames(error_check_python); + }) .def_property_readonly("config", &ServiceProxy::GetConfig) .def_property_readonly("id", &ServiceProxy::GetId) - .def_property_readonly("testbed", &ServiceProxy::GetTestbed); + .def_property_readonly("testbed", &ServiceProxy::GetTestbed) + .def("get_slot", [](ServiceProxy &proxy, std::string name) + { + return proxy.GetSlot(name, error_check_python); + }); py::class_>(m, "TestbedProxy") .def(py::init()) @@ -1292,6 +1341,171 @@ PYBIND11_MODULE(catkit_bindings, m) .def_property_readonly("memory_usage", &ProcessStats::GetMemoryUsage) .def_property_readonly("cpu_usage", &ProcessStats::GetCpuUsage); + // Slot bindings + py::enum_(m, "SlotDataType") + .value("Json", SlotDataType::Json) + .value("Raw", SlotDataType::Raw) + .value("Array", SlotDataType::Array); + + py::class_(m, "SlotContext") + .def("is_cancelled", &SlotContext::IsCancelled) + .def("publish_json", [](SlotContext &ctx, nlohmann::json data) + { + ctx.Publish(data); + }) + .def("publish_raw", [](SlotContext &ctx, py::bytes data) + { + ctx.Publish(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr()))); + }) + .def("publish_array", [](SlotContext &ctx, py::array array) + { + ArrayInfo info; + auto dtype = array.dtype(); + info.data_type = dtype.kind(); + info.item_size = dtype.itemsize(); + info.byte_order = dtype.byteorder(); + info.ndim = array.ndim(); + for (size_t i = 0; i < info.ndim; ++i) + { + info.shape[i] = array.shape()[i]; + info.strides[i] = array.strides()[i]; + } + ArrayView view{info, array.mutable_data()}; + ctx.Publish(view); + }) + .def_property_readonly("broker", &SlotContext::GetBroker) + .def_property_readonly("trace_id", &SlotContext::GetTraceId); + + py::class_>(m, "Slot") + .def_property_readonly("is_read_only", &Slot::IsReadOnly) + .def_property_readonly("data_type", &Slot::GetDataType) + .def_property_readonly("name", &Slot::GetName) + .def("start", &Slot::Start) + .def("stop", &Slot::Stop) + .def("publish", [](Slot &slot, py::object value) + { + switch (slot.GetDataType()) + { + case SlotDataType::Json: + slot.Publish(py::cast(value)); + break; + case SlotDataType::Raw: + { + py::bytes data = py::cast(value); + slot.Publish(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr()))); + break; + } + case SlotDataType::Array: + { + py::array array = py::cast(value); + ArrayInfo info; + auto dtype = array.dtype(); + info.data_type = dtype.kind(); + info.item_size = dtype.itemsize(); + info.byte_order = dtype.byteorder(); + + if (array.ndim() > MAX_NUM_DIMENSIONS) + throw std::runtime_error("Array dimension is too large."); + + info.ndim = array.ndim(); + + for (size_t i = 0; i < info.ndim; ++i) + { + info.shape[i] = array.shape()[i]; + info.strides[i] = array.strides()[i]; + } + + if (!info.IsCContiguous() && !info.IsFContiguous()) + throw std::runtime_error("Array has to be either C or F contiguous."); + + const ArrayView array_view{info, array.mutable_data()}; + slot.Publish(array_view); + break; + } + default: + throw std::runtime_error("Unknown slot data type"); + } + }); + + py::class_>(m, "SlotProxy") + .def("subscribe", &SlotProxy::Subscribe, py::arg("mode") = MessageSubscriptionMode::NewestOnly) + .def_property_readonly("is_read_only", &SlotProxy::IsReadOnly) + .def_property_readonly("data_type", &SlotProxy::GetDataType) + .def_property_readonly("slot_name", &SlotProxy::GetSlotName) + .def("get", [](SlotProxy &proxy) -> py::object + { + try + { + switch (proxy.GetDataType()) + { + case SlotDataType::Json: + return proxy.GetJson(); + case SlotDataType::Raw: + { + auto raw = proxy.GetRaw(); + return py::bytes(raw.data(), raw.size()); + } + case SlotDataType::Array: + return ToPython(proxy.GetArray()); + default: + throw std::runtime_error("Unknown slot data type"); + } + } + catch (const std::runtime_error &) + { + return py::none(); + } + }) + .def("set", [](SlotProxy &proxy, py::object value) + { + if (proxy.IsReadOnly()) + { + throw std::runtime_error("Cannot set read-only slot"); + } + + switch (proxy.GetDataType()) + { + case SlotDataType::Json: + proxy.Set(py::cast(value)); + break; + case SlotDataType::Raw: + { + py::bytes data = py::cast(value); + proxy.Set(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr()))); + break; + } + case SlotDataType::Array: + { + py::array array = py::cast(value); + ArrayInfo info; + auto dtype = array.dtype(); + info.data_type = dtype.kind(); + info.item_size = dtype.itemsize(); + info.byte_order = dtype.byteorder(); + + if (array.ndim() > MAX_NUM_DIMENSIONS) + throw std::runtime_error("Array dimension is too large."); + + info.ndim = array.ndim(); + + for (size_t i = 0; i < info.ndim; ++i) + { + info.shape[i] = array.shape()[i]; + info.strides[i] = array.strides()[i]; + } + + if (!info.IsCContiguous() && !info.IsFContiguous()) + throw std::runtime_error("Array has to be either C or F contiguous."); + + const ArrayView array_view{info, array.mutable_data()}; + proxy.Set(array_view); + break; + } + default: + throw std::runtime_error("Unknown slot data type"); + } + }); + #ifdef VERSION_INFO m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO); #else From 87b1bb4cf8eb300c17a6846f0b76664a6450c926 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Fri, 27 Feb 2026 22:06:54 +0900 Subject: [PATCH 4/9] Add slot data types. --- catkit_core/Types.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/catkit_core/Types.h b/catkit_core/Types.h index 1480df555..cb6ef32f9 100644 --- a/catkit_core/Types.h +++ b/catkit_core/Types.h @@ -35,6 +35,13 @@ class Dict : public std::map { }; +enum class SlotDataType +{ + Json, + Raw, + Array +}; + void ToProto(const Value &value, catkit_proto::Value *proto_value); void ToProto(const List &list, catkit_proto::List *proto_list); void ToProto(const Dict &dict, catkit_proto::Dict *proto_dict); From 84e430e111ffafe27bf6386aa67edcb9e595e0bf Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Fri, 27 Feb 2026 22:11:39 +0900 Subject: [PATCH 5/9] Allow access to slots via attributes. --- catkit2/testbed/service_proxy.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/catkit2/testbed/service_proxy.py b/catkit2/testbed/service_proxy.py index 7da25aef9..24b02e742 100644 --- a/catkit2/testbed/service_proxy.py +++ b/catkit2/testbed/service_proxy.py @@ -31,7 +31,7 @@ def testbed(self): return self._testbed def __getattr__(self, name): - '''Get a property, command or data stream. + '''Get a property, command, data stream, or slot. Properties ---------- @@ -40,13 +40,13 @@ def __getattr__(self, name): Returns ------- - Property or Command or DataStream object + Property or Command or DataStream or SlotProxy object The attribute. Raises ------ AttributeError - If the named attribute is not a property, command or data stream. + If the named attribute is not a property, command, data stream, or slot. ''' if name in self.property_names: # Return property. @@ -59,11 +59,14 @@ def cmd(**kwargs): elif name in self.data_stream_names: # Return datastream. return self.get_data_stream(name) + elif name in self.slot_names: + # Return slot. + return self.get_slot(name) else: raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'.") def __setattr__(self, name, value): - '''Set the property. + '''Set the property or slot. Parameters ---------- @@ -76,6 +79,7 @@ def __setattr__(self, name, value): ------ AttributeError If the attribute is a command or datastream (both of which are not settable). + If the attribute is a read-only slot. If the attribute cannot be found on this service. ''' if name in self.property_names: @@ -85,6 +89,9 @@ def __setattr__(self, name, value): raise AttributeError('Cannot set a command.') elif name in self.data_stream_names: raise AttributeError('Cannot set a data stream. Did you mean .submit_data()?') + elif name in self.slot_names: + # Set slot value if not read-only. + self.get_slot(name).set(value) else: super().__setattr__(name, value) From 97d4ec19113634d5cc5b1fc637a8559ee6acf898 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Fri, 27 Feb 2026 22:12:59 +0900 Subject: [PATCH 6/9] Add tests for slots. --- tests/services/dummy_service/dummy_service.py | 70 ++++++++++ tests/test_service.py | 126 ++++++++++++++++++ 2 files changed, 196 insertions(+) diff --git a/tests/services/dummy_service/dummy_service.py b/tests/services/dummy_service/dummy_service.py index d12b942e0..f4eaaabc9 100644 --- a/tests/services/dummy_service/dummy_service.py +++ b/tests/services/dummy_service/dummy_service.py @@ -10,6 +10,12 @@ def __init__(self): self.readonly_property = self.config['readonly_property'] self.readwrite_property = 1 + + # Storage for slot values + self.json_slot_value = {"initial": True} + self.raw_slot_value = b"initial" + self.array_slot_value = np.zeros((N, N), dtype='float64') + self.writable_json_slot_value = {"initialized": True} def open(self): self.make_property('readonly_property', self.get_readonly) @@ -17,10 +23,49 @@ def open(self): self.make_command('add', self.add) self.make_command('push_on_stream', self.push_on_stream) + self.make_command('publish_json_slot', self.publish_json_slot) + self.make_command('publish_raw_slot', self.publish_raw_slot) + self.make_command('publish_array_slot', self.publish_array_slot) self.stream = self.make_data_stream('stream', 'float64', [N, N], 20) self.push_on_stream() + # Create read-write slots with setters + self.json_slot = self.make_json_slot( + 'json_slot', + self.on_json_slot_set + ) + self.raw_slot = self.make_raw_slot( + 'raw_slot', + self.on_raw_slot_set + ) + self.array_slot = self.make_array_slot( + 'array_slot', + self.on_array_slot_set + ) + + # Create a read-only raw slot (no setter) + self.readonly_raw_slot = self.make_raw_slot('readonly_raw_slot') + + def on_json_slot_set(self, value, context): + """Setter callback for json_slot.""" + self.json_slot_value = value + # Confirm the set by publishing back + context.publish_json(value) + + def on_raw_slot_set(self, value, context): + """Setter callback for raw_slot.""" + self.raw_slot_value = bytes(value) + # Confirm the set by publishing back + context.publish_raw(value) + + def on_array_slot_set(self, value, context): + """Setter callback for array_slot.""" + # value is a numpy array + self.array_slot_value = value.copy() + # Confirm the set by publishing back + context.publish_array(value) + def main(self): while not self.should_shut_down: self.sleep(0.1) @@ -44,6 +89,31 @@ def push_on_stream(self): arr = np.random.randn(N, N).astype('float64') self.stream.submit_data(arr) + def publish_json_slot(self, data): + """Publish JSON data to json_slot.""" + self.json_slot.publish(data) + return True + + def publish_raw_slot(self, data): + """Publish raw bytes to raw_slot.""" + # data comes as a string from JSON, convert to bytes + if isinstance(data, str): + data = data.encode('utf-8') + elif isinstance(data, dict) and 'bytes' in data: + # Handle case where bytes are passed specially + data = data['bytes'].encode('utf-8') if isinstance(data['bytes'], str) else bytes(data['bytes']) + self.raw_slot.publish(data) + return True + + def publish_array_slot(self, data=None): + """Publish numpy array to array_slot.""" + if data is None: + arr = np.random.randn(N, N).astype('float64') + else: + arr = np.array(data).astype('float64') + self.array_slot.publish(arr) + return True + if __name__ == '__main__': service = DummyService() service.run() diff --git a/tests/test_service.py b/tests/test_service.py index 79e3555b0..c2b3f64e9 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -1,4 +1,5 @@ import pytest +import numpy as np def test_service_property(dummy_service): # We should be able to read and write to a property. @@ -30,3 +31,128 @@ def test_service_datastream(dummy_service): after_id = dummy_service.stream.get_latest_frame().id assert after_id == before_id + 1 + + +def test_service_slot_json_readwrite(dummy_service): + """Test read-write JSON slot: service publishes, proxy reads and writes.""" + slot = dummy_service.get_slot('json_slot') + + # Should be writable + assert slot.is_read_only is False + + # Initially should return None + assert slot.get() is None + + # Service publishes data via command + test_data = {"test": "value", "number": 42} + dummy_service.publish_json_slot(data=test_data) + + # Client should be able to read it + result = slot.get() + assert result is not None + assert result["test"] == "value" + assert result["number"] == 42 + + # Client can also write + client_data = {"from": "client", "value": 100} + slot.set(client_data) + + import time + time.sleep(0.05) + + # Should be able to read back the confirmed value + result = slot.get() + assert result is not None + assert result["from"] == "client" + assert result["value"] == 100 + + +def test_service_slot_raw_readwrite(dummy_service): + """Test read-write raw slot: service publishes, proxy reads and writes.""" + slot = dummy_service.get_slot('raw_slot') + + # Should be writable + assert slot.is_read_only is False + + # Initially should return None + assert slot.get() is None + + # Service publishes data via command + test_data = "hello world" + dummy_service.publish_raw_slot(data=test_data) + + # Client should be able to read it + result = slot.get() + assert result is not None + assert result.decode('utf-8') == test_data + + # Client can also write + client_data = b"from client" + slot.set(client_data) + + import time + time.sleep(0.05) + + # Should be able to read back the confirmed value + result = slot.get() + assert result is not None + assert result == client_data + + +def test_service_slot_array_readwrite(dummy_service): + """Test read-write array slot: service publishes, proxy reads and writes.""" + slot = dummy_service.get_slot('array_slot') + + # Should be writable + assert slot.is_read_only is False + + # Initially should return None + assert slot.get() is None + + # Service publishes data via command + test_array = np.random.randn(16, 16).astype('float64').tolist() + dummy_service.publish_array_slot(data=test_array) + + # Client should be able to read it + result = slot.get() + assert result is not None + expected = np.array(test_array).astype('float64') + assert np.array_equal(result, expected) + + # Client can also write + client_array = np.ones((16, 16), dtype='float64') + slot.set(client_array) + + import time + time.sleep(0.05) + + # Should be able to read back the confirmed value + result = slot.get() + assert result is not None + assert np.array_equal(result, client_array) + + +def test_service_slot_subscription(dummy_service): + """Test slot subscription for streaming updates.""" + slot = dummy_service.get_slot('json_slot') + subscription = slot.subscribe() + + # Publish data via command + test_data = {"seq": 1} + dummy_service.publish_json_slot(data=test_data) + + # Should receive the message + msg = subscription.try_get_next_message() + assert msg is not None + + +def test_service_slot_readonly_raw(dummy_service): + """Test that readonly_raw_slot is read-only from proxy.""" + slot = dummy_service.get_slot('readonly_raw_slot') + + # The slot should be read-only + assert slot.is_read_only + + # Attempting to set should raise an error + with pytest.raises(RuntimeError, match="read-only"): + slot.set(b"test data") From 7a4490623cbcfda17d1605f10bb7bb3f48316ca0 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sat, 28 Feb 2026 08:21:43 +0900 Subject: [PATCH 7/9] Implement waiting for confirmation with timeout. --- catkit2/bindings.cpp | 54 ++++++++++++- catkit_core/SlotProxy.cpp | 154 ++++++++++++++++++++++++++++++++++++-- catkit_core/SlotProxy.h | 38 +++++++++- 3 files changed, 232 insertions(+), 14 deletions(-) diff --git a/catkit2/bindings.cpp b/catkit2/bindings.cpp index 58b00776c..69faed173 100644 --- a/catkit2/bindings.cpp +++ b/catkit2/bindings.cpp @@ -1456,7 +1456,7 @@ PYBIND11_MODULE(catkit_bindings, m) return py::none(); } }) - .def("set", [](SlotProxy &proxy, py::object value) + .def("set", [](SlotProxy &proxy, py::object value, double timeout) { if (proxy.IsReadOnly()) { @@ -1466,12 +1466,12 @@ PYBIND11_MODULE(catkit_bindings, m) switch (proxy.GetDataType()) { case SlotDataType::Json: - proxy.Set(py::cast(value)); + proxy.Set(py::cast(value), timeout); break; case SlotDataType::Raw: { py::bytes data = py::cast(value); - proxy.Set(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr()))); + proxy.Set(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr())), timeout); break; } case SlotDataType::Array: @@ -1498,12 +1498,58 @@ PYBIND11_MODULE(catkit_bindings, m) throw std::runtime_error("Array has to be either C or F contiguous."); const ArrayView array_view{info, array.mutable_data()}; - proxy.Set(array_view); + proxy.Set(array_view, timeout); break; } default: throw std::runtime_error("Unknown slot data type"); } + }, py::arg("value"), py::arg("timeout") = 5.0) + .def("set_async", [](SlotProxy &proxy, py::object value) -> Uuid + { + if (proxy.IsReadOnly()) + { + throw std::runtime_error("Cannot set read-only slot"); + } + + switch (proxy.GetDataType()) + { + case SlotDataType::Json: + return proxy.SetAsync(py::cast(value)); + case SlotDataType::Raw: + { + py::bytes data = py::cast(value); + return proxy.SetAsync(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr()))); + } + case SlotDataType::Array: + { + py::array array = py::cast(value); + ArrayInfo info; + auto dtype = array.dtype(); + info.data_type = dtype.kind(); + info.item_size = dtype.itemsize(); + info.byte_order = dtype.byteorder(); + + if (array.ndim() > MAX_NUM_DIMENSIONS) + throw std::runtime_error("Array dimension is too large."); + + info.ndim = array.ndim(); + + for (size_t i = 0; i < info.ndim; ++i) + { + info.shape[i] = array.shape()[i]; + info.strides[i] = array.strides()[i]; + } + + if (!info.IsCContiguous() && !info.IsFContiguous()) + throw std::runtime_error("Array has to be either C or F contiguous."); + + const ArrayView array_view{info, array.mutable_data()}; + return proxy.SetAsync(array_view); + } + default: + throw std::runtime_error("Unknown slot data type"); + } }); #ifdef VERSION_INFO diff --git a/catkit_core/SlotProxy.cpp b/catkit_core/SlotProxy.cpp index 1464f06f8..902250376 100644 --- a/catkit_core/SlotProxy.cpp +++ b/catkit_core/SlotProxy.cpp @@ -1,7 +1,26 @@ #include "SlotProxy.h" #include "MessageBroker.h" +#include "Timing.h" #include +#include +#include + +// Custom exception classes +class SlotTimeoutError : public std::runtime_error { +public: + SlotTimeoutError(const std::string &msg) : std::runtime_error(msg) {} +}; + +class SlotSetterError : public std::runtime_error { +public: + SlotSetterError(const std::string &msg) : std::runtime_error(msg) {} +}; + +class SlotCancelledError : public std::runtime_error { +public: + SlotCancelledError(const std::string &msg) : std::runtime_error(msg) {} +}; SlotProxy::SlotProxy(std::shared_ptr broker, const std::string &service_id, const std::string &slot_name, bool read_only, SlotDataType data_type) : m_Broker(broker) @@ -9,8 +28,11 @@ SlotProxy::SlotProxy(std::shared_ptr broker, const std::string &s , m_SlotName(slot_name) , m_ReadOnly(read_only) , m_DataType(data_type) + , m_BaseTopic(service_id + "/" + slot_name) , m_GetTopic(service_id + "/" + slot_name + "/get") , m_SetTopic(service_id + "/" + slot_name + "/set") + , m_ErrorTopic(service_id + "/" + slot_name + "/error") + , m_CancelTopic(service_id + "/" + slot_name + "/cancel") { } @@ -75,7 +97,104 @@ ArrayView SlotProxy::GetArray() const return msg->GetPayload(); } -void SlotProxy::Set(const nlohmann::json &data) +Uuid SlotProxy::SetAsync(const nlohmann::json &data) +{ + if (m_ReadOnly) + { + throw std::runtime_error("Slot '" + m_SlotName + "' is read-only"); + } + + if (m_DataType != SlotDataType::Json) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Json slot"); + } + + auto json_str = data.dump(); + auto msg = m_Broker->PublishData(m_SetTopic, json_str.data(), json_str.size()); + return msg.GetTraceId(); +} + +Uuid SlotProxy::SetAsync(std::string_view data) +{ + if (m_ReadOnly) + { + throw std::runtime_error("Slot '" + m_SlotName + "' is read-only"); + } + + if (m_DataType != SlotDataType::Raw) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Raw slot"); + } + + auto msg = m_Broker->PublishData(m_SetTopic, data.data(), data.size()); + return msg.GetTraceId(); +} + +Uuid SlotProxy::SetAsync(ArrayView data) +{ + if (m_ReadOnly) + { + throw std::runtime_error("Slot '" + m_SlotName + "' is read-only"); + } + + if (m_DataType != SlotDataType::Array) + { + throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not an Array slot"); + } + + auto msg = m_Broker->PublishArray(m_SetTopic, data); + return msg.GetTraceId(); +} + +void SlotProxy::WaitForConfirmation(MessageSubscription &subscription, const Uuid &trace_id, double timeout_seconds) +{ + Timer timer; + + while (true) + { + auto elapsed = timer.GetTime(); + if (elapsed >= timeout_seconds) + { + throw SlotTimeoutError("Timeout waiting for slot '" + m_SlotName + "' confirmation"); + } + + auto remaining = timeout_seconds - elapsed; + if (remaining <= 0) + { + throw SlotTimeoutError("Timeout waiting for slot '" + m_SlotName + "' confirmation"); + } + + // Wait for next message with remaining timeout + auto response_msg = subscription.GetNextMessage(remaining); + + // Check if this is for our trace_id + if (response_msg->GetTraceId() == trace_id) + { + // Check topic to determine message type + auto topic = response_msg->GetTopic(); + if (topic == m_GetTopic) + { + // Success - confirmation received + return; + } + else if (topic == m_ErrorTopic) + { + // Error - parse error message + auto payload = response_msg->GetPayload(); + std::string error_msg(static_cast(payload.data), payload.info.GetSizeInBytes()); + throw SlotSetterError("Slot '" + m_SlotName + "' setter error: " + error_msg); + } + else if (topic == m_CancelTopic) + { + // Cancelled + throw SlotCancelledError("Slot '" + m_SlotName + "' operation was cancelled"); + } + } + // If trace_id doesn't match, continue waiting + } +} + +void SlotProxy::Set(const nlohmann::json &data, double timeout_seconds) { if (m_ReadOnly) { @@ -87,12 +206,19 @@ void SlotProxy::Set(const nlohmann::json &data) throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Json slot"); } + // Create subscription before publishing to avoid race condition + auto sub = m_Broker->Subscribe(m_BaseTopic, MessageSubscriptionMode::Sequential); + + // Publish and get trace_id auto json_str = data.dump(); + auto msg = m_Broker->PublishData(m_SetTopic, json_str.data(), json_str.size()); + Uuid trace_id = msg.GetTraceId(); - m_Broker->PublishData(m_SetTopic, json_str.data(), json_str.size()); + // Wait for confirmation + WaitForConfirmation(sub, trace_id, timeout_seconds); } -void SlotProxy::Set(std::string_view data) +void SlotProxy::Set(std::string_view data, double timeout_seconds) { if (m_ReadOnly) { @@ -104,10 +230,18 @@ void SlotProxy::Set(std::string_view data) throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not a Raw slot"); } - m_Broker->PublishData(m_SetTopic, data.data(), data.size()); + // Create subscription before publishing to avoid race condition + auto sub = m_Broker->Subscribe(m_BaseTopic, MessageSubscriptionMode::Sequential); + + // Publish and get trace_id + auto msg = m_Broker->PublishData(m_SetTopic, data.data(), data.size()); + Uuid trace_id = msg.GetTraceId(); + + // Wait for confirmation + WaitForConfirmation(sub, trace_id, timeout_seconds); } -void SlotProxy::Set(ArrayView data) +void SlotProxy::Set(ArrayView data, double timeout_seconds) { if (m_ReadOnly) { @@ -119,7 +253,15 @@ void SlotProxy::Set(ArrayView data) throw std::runtime_error("Type mismatch: slot '" + m_SlotName + "' is not an Array slot"); } - m_Broker->PublishArray(m_SetTopic, data); + // Create subscription before publishing to avoid race condition + auto sub = m_Broker->Subscribe(m_BaseTopic, MessageSubscriptionMode::Sequential); + + // Publish and get trace_id + auto msg = m_Broker->PublishArray(m_SetTopic, data); + Uuid trace_id = msg.GetTraceId(); + + // Wait for confirmation + WaitForConfirmation(sub, trace_id, timeout_seconds); } MessageSubscription SlotProxy::Subscribe(MessageSubscriptionMode mode) diff --git a/catkit_core/SlotProxy.h b/catkit_core/SlotProxy.h index e8c64a345..5790dfcb6 100644 --- a/catkit_core/SlotProxy.h +++ b/catkit_core/SlotProxy.h @@ -46,16 +46,32 @@ class SlotProxy ArrayView GetArray() const; /** - * Set the slot value. + * Set the slot value asynchronously. * Publishes to /set topic, triggers service setter callback. * Type is validated against slot's data_type. * * @param data The data to set + * @return The trace ID for tracking the operation * @throws std::runtime_error if slot is read-only or type mismatch */ - void Set(const nlohmann::json &data); - void Set(std::string_view data); - void Set(ArrayView data); + Uuid SetAsync(const nlohmann::json &data); + Uuid SetAsync(std::string_view data); + Uuid SetAsync(ArrayView data); + + /** + * Set the slot value synchronously with confirmation. + * Publishes to /set topic and waits for confirmation on /get or error on /error. + * + * @param data The data to set + * @param timeout_seconds Maximum time to wait for confirmation (default 5.0) + * @throws std::runtime_error if slot is read-only, type mismatch, timeout, or setter error + * @throws SlotTimeoutError if confirmation not received within timeout + * @throws SlotSetterError if service setter throws exception + * @throws SlotCancelledError if operation was cancelled + */ + void Set(const nlohmann::json &data, double timeout_seconds = 5.0); + void Set(std::string_view data, double timeout_seconds = 5.0); + void Set(ArrayView data, double timeout_seconds = 5.0); /** * Subscribe to slot updates. @@ -81,13 +97,27 @@ class SlotProxy SlotDataType GetDataType() const; private: + /** + * Wait for confirmation message with matching trace_id. + * + * @param subscription The subscription to wait on + * @param trace_id The trace ID to match + * @param timeout_seconds Maximum time to wait + * @throws SlotTimeoutError if timeout expires + * @throws SlotSetterError if error message received + * @throws SlotCancelledError if cancel message received + */ + void WaitForConfirmation(MessageSubscription &subscription, const Uuid &trace_id, double timeout_seconds); std::shared_ptr m_Broker; std::string m_ServiceId; std::string m_SlotName; bool m_ReadOnly; SlotDataType m_DataType; + std::string m_BaseTopic; std::string m_GetTopic; std::string m_SetTopic; + std::string m_ErrorTopic; + std::string m_CancelTopic; }; #endif // SLOT_PROXY_H From 3046258a15d34cb3b8e5000588206aeff8d6fec1 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sat, 28 Feb 2026 17:40:03 +0900 Subject: [PATCH 8/9] Never miss messages for property setters and command executes. --- catkit_core/Service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index f455db890..cb1741c5d 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -374,7 +374,7 @@ void Service::MonitorHeartbeats() void Service::MonitorPropertiesAndCommands() { auto broker = m_Testbed->GetMessageBroker(); - auto subscription = broker->Subscribe(m_ServiceId); + auto subscription = broker->Subscribe(m_ServiceId, MessageSubscriptionMode::Sequential); while (!ShouldShutDown()) { From b024fb96eb9f5ff35f273541b42dbac64ff037c4 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 1 Mar 2026 19:30:59 +0900 Subject: [PATCH 9/9] Add documentation for writing a service or service proxy implementation. --- docs/index.rst | 1 + docs/services.rst | 103 +----- docs/testbed_implementation.rst | 5 +- docs/writing_services.rst | 552 ++++++++++++++++++++++++++++++++ 4 files changed, 570 insertions(+), 91 deletions(-) create mode 100644 docs/writing_services.rst diff --git a/docs/index.rst b/docs/index.rst index e4c118f40..825fcbaf3 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -18,6 +18,7 @@ Catkit2 configuration protocol services + writing_services benchmarks safety diff --git a/docs/services.rst b/docs/services.rst index c98029a4d..30a0f1355 100644 --- a/docs/services.rst +++ b/docs/services.rst @@ -4,9 +4,7 @@ Services Using a service --------------- -To use a service, you need to have a ``TestbedProxy`` instance that is connected to the testbed server. You can then access -the service as an attribute of the testbed proxy, using the service ID, as defined in the configuration file, as the attribute name. -For example, if you have a service with the service ID ``science_camera``, you can access it through its proxy as follows: +To use a service, you need to have a ``TestbedProxy`` instance that is connected to the testbed server. You can then access the service as an attribute of the testbed proxy, using the service ID, as defined in the configuration file, as the attribute name. For example, if you have a service with the service ID ``science_camera``, you can access it through its proxy as follows: .. code-block:: python @@ -21,15 +19,12 @@ where the port number is replaced by the correct value of your testbed server. A science_camera = testbed.get_service('science_camera') -The service proxy will automatically start the service as soon as you try to access one of its attributes (property, command or datastream) if it is not already running, and will raise an exception if the -service has crashed or is in failsafe mode. You can then use the service proxy to call methods, or access properties and -datastreams of the service as usual. +The service proxy will automatically start the service as soon as you try to access one of its attributes (property, command or datastream) if it is not already running, and will raise an exception if the service has crashed or is in failsafe mode. You can then use the service proxy to call methods, or access properties and datastreams of the service as usual. Service configuration --------------------- -Each service to be used with a testbed needs its own entry in the configuration file ``services.yml``. The config entry -of each service contains two blocks: +Each service to be used with a testbed needs its own entry in the configuration file ``services.yml``. The config entry of each service contains two blocks: - The **general** setup block - The **service-specific** parameter block @@ -55,31 +50,25 @@ An example for such a general setup block, in this case for a camera, is given h ... # -> specific parameter block -The "name" of a service, in the above example ``science_camera``, is called the ``service_id`` and needs to be unique -across all services. You can however have several services use the same ``service_type``. For example, you can have three camera serviced with -``service_ids`` ``cam1``, ``cam2`` and ``cam3`` all using ``service_type: zwo_camera``, but you cannot duplicate service IDs. +The "name" of a service, in the above example ``science_camera``, is called the ``service_id`` and needs to be unique across all services. You can however have several services use the same ``service_type``. For example, you can have three camera serviced with ``service_ids`` ``cam1``, ``cam2`` and ``cam3`` all using ``service_type: zwo_camera``, but you cannot duplicate service IDs. You can find example service configurations for each service type in the documentation of the built-in services. .. note:: The config keys ``service_type`` and ``requires_safety`` are mandatory, the others are not. You need at least the ``service_type`` when running on hardware, which will be interpreted as the simulated service type when running in simulated mode if the latter hasn’t been set explicitly. The ``interface`` (service proxy) is optional. -The key ``requires_safety`` is a simple boolean that defines whether safety is checked while the service is running, and -whether changes in the safety reports will influence the running service. +The key ``requires_safety`` is a simple boolean that defines whether safety is checked while the service is running, and whether changes in the safety reports will influence the running service. -The key ``simulated_service_type`` designates which service is used when running the testbed in simulated mode. -Simulated services are also "just" services, implemented in the same way. The crucial difference is that they do not -talk to hardware and instead talk to the testbed simulator service stored in the object ``self.testbed.simulator``. -The simulator holds an instance of the optical model, saved in ``self.model``. +The key ``simulated_service_type`` designates which service is used when running the testbed in simulated mode. Simulated services are also "just" services, implemented in the same way. The crucial difference is that they do not talk to hardware and instead talk to the testbed simulator service stored in the object ``self.testbed.simulator``. The simulator holds an instance of the optical model, saved in ``self.model``. The key ``interface`` specifies which service proxy is used for a service. This key is optional. If not set, a default service proxy class will be used. Launching and debugging a service --------------------------------- -Services can be started both by the ``Testbed`` upon requests from a ``TestbedProxy``, or manually from the command line. The -latter might be advantageous when debugging a service since not all the output of a service is logged. Services implemented in Python can be -started manually from the command line using : +.. launch-debug-service: + +Services can be started both by the ``Testbed`` upon requests from a ``TestbedProxy``, or manually from the command line. The latter might be advantageous when debugging a service since not all the output of a service is logged. Services implemented in Python can be started manually from the command line using : .. code-block:: bash @@ -99,11 +88,7 @@ C++ services can also be started manually from the command line using: service_type.exe --id --port --tesbed_port -where ```` and ```` are replaced by their correct values, ```` is arbitrary. Services will -register themselves with the testbed. After registration, you can use them from any TestbedProxy as usual. Note that -even in this case, the ``service_id`` needs to be an entry in the ``services.yml`` configuration file. Otherwise, the -testbed does not have a configuration to provide to the service. Services running in the terminal can be interrupted -using Ctrl+C, or by stopping them using a separate TestbedProxy or ServiceProxy. +where ```` and ```` are replaced by their correct values, ```` is arbitrary. Services will register themselves with the testbed. After registration, you can use them from any TestbedProxy as usual. Note that even in this case, the ``service_id`` needs to be an entry in the ``services.yml`` configuration file. Otherwise, the testbed does not have a configuration to provide to the service. Services running in the terminal can be interrupted using Ctrl+C, or by stopping them using a separate TestbedProxy or ServiceProxy. Service state ------------- @@ -142,73 +127,13 @@ For stopping and starting a service: testbed.science_camera.stop() testbed.science_camera.start() -A service is also started automatically if you try to access an attribute (property or command or datastream) on a service. -If the service has crashed, or is in failsafe mode, then the service will refuse to start by itself or via the service -proxy alone. This is to avoid an infinite loop of the service crashing and restarting, which is undesired. -Instead, you can ask the testbed server to start a `CRASHED`/`FAIL_SAFE`/`CLOSED` service: +A service is also started automatically if you try to access an attribute (property or command or datastream) on a service. If the service has crashed, or is in failsafe mode, then the service will refuse to start by itself or via the service proxy alone. This is to avoid an infinite loop of the service crashing and restarting, which is often undesired. Instead, you can ask the testbed server to start a ``CRASHED`` / ``FAIL_SAFE`` / ``CLOSED`` service: .. code-block:: python testbed.start_service('science_camera') -Creating your own service -------------------------- - -File structure and file naming for services -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Create a folder under ``services/`` named for your service. Add a Python module with the same name. The service class -inside the module should use the same name but in CamelCase, see also next section. -Also create a simulated-service folder using the same base name with the suffix ``_sim``. -Add a proxy module for the service in ``testbed/proxies/`` (named after the service), and create a documentation page under `docs/` for the service; include that page in `index.rst`. - -:: - Example: - - services/ - ├── my_service/ - │ ├── my_service.py # service implementation - ├── my_service_sim/ - │ ├── my_service_sim.py # simulated service implementation - testbed/ - ├── proxies/ - │ ├── my_service.py # service proxy implementation - -Services and service proxies can also be defined outside of catkit2 if there is a more specific or new need. - -.. note:: - - No matter where a service and/or proxy is defined, they need to be added to the entry points in the ``pyproject.toml`` file of - catkit2, so that they can be found by the testbed server and testbed proxy. - -Service implementation -~~~~~~~~~~~~~~~~~~~~~~ - -Each service is contained in its own module, and is implemented as a class. The name of the class can be arbitrary, but -it is recommended to use the same name as the service type in CamelCase. For example, if your service type is ``my_service``, -then the service class should be named ``MyService``. The same applies to the simulated service, which should be named ``MyServiceSim`` in this case. - -.. note:: - - - Services inherit from ``Service`` - - Simulated services inherit from ``Service`` - - Service proxies inherit from ``ServiceProxy`` - -When implementing a service, you need to implement at least the following methods: -- ``init()``: Called when the service process is started. Contains all the code that is needed to initialize the service. For example, this includes reading any variables from the service configuration that are needed for the service to run. -- ``open()``: Called when the service is started. Contains all the code that is needed to start the service and make it operational. For example, this includes establishing connections with hardware or other services, and starting any threads that are needed for the service to run. -- ``main()``: Called when the service is running. Contains all the code that is needed to keep the service running. -- ``close()``: Called when the service is stopped. Contains all the code that is needed to safely shut down the service. For example, this includes closing connections with hardware or other services, and stopping any threads that were started in the ``open()`` method. - -At the end of a service file, you need to add the following code to allow the service to be started: - -.. code-block:: python - - if __name__ == "__main__": - service = MyService() # replace with the name of your service class - service.run() - -Example implementation -~~~~~~~~~~~~~~~~~~~~~~ +See Also +-------- -A good examples of a previously implemented service: `Physik Stage Controller `_ +For detailed information on implementing services, see :doc:`writing_services`. diff --git a/docs/testbed_implementation.rst b/docs/testbed_implementation.rst index 6a7c09964..9d9b7603e 100644 --- a/docs/testbed_implementation.rst +++ b/docs/testbed_implementation.rst @@ -83,7 +83,7 @@ and are in the same conda environment: .. code-block:: shell - cd my_testbed + cd my_testbed pip install -e . You are now ready to fire up your very own testbed server. To do so without having to have prepared any hardware, @@ -157,7 +157,8 @@ hardware camera model instead of the simulated version. For this, you need to up Once you have made the necessary changes, you can start the server without the ``--simulated`` flag to run it in hardware mode: .. code-block:: shell - my_testbed start server + + my_testbed start server It is recommended to add and calibrate hardware services one by one, testing each service by running the server without the ``--simulated`` flag to ensure that it is functioning correctly with the real hardware. You might also want to debug diff --git a/docs/writing_services.rst b/docs/writing_services.rst new file mode 100644 index 000000000..3a683600d --- /dev/null +++ b/docs/writing_services.rst @@ -0,0 +1,552 @@ +Writing Custom Services +======================= + +This guide shows you how to write catkit2 services. + +File Structure and Naming +------------------------- + +An example file structure for your testbed is as follows: + + my_testbed/ + ├── services/ + │ ├── my_service.py # service implementation + │ ├── my_service_sim.py # simulated service implementation + ├── proxies/ + │ ├── my_service_proxy.py # service proxy implementation + ├── pyproject.toml # entry-points for my_testbed + +This file structure is not strict and you can put service and proxy implementations at any directory in your project. It is however recommended to separate the proxy implementation from the service implementation. This has two reasons. First, it helps to separate "server" from "client" code when writing them. Secondly, it allows you to not have to import hardware-related packages into your client-side code. + +.. note:: + Each service is contained in a separate file. When catkit2 starts a service, it runs the source file directly rather than a class or function within that source file. This is done to support services written in C++ as well. + +.. note:: + + No matter where a service and/or proxy is defined, they need to be added to the entry points in the ``pyproject.toml`` file, so that they can be found by the testbed server and testbed proxy. We will do this later in this guide. + +Service Class Implementation +---------------------------- + +Each service is implemented as a class. The name of the class can be arbitrary, but for consistency we recommend to use the same name as the service type in CamelCase. For example, if your service type is ``my_service``, then the service class would be named ``MyService``. The same applies to the simulated service, which would be named ``MyServiceSim`` in this case. + +Each service class inherits from :class:`Service`, no matter whether it is a hardware service or simulated service. In actuality, simulated services are just regular services that have the same interface as the hardware service but emulate its behavior. + +Any service proxy inherits from :class:`ServiceProxy`. This base class provides basic functionality to communicate with the service, so you don't have to write that yourself. + +Communication Primitives +------------------------ + +Once started, services communicate with the outside world using the following primitives: + +- **Properties** + Configuration values that clients can read and optionally write. + +- **Commands** + Actions that clients can trigger on the service. + +- **Data Streams** + High-throughput streaming data (images, sensor readings). + +- **Slots** (Experimental) + A new unified primitive that combines Properties and Data Streams with type safety and automatic confirmation. + + .. warning:: + Slots are experimental and their API is subject to change. Use at your own risk. + +These communication primitives are set up during the creation of your service and are the only way to interact with it once it is running. We will detail how to set up and use these communication primitives later on in this guide. + +Minimal Service Example +----------------------- + +Here is the simplest possible service: + +.. code-block:: python + + from catkit2 import Service + + class EmptyService(Service): + def __init__(self): + super().__init__('empty_service') + + def open(self): + pass + + def main(self): + while not self.should_shut_down: + self.sleep(1) + + def close(self): + pass + + if __name__ == '__main__': + service = EmptyService() + service.run() + +All services inherit from the ``Service`` base class. This base class provides base functionality and a ``run()`` method that you should call to run the service. This function will call the ``open()`` (optional), ``main()`` (mandatory) and ``close()`` (optional) in order. The ``run()`` method should be called at the end of your service file to allow the service to be started as in the above example. + +Services follow a well-defined lifecycle: + +1. **Initialization** - ``__init__()``. Calls ``super().__init__('empty_service')`` with the service type name. This function should be minimal, but you can check correctness of the config here. You should _not_ connect to any hardware during this function. +2. **Opening** - ``open()`` (optional). Create your properties, commands, data streams, and slots here. You should connect to your hardware here. At the end of this function, the service should be fully operational and able to accept requests from the outside world. +3. **Running** - ``main()`` (required). This is the main loop. After this loop ends, the service will shut down. Always regularly check ``self.should_shut_down`` which is a boolean variable that indicates whether the service is requested to shut down. +4. **Closing** - ``close()`` (optional). Clean up resources here. Even if ``main()`` raised an exception and ended unexpectedly, the ``close()`` function will get called. Essentially, this function acts like a ``finally`` block around your main function. + +Note that even though ``open()`` and ``close()`` are optional, they were included in the above code example for completeness. Also note that ``self.sleep()`` was called in the above example. This is a replacement of ``time.sleep()`` that regularly checks ``self.should_shut_down`` and quits sleeping when it is ``True``. This improves responsiveness for shutting down the service and avoids potential deadlocks. + +Registering Your Service +------------------------ + +For catkit2 to find your new Service type, you need to register it. To do this, add entry points to ``pyproject.toml``: + +.. code-block:: toml + + [project.entry-points."catkit2.services"] + my_service = "my_testbed.services.my_service" + + [project.entry-points."catkit2.proxies"] + my_service_proxy = "my_testbed.proxies.my_service_proxy:MyServiceProxy" + +Be sure to reinstall your package after each change for the changes to take effect. + +.. note:: + The Service entry point contains the module / file where the service is implemented, but the service proxy entry point contains the class of the proxy itself. + +Configuration +------------- + +Now that your service type is registered, you can create a new service that uses it. Add the following to your ``services.yml`` configuration file: + +.. code-block:: yaml + + new_service_id: + service_type: new_service + interface: new_service_proxy + requires_safety: false + + serial_number: 83985734578 + update_interval: 1.0 # seconds + timeout: 30 # seconds + calibration: + nm_per_step: + x: 64 + y: 56 + z: 75 + +The ``new_service_id`` is the name of your new service. You'll be able to access your service using ``testbed.new_service_id`` afterwards, so choose a descriptive name for this. + +There are several attributes afterwards: + +- ``service_type`` (required) tells catkit2 what service implementation to start. This name should correspond to one of the entry points. +- ``interface`` (optional) tells catkit2 what service proxy type to use. If this is not given, a minimal service proxy will be used instead. +- ``simulated_service_type`` (optional) tells catkit2 what service implementation to start when the testbed is in simulated mode. +- ``requires_safety`` (required) indicates whether this service requires a safe testbed to operate. See :doc:`safety` for more information about the safety system. +- All further attributes are part of the configuration of the service, available via ``self.config`` while inside the service implementation. For example: + + .. code-block:: python + + def open(self): + self.serial_number = self.config['serial_number'] + + # Optional config with default + self.timeout = self.config.get('timeout', 30) + + # Nested config + self.nm_per_step_x = self.config['calibration']['x'] + +Defining Properties +------------------- + +Properties expose service state to clients. They are ideal for configuration values and status. + +Read-Only Property +^^^^^^^^^^^^^^^^^^ + +Use for status values that clients can read but not modify: + +.. code-block:: python + + def open(self): + self.make_property('sensor_id', self.get_sensor_id) + +The first argument is the name of the property, the second is a function that gets called when your property value is requested by someone else. + +Read/Write Property +^^^^^^^^^^^^^^^^^^^ + +Use for configuration values that clients can modify: + +.. code-block:: python + + def open(self): + self.make_property('exposure_time', self.get_exposure_time, self.set_exposure_time, type='float64') + + def get_exposure_time(): + return self._exposure_time + + def set_exposure_time(value): + self.camera.set_exposure(value) + + self._exposure_time = value + self.log.info(f'Exposure time set to {value}') + +**Important**: The setter should validate the value and raise an exception if invalid. The exception will be propagated to the caller. + +Note that we added a type here. Typed properties are faster but require a strict type. Available types: ``'int64'``, ``'float64'``. + +Defining Commands +----------------- + +Commands trigger actions on the service. They are ideal for operations that don't fit the property model. Commands should be quick: do not have a command ``calibrate()`` but use ``start_calibration()``. The reason for this is that the client will block until the command is completed and will accept no other command executions or property get/set requests while the current command is running. + +Simple Commands +^^^^^^^^^^^^^^^ + +.. code-block:: python + + def open(self): + self.make_command('start_acquisition', self.start_acquisition) + self.make_command('reset', self.reset) + + def start_acquisition(self): + self.log.info('Starting acquisition...') + # Logic to start acquisition here. The actual acquisition + # is done by the main() loop. + + def reset(self): + self.log.info('Resetting device...') + self._counter = 0 + +Commands with Arguments +^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + def open(self): + self.make_command('set_position', self.set_position) + + def set_position(self, x, y): + self.log.info(f'Moving to position ({x}, {y})') + self.stage.move_to(x, y) + +Commands with Return Values +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + def open(self): + self.make_command('get_status', self.get_status) + + def get_status(self): + return { + 'temperature': self.read_temp(), + 'voltage': self.read_voltage(), + 'state': self.state + } + +Defining Data Streams +--------------------- + +Data streams enable high-performance data transfer for continuous data like images or sensor readings. Data streams operate in shared memory, allowing near-instant publishing and retrieval of data frames. However, they require strict typing and shape, set on creation. After creation, you can change the type and shape of the arrays, at a runtime cost. The preferred use-case is data that changes type/shape infrequently, such as camera images or DM commands. + +Basic Data Stream +^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + def open(self): + # Create a 2D image stream (20 frame buffer) + self.images = self.make_data_stream( + 'images', # Stream name + 'float32', # Data type + [1024, 1024], # Shape + 20 # Buffer size (number of frames) + ) + + def main(self): + while not self.should_shut_down: + # Generate or acquire data + image = self.acquire_image() + + # Submit to stream + self.images.submit_data(image) + +Data Types for Streams +^^^^^^^^^^^^^^^^^^^^^^ + +Common data types: + +* ``'float32'``, ``'float64'`` - Floating point data +* ``'int8'``, ``'int16'``, ``'int32'``, ``'int64'`` - Signed integers +* ``'uint8'``, ``'uint16'``, ``'uint32'``, ``'uint64'`` - Unsigned integers + +Reading from Data Streams (within a service) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Services can also read from their own data streams or from other services: + +.. code-block:: python + + def main(self): + # Subscribe to command stream from another service + command_stream = self.testbed.other_service.command + + while not self.should_shut_down: + try: + # Wait for next frame (5 second timeout) + frame = command_stream.get_next_frame(5) + command = frame.data[0] + + # Process command + self.process_command(command) + + except Exception: + # Timeout of get_next_frame. We should check if we should shut down. + continue + +Defining Slots +-------------- + +.. warning:: + The Slots API is experimental and subject to change. Use at your own risk. + +Slots are a new unified communication primitive that combines Properties and Data Streams into a single, type-safe API with confirmation. + +Key Features: + +* **Simpler types**: Slots are typed at creation (Json, Raw Bytes, Array). +* **Explicit about confirmation**: ``set()`` blocks until service confirms success while ``set_async()`` requests a change without confirmation. +* **Error Handling**: Clear exceptions for timeout, validation errors, cancellation +* **Unified API**: Single interface for all communication patterns rather than the unclear split between properties and data streams. + +Creating Read-Only Slots +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from catkit2.catkit_bindings import SlotDataType + + def open(self): + # Create read-only slots + self.temperature_slot = self.make_json_slot('temperature') + self.status_slot = self.make_raw_slot('status_bytes') + self.image_slot = self.make_array_slot('camera_image') + +Publishing Data to Slots +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + def main(self): + while not self.should_shut_down: + # Read sensor + temp = self.read_temperature_sensor() + + # Publish to slot (type handled automatically) + self.temperature_slot.publish(temp) + + # Capture image + frame = self.capture_camera_frame() + self.image_slot.publish(frame) + + self.sleep(0.1) + +Creating Read-Write Slots +^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + def open(self): + # Create read-write slot with setter callback + self.target_slot = self.make_json_slot( + 'target_temperature', + self.on_set_target_temperature + ) + + def on_set_target_temperature(self, value, context): + """Called when a client sets target_temperature.""" + # Check if cancelled. + if context.is_cancelled(): + raise RuntimeError("Operation cancelled") + + # Check if `value` has the correct type. + if not isinstance(value, float): + raise RuntimeError("Value has the wrong type.") + + # Process the value + self.target_temp = value + + # MUST confirm by publishing back + context.publish_json(value) + +In this example, the call to ``context.is_cancelled()`` is a bit superfluous, since the check is done right at the start of the call. However, this is more useful for set operations that take some time to do, such as motor movements. During those long tasks, you should regularly check ``context.is_cancelled()`` and cancel the operation if it is ``True``. + +The setter callback is given a ``SlotContext`` object. This object provides the context of the request that was made. Its ``publish_*()`` methods will publish the current value on the correct topic with the correct ``trace_id``. You MUST call one of the ``publish_*()`` methods to confirm the set operation. If you don't, the client's ``set()`` call will timeout. + +Accessing Services from Clients +------------------------------- + +Getting a Service +^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from catkit2 import TestbedProxy + + testbed = TestbedProxy('127.0.0.1', 1234) + service = testbed.temperature_controller + +Using Properties (Client) +^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + # Read property + sensor_id = service.sensor_id + + # Write property + service.exposure_time = 0.01 + + # Handle errors + try: + service.exposure_time = 0.01 + except RuntimeError as e: + print(f"Failed to set: {e}") + +Using Commands (Client) +^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + # Simple command + service.calibrate() + + # Command with arguments + service.set_position(x=100, y=200) + + # Command with return value + status = service.get_status() + print(f"Temperature: {status['temperature']}") + +Using Data Streams (Client) +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + # Subscribe to stream + subscription = service.images.subscribe() + + while running: + try: + frame = subscription.get_next_frame(timeout=1.0) + image = frame.data + process_image(image) + except Exception: + # Timeout or shutdown + continue + +Using Slots (Client - Experimental) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. warning:: + The Slots client API is experimental and subject to change. + +Reading Data +^^^^^^^^^^^^ + +.. code-block:: python + + current = service.temperature.get() + if current is not None: + print(f"Temperature: {current}°C") + +Setting Data +^^^^^^^^^^^^ + +**Synchronous (default)**: Blocks until service confirms + +.. code-block:: python + + try: + service.target_temperature.set(25.0, timeout=5.0) + except SlotTimeoutError: + print("Service didn't respond in time") + except SlotSetterError as e: + print(f"Service rejected: {e}") + +**Asynchronous**: Returns immediately + +.. code-block:: python + + trace_id = service.target_temperature.set_async(25.0) + +Subscribing to Slot Updates +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + subscription = service.temperature.subscribe() + + while running: + msg = subscription.get_next_message(timeout=1.0) + if msg: + data = msg.get_payload() + print(f"New temperature: {data['temp']}") + +Error Handling +-------------- + +Property Validation +^^^^^^^^^^^^^^^^^^^ + +Always validate property values and raise clear exceptions: + +.. code-block:: python + + def set_exposure(self, value): + if value <= 0: + raise ValueError("Exposure must be positive") + if value > 1000: + raise ValueError("Exposure cannot exceed 1000ms") + + self.camera.set_exposure(value) + +Slot Exceptions (Experimental) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from catkit2.catkit_bindings import ( + SlotTimeoutError, + SlotSetterError, + SlotCancelledError + ) + + try: + service.config.set(value, timeout=2.0) + except SlotTimeoutError: + print("Timeout - service may be busy") + except SlotSetterError as e: + print(f"Setter error: {e}") + except SlotCancelledError: + print("Operation was cancelled") + +Creating a Service Proxy +------------------------ + +Create a proxy class for convenient client access: + +.. code-block:: python + + from catkit2.testbed.service_proxy import ServiceProxy + + class TemperatureControllerProxy(ServiceProxy): + @property + def temperature(self): + """Get latest temperature reading.""" + frame = self.temperature.get_next_frame(0) + return frame.data[0] + + def set_target(self, temp): + """Set target temperature.""" + self.target_temperature = temp