diff --git a/catkit2/bindings.cpp b/catkit2/bindings.cpp index 46c014043..572526b31 100644 --- a/catkit2/bindings.cpp +++ b/catkit2/bindings.cpp @@ -644,7 +644,6 @@ PYBIND11_MODULE(catkit_bindings, m) .def_property_readonly("state", &ServiceProxy::GetState) .def_property_readonly("is_alive", &ServiceProxy::IsAlive) .def_property_readonly("is_running", &ServiceProxy::IsRunning) - .def_property_readonly("heartbeat", &ServiceProxy::GetHeartbeat) .def("start", [](ServiceProxy &service, double timeout_in_sec) { service.Start(timeout_in_sec, error_check_python); diff --git a/catkit2/testbed/testbed.py b/catkit2/testbed/testbed.py index a8261726b..341e1a135 100644 --- a/catkit2/testbed/testbed.py +++ b/catkit2/testbed/testbed.py @@ -129,21 +129,25 @@ def __init__(self, service_id, service_type, state, dependencies, broker): self.host = '127.0.0.1' self.port = 0 - self.heartbeat = None self.log = logging.getLogger(__name__) @property def state(self): - return ServiceState(int(self.state_stream.get()[0])) + state = self.broker.get_current_message(f'{self.service_id}/service_state/get').payload[0] + + return ServiceState(int(state)) @state.setter def state(self, state): new_state = np.array([state.value], dtype='int8') - self.state_stream.submit_data(new_state) self.broker.publish_array(f'{self.service_id}/service_state/get', new_state) + @property + def heartbeat(self): + return self.broker.get_current_message(f'{self.service_id}/heartbeat/get').payload[0] + @property def is_alive(self): return is_alive_state(self.state) @@ -491,7 +495,7 @@ def monitor_services(self): service.state = ServiceState.CRASHED if service.state == ServiceState.RUNNING: - heartbeat_time = service.heartbeat.get()[0] + heartbeat_time = service.heartbeat time_stamp = get_timestamp() if time_stamp - heartbeat_time > SERVICE_LIVELINESS * 1e9: @@ -501,7 +505,7 @@ def monitor_services(self): service.state = ServiceState.UNRESPONSIVE if service.state == ServiceState.UNRESPONSIVE: - heartbeat_time = 
service.heartbeat.get()[0] + heartbeat_time = service.heartbeat time_stamp = get_timestamp() if time_stamp - heartbeat_time < SERVICE_LIVELINESS * 1e9: @@ -660,12 +664,9 @@ def on_register_service(self, data): service.host = request.host service.port = request.port service.process_id = request.process_id - service.heartbeat = DataStream.open(request.heartbeat_stream_id) reply = testbed_proto.RegisterServiceReply() - reply.state_stream_id = service.state_stream.stream_id - return reply.SerializeToString() def on_shut_down(self, data): @@ -715,9 +716,12 @@ def start_service(self, service_id): self.log.debug(f'Service "{service_id}" was already started.') return - service_type = self.services[service_id].service_type + # Start the dependencies of this service. + for dependency in self.services[service_id].dependencies: + self.start_service(dependency) - # Resolve service type; + # Resolve service type. + service_type = self.services[service_id].service_type path = self.resolve_service_type(service_type) dirname = os.path.dirname(path) @@ -807,10 +811,6 @@ def start_service(self, service_id): self.log.info(f'Started service "{service_id}" with type "{service_type}".') - # Start the dependencies. This is not required but will speed things up. - for dependency in self.services[service_id].dependencies: - self.start_service(dependency) - def stop_service(self, service_id): self.log.debug(f'Trying to stop service "{service_id}".') diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index 6b90aace8..6b81d72f4 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -27,15 +27,15 @@ const double SAFETY_INTERVAL = 60; // seconds. 
Service::Service(string service_type, string service_id, int service_port, int testbed_port) : m_Server(service_port), m_ServiceId(service_id), m_ServiceType(service_type), m_LoggerConsole(), m_LoggerPublish(), - m_Heartbeat(nullptr), m_State(nullptr), m_Safety(nullptr), m_Testbed(nullptr), + m_Testbed(nullptr), m_Broker(nullptr), m_IsRunning(false), m_ShouldShutDown(false), m_FailSafe(false) { m_Testbed = make_shared("127.0.0.1", testbed_port); m_Config = m_Testbed->GetConfig()["services"][service_id]; - m_LoggerPublish.Connect(service_id, "tcp://127.0.0.1:"s + to_string(m_Testbed->GetLoggingIngressPort())); + m_Broker = m_Testbed->GetMessageBroker(); - m_Heartbeat = DataStream::Create("heartbeat", service_id, DataType::DT_UINT64, {1}, 20); + m_LoggerPublish.Connect(service_id, "tcp://127.0.0.1:"s + to_string(m_Testbed->GetLoggingIngressPort())); tracing_proxy.Connect(service_id, "127.0.0.1", m_Testbed->GetTracingIngressPort()); @@ -44,11 +44,9 @@ Service::Service(string service_type, string service_id, int service_port, int t service_type, "127.0.0.1", service_port, - GetProcessId(), - m_Heartbeat->GetStreamId() + GetProcessId() ); - m_State = DataStream::Open(state_stream_id); UpdateState(ServiceState::INITIALIZING); LOG_DEBUG("Registering request handlers."); @@ -120,7 +118,7 @@ void Service::Run(void (*error_check)()) // Publish info. 
std::string service_info = GetInfo(); - m_Testbed->GetMessageBroker()->PublishData(m_ServiceId + "/info/get"s, service_info.data(), service_info.size()); + m_Broker->PublishData(m_ServiceId + "/info/get"s, service_info.data(), service_info.size()); LOG_INFO("Published service info."); @@ -134,7 +132,7 @@ void Service::Run(void (*error_check)()) { auto value = GetProperty(property_name); std::string topic = m_ServiceId + "/"s + property_name + "/get"s; - m_Testbed->GetMessageBroker()->PublishData(topic, value.data(), value.size()); + m_Broker->PublishData(topic, value.data(), value.size()); } catch (std::exception &e) { @@ -152,10 +150,9 @@ void Service::Run(void (*error_check)()) // Put out an initial heartbeat. // This ensures that there is always a heartbeat on this channel. std::uint64_t timestamp = GetTimeStamp(); - m_Heartbeat->SubmitData(&timestamp); ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}}; - m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp}); + m_Broker->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp}); // Start the safety and heartbeat threads. std::thread safety(&Service::MonitorSafety, this); @@ -239,11 +236,9 @@ void Service::Run(void (*error_check)()) // Set heartbeat timestamp to zero to signal a dead service. std::uint64_t timestamp = 0; - m_Heartbeat->SubmitData(&timestamp); ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}}; - m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp}); - + m_Broker->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp}); CleanupAttributes(); } @@ -327,13 +322,6 @@ void Service::MonitorHeartbeats() { while (!ShouldShutDown()) { - // Update my own heartbeat. 
- std::uint64_t timestamp = GetTimeStamp(); - m_Heartbeat->SubmitData(&timestamp); - - ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}}; - m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp}); - // Check the testbed heartbeat. if (!m_Testbed->IsAlive()) { @@ -347,11 +335,11 @@ void Service::MonitorHeartbeats() double cpu_usage = m_ProcessStats.GetCpuUsage(); ArrayInfo cpu_usage_info = {'f', '=', 8, 1, {1}, {1}}; - m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/cpu_usage/get", {cpu_usage_info, &cpu_usage}); + m_Broker->PublishArray(m_ServiceId + "/cpu_usage/get", {cpu_usage_info, &cpu_usage}); uint64_t memory_usage = m_ProcessStats.GetMemoryUsage(); ArrayInfo memory_usage_info = {'u', '=', 8, 1, {1}, {1}}; - m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/memory_usage/get", {memory_usage_info, &memory_usage}); + m_Broker->PublishArray(m_ServiceId + "/memory_usage/get", {memory_usage_info, &memory_usage}); // Sleep until next check. Sleep(SERVICE_LIVELINESS / 5); @@ -360,13 +348,25 @@ void Service::MonitorHeartbeats() void Service::MonitorPropertiesAndCommands() { - auto broker = m_Testbed->GetMessageBroker(); - auto subscription = broker->Subscribe(m_ServiceId); + auto subscription = m_Broker->Subscribe(m_ServiceId); + std::uint64_t last_heartbeat = 0; while (!ShouldShutDown()) { try { + std::uint64_t timestamp = GetTimeStamp(); + + // Update my own heartbeat, if enough time has expired since the last one. + if ((timestamp - last_heartbeat) >= (SERVICE_LIVELINESS * 1e9 / 5)) + { + ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}}; + m_Broker->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp}); + + last_heartbeat = timestamp; + } + + // Get the next message for a potential property set or command execute. 
auto message_optional = subscription.GetNextMessage(SERVICE_LIVELINESS / 5, EventWaitMethod::Default); if (!message_optional.has_value()) @@ -378,14 +378,14 @@ void Service::MonitorPropertiesAndCommands() // Trigger on set messages. if (topic.size() >= 4 && topic.substr(topic.size() - 4) == "/set") { - HandleSetPropertyMessage(broker, message); + HandleSetPropertyMessage(m_Broker, message); continue; } // Trigger on execute messages. if (topic.size() >= 8 && topic.substr(topic.size() - 8) == "/execute") { - HandleExecuteCommandMessage(broker, message); + HandleExecuteCommandMessage(m_Broker, message); continue; } } @@ -651,8 +651,7 @@ string Service::GetInfo() {"config", m_Config}, {"property_names", json::array()}, {"command_names", json::array()}, - {"datastream_ids", json::object()}, - {"heartbeat_stream_id", m_Heartbeat->GetStreamId()} + {"datastream_ids", json::object()} }; for (auto& [key, value] : m_Properties) @@ -682,7 +681,8 @@ string Service::HandleShutDown(const string &data) void Service::UpdateState(ServiceState state) { int8_t new_state = state; - m_State->SubmitData(&new_state); + + m_Broker->PublishData(m_ServiceId + "/service_state/get", &new_state, sizeof(new_state)); } void print_usage() diff --git a/catkit_core/Service.h b/catkit_core/Service.h index 0aab3d13d..ef05d8749 100644 --- a/catkit_core/Service.h +++ b/catkit_core/Service.h @@ -91,16 +91,13 @@ class Service std::atomic_bool m_FailSafe; std::shared_ptr m_Testbed; + std::shared_ptr m_Broker; std::string m_ServiceId; std::string m_ServiceType; nlohmann::json m_Config; - std::shared_ptr m_Heartbeat; - std::shared_ptr m_Safety; - std::shared_ptr m_State; - std::map> m_Properties; std::map> m_Commands; diff --git a/catkit_core/ServiceProxy.cpp b/catkit_core/ServiceProxy.cpp index e73c709b8..3fc49ef15 100644 --- a/catkit_core/ServiceProxy.cpp +++ b/catkit_core/ServiceProxy.cpp @@ -18,7 +18,7 @@ const double TIMEOUT_SET_PROPERTY = 120; // seconds const double TIMEOUT_EXECUTE_COMMAND = 120; // 
seconds ServiceProxy::ServiceProxy(std::shared_ptr testbed, std::string service_id) - : m_Testbed(testbed), m_ServiceId(service_id), m_Client(nullptr), m_State(nullptr), + : m_Testbed(testbed), m_ServiceId(service_id), m_Client(nullptr), m_TimeLastConnect(0) { // Do a check to see if the service id is correct. @@ -31,8 +31,6 @@ ServiceProxy::ServiceProxy(std::shared_ptr testbed, std::string se auto service_info = testbed->GetServiceInfo(m_ServiceId); - m_State = DataStream::Open(service_info.state_stream_id); - Connect(); } @@ -218,16 +216,16 @@ std::shared_ptr ServiceProxy::GetDataStream(const std::string &name, return m_DataStreams[name]; } -std::shared_ptr ServiceProxy::GetHeartbeat() -{ - return m_Heartbeat; -} - ServiceState ServiceProxy::GetState() { - ServiceState state = ServiceState(m_State->GetLatestFrame().AsArray()(0)); + auto message = m_Testbed->GetMessageBroker()->GetCurrentMessage(m_ServiceId + "/service_state/get"); + + if (!message.has_value()) + throw std::runtime_error("The service doesn't have a state message."); - return state; + std::int8_t *state = (std::int8_t *) message.value().GetPayload().data; + + return ServiceState(*state); } bool ServiceProxy::IsRunning() @@ -338,9 +336,12 @@ void ServiceProxy::Terminate() void ServiceProxy::Connect() { // Check if the service is running. - // Do an explicit check on the state stream to avoid infinite loop. - auto frame = m_State->GetLatestFrame(); - ServiceState state = ServiceState(frame.AsArray()(0)); + // Do an explicit check because we need the timestamp as well. 
+ auto message = m_Testbed->GetMessageBroker()->GetCurrentMessage(m_ServiceId + "/service_state/get"); + if (!message.has_value()) + throw std::runtime_error("Service " + m_ServiceId + " didn't have a service state."); + + ServiceState state = ServiceState(*((std::int8_t *)message.value().GetPayload().data)); if (state != ServiceState::RUNNING) { @@ -350,7 +351,7 @@ void ServiceProxy::Connect() } // Check if we are already connected to the Service. - if (m_TimeLastConnect == frame.m_TimeStamp) + if (m_TimeLastConnect == message.value().GetProducerTimestamp()) return; // We need to reconnect, so let's disconnect first. @@ -379,9 +380,7 @@ void ServiceProxy::Connect() for (auto& [key, value] : info["datastream_ids"].items()) m_DataStreamIds[key] = value; - m_Heartbeat = DataStream::Open(info["heartbeat_stream_id"].get()); - - m_TimeLastConnect = frame.m_TimeStamp; + m_TimeLastConnect = message.value().GetProducerTimestamp(); LOG_DEBUG("Connected to \"" + m_ServiceId + "\"."); } @@ -392,8 +391,6 @@ void ServiceProxy::Disconnect() m_CommandNames.clear(); m_DataStreamIds.clear(); m_DataStreams.clear(); - - m_Heartbeat = nullptr; } std::vector ServiceProxy::GetPropertyNames(void (*error_check)()) diff --git a/catkit_core/ServiceProxy.h b/catkit_core/ServiceProxy.h index a7c7154f7..b801e4453 100644 --- a/catkit_core/ServiceProxy.h +++ b/catkit_core/ServiceProxy.h @@ -28,8 +28,6 @@ class ServiceProxy std::shared_ptr GetDataStream(const std::string &name, void (*error_check)() = nullptr); - std::shared_ptr GetHeartbeat(); - ServiceState GetState(); bool IsRunning(); bool IsAlive(); @@ -62,8 +60,6 @@ class ServiceProxy std::map> m_DataStreams; - std::shared_ptr m_Heartbeat; - std::shared_ptr m_State; std::uint64_t m_TimeLastConnect; }; diff --git a/catkit_core/TestbedProxy.cpp b/catkit_core/TestbedProxy.cpp index 68d2a7958..685f75728 100644 --- a/catkit_core/TestbedProxy.cpp +++ b/catkit_core/TestbedProxy.cpp @@ -140,7 +140,7 @@ ServiceReference 
TestbedProxy::GetServiceInfo(const std::string &service_id) return res; } -std::string TestbedProxy::RegisterService(std::string service_id, std::string service_type, std::string host, int port, int process_id, std::string heartbeat_stream_id) +std::string TestbedProxy::RegisterService(std::string service_id, std::string service_type, std::string host, int port, int process_id) { catkit_proto::testbed::RegisterServiceRequest request; @@ -149,7 +149,6 @@ std::string TestbedProxy::RegisterService(std::string service_id, std::string se request.set_host(host); request.set_port(port); request.set_process_id(process_id); - request.set_heartbeat_stream_id(heartbeat_stream_id); catkit_proto::testbed::RegisterServiceReply reply; diff --git a/catkit_core/TestbedProxy.h b/catkit_core/TestbedProxy.h index cad9fd729..8efebef94 100644 --- a/catkit_core/TestbedProxy.h +++ b/catkit_core/TestbedProxy.h @@ -43,7 +43,7 @@ class TestbedProxy : public Client, public std::enable_shared_from_this