Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion catkit2/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,6 @@ PYBIND11_MODULE(catkit_bindings, m)
.def_property_readonly("state", &ServiceProxy::GetState)
.def_property_readonly("is_alive", &ServiceProxy::IsAlive)
.def_property_readonly("is_running", &ServiceProxy::IsRunning)
.def_property_readonly("heartbeat", &ServiceProxy::GetHeartbeat)
.def("start", [](ServiceProxy &service, double timeout_in_sec)
{
service.Start(timeout_in_sec, error_check_python);
Expand Down
28 changes: 14 additions & 14 deletions catkit2/testbed/testbed.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,25 @@ def __init__(self, service_id, service_type, state, dependencies, broker):

self.host = '127.0.0.1'
self.port = 0
self.heartbeat = None

self.log = logging.getLogger(__name__)

@property
def state(self):
return ServiceState(int(self.state_stream.get()[0]))
state = self.broker.get_current_message(f'{self.service_id}/service_state/get').payload[0]

return ServiceState(int(state))

@state.setter
def state(self, state):
new_state = np.array([state.value], dtype='int8')
self.state_stream.submit_data(new_state)

self.broker.publish_array(f'{self.service_id}/service_state/get', new_state)

@property
def heartbeat(self):
return self.broker.get_current_message(f'{self.service_id}/heartbeat/get').payload[0]

@property
def is_alive(self):
return is_alive_state(self.state)
Expand Down Expand Up @@ -491,7 +495,7 @@ def monitor_services(self):
service.state = ServiceState.CRASHED

if service.state == ServiceState.RUNNING:
heartbeat_time = service.heartbeat.get()[0]
heartbeat_time = service.heartbeat
time_stamp = get_timestamp()

if time_stamp - heartbeat_time > SERVICE_LIVELINESS * 1e9:
Expand All @@ -501,7 +505,7 @@ def monitor_services(self):
service.state = ServiceState.UNRESPONSIVE

if service.state == ServiceState.UNRESPONSIVE:
heartbeat_time = service.heartbeat.get()[0]
heartbeat_time = service.heartbeat
time_stamp = get_timestamp()

if time_stamp - heartbeat_time < SERVICE_LIVELINESS * 1e9:
Expand Down Expand Up @@ -660,12 +664,9 @@ def on_register_service(self, data):
service.host = request.host
service.port = request.port
service.process_id = request.process_id
service.heartbeat = DataStream.open(request.heartbeat_stream_id)

reply = testbed_proto.RegisterServiceReply()

reply.state_stream_id = service.state_stream.stream_id

return reply.SerializeToString()

def on_shut_down(self, data):
Expand Down Expand Up @@ -715,9 +716,12 @@ def start_service(self, service_id):
self.log.debug(f'Service "{service_id}" was already started.')
return

service_type = self.services[service_id].service_type
# Start the dependencies of this service.
for dependency in self.services[service_id].dependencies:
self.start_service(dependency)

# Resolve service type;
# Resolve service type.
service_type = self.services[service_id].service_type
path = self.resolve_service_type(service_type)
dirname = os.path.dirname(path)

Expand Down Expand Up @@ -807,10 +811,6 @@ def start_service(self, service_id):

self.log.info(f'Started service "{service_id}" with type "{service_type}".')

# Start the dependencies. This is not required but will speed things up.
for dependency in self.services[service_id].dependencies:
self.start_service(dependency)

def stop_service(self, service_id):
self.log.debug(f'Trying to stop service "{service_id}".')

Expand Down
58 changes: 29 additions & 29 deletions catkit_core/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ const double SAFETY_INTERVAL = 60; // seconds.
Service::Service(string service_type, string service_id, int service_port, int testbed_port)
: m_Server(service_port), m_ServiceId(service_id), m_ServiceType(service_type),
m_LoggerConsole(), m_LoggerPublish(),
m_Heartbeat(nullptr), m_State(nullptr), m_Safety(nullptr), m_Testbed(nullptr),
m_Testbed(nullptr), m_Broker(nullptr),
m_IsRunning(false), m_ShouldShutDown(false), m_FailSafe(false)
{
m_Testbed = make_shared<TestbedProxy>("127.0.0.1", testbed_port);
m_Config = m_Testbed->GetConfig()["services"][service_id];

m_LoggerPublish.Connect(service_id, "tcp://127.0.0.1:"s + to_string(m_Testbed->GetLoggingIngressPort()));
m_Broker = m_Testbed->GetMessageBroker();

m_Heartbeat = DataStream::Create("heartbeat", service_id, DataType::DT_UINT64, {1}, 20);
m_LoggerPublish.Connect(service_id, "tcp://127.0.0.1:"s + to_string(m_Testbed->GetLoggingIngressPort()));

tracing_proxy.Connect(service_id, "127.0.0.1", m_Testbed->GetTracingIngressPort());

Expand All @@ -44,11 +44,9 @@ Service::Service(string service_type, string service_id, int service_port, int t
service_type,
"127.0.0.1",
service_port,
GetProcessId(),
m_Heartbeat->GetStreamId()
GetProcessId()
);

m_State = DataStream::Open(state_stream_id);
UpdateState(ServiceState::INITIALIZING);

LOG_DEBUG("Registering request handlers.");
Expand Down Expand Up @@ -120,7 +118,7 @@ void Service::Run(void (*error_check)())

// Publish info.
std::string service_info = GetInfo();
m_Testbed->GetMessageBroker()->PublishData(m_ServiceId + "/info/get"s, service_info.data(), service_info.size());
m_Broker->PublishData(m_ServiceId + "/info/get"s, service_info.data(), service_info.size());

LOG_INFO("Published service info.");

Expand All @@ -134,7 +132,7 @@ void Service::Run(void (*error_check)())
{
auto value = GetProperty(property_name);
std::string topic = m_ServiceId + "/"s + property_name + "/get"s;
m_Testbed->GetMessageBroker()->PublishData(topic, value.data(), value.size());
m_Broker->PublishData(topic, value.data(), value.size());
}
catch (std::exception &e)
{
Expand All @@ -152,10 +150,9 @@ void Service::Run(void (*error_check)())
// Put out an initial heartbeat.
// This ensures that there is always a heartbeat on this channel.
std::uint64_t timestamp = GetTimeStamp();
m_Heartbeat->SubmitData(&timestamp);

ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}};
m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp});
m_Broker->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp});

// Start the safety and heartbeat threads.
std::thread safety(&Service::MonitorSafety, this);
Expand Down Expand Up @@ -239,11 +236,9 @@ void Service::Run(void (*error_check)())

// Set heartbeat timestamp to zero to signal a dead service.
std::uint64_t timestamp = 0;
m_Heartbeat->SubmitData(&timestamp);

ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}};
m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp});

m_Broker->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp});

CleanupAttributes();
}
Expand Down Expand Up @@ -327,13 +322,6 @@ void Service::MonitorHeartbeats()
{
while (!ShouldShutDown())
{
// Update my own heartbeat.
std::uint64_t timestamp = GetTimeStamp();
m_Heartbeat->SubmitData(&timestamp);

ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}};
m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp});

// Check the testbed heartbeat.
if (!m_Testbed->IsAlive())
{
Expand All @@ -347,11 +335,11 @@ void Service::MonitorHeartbeats()

double cpu_usage = m_ProcessStats.GetCpuUsage();
ArrayInfo cpu_usage_info = {'f', '=', 8, 1, {1}, {1}};
m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/cpu_usage/get", {cpu_usage_info, &cpu_usage});
m_Broker->PublishArray(m_ServiceId + "/cpu_usage/get", {cpu_usage_info, &cpu_usage});

uint64_t memory_usage = m_ProcessStats.GetMemoryUsage();
ArrayInfo memory_usage_info = {'u', '=', 8, 1, {1}, {1}};
m_Testbed->GetMessageBroker()->PublishArray(m_ServiceId + "/memory_usage/get", {memory_usage_info, &memory_usage});
m_Broker->PublishArray(m_ServiceId + "/memory_usage/get", {memory_usage_info, &memory_usage});

// Sleep until next check.
Sleep(SERVICE_LIVELINESS / 5);
Expand All @@ -360,13 +348,25 @@ void Service::MonitorHeartbeats()

void Service::MonitorPropertiesAndCommands()
{
auto broker = m_Testbed->GetMessageBroker();
auto subscription = broker->Subscribe(m_ServiceId);
auto subscription = m_Broker->Subscribe(m_ServiceId);
std::uint64_t last_heartbeat = 0;

while (!ShouldShutDown())
{
try
{
std::uint64_t timestamp = GetTimeStamp();

// Update my own heartbeat, if enough time has expired since the last one.
if ((timestamp - last_heartbeat) >= (SERVICE_LIVELINESS * 1e9 / 5))
{
ArrayInfo info{'u', '=', 8, 1, {1, 1, 1, 1}, {8, 1, 1, 1}};
m_Broker->PublishArray(m_ServiceId + "/heartbeat/get", {info, &timestamp});

last_heartbeat = timestamp;
}

// Get the next message for a potential property set or command execute.
auto message_optional = subscription.GetNextMessage(SERVICE_LIVELINESS / 5, EventWaitMethod::Default);

if (!message_optional.has_value())
Expand All @@ -378,14 +378,14 @@ void Service::MonitorPropertiesAndCommands()
// Trigger on set messages.
if (topic.size() >= 4 && topic.substr(topic.size() - 4) == "/set")
{
HandleSetPropertyMessage(broker, message);
HandleSetPropertyMessage(m_Broker, message);
continue;
}

// Trigger on execute messages.
if (topic.size() >= 8 && topic.substr(topic.size() - 8) == "/execute")
{
HandleExecuteCommandMessage(broker, message);
HandleExecuteCommandMessage(m_Broker, message);
continue;
}
}
Expand Down Expand Up @@ -651,8 +651,7 @@ string Service::GetInfo()
{"config", m_Config},
{"property_names", json::array()},
{"command_names", json::array()},
{"datastream_ids", json::object()},
{"heartbeat_stream_id", m_Heartbeat->GetStreamId()}
{"datastream_ids", json::object()}
};

for (auto& [key, value] : m_Properties)
Expand Down Expand Up @@ -682,7 +681,8 @@ string Service::HandleShutDown(const string &data)
void Service::UpdateState(ServiceState state)
{
int8_t new_state = state;
m_State->SubmitData(&new_state);

m_Broker->PublishData(m_ServiceId + "/service_state/get", &new_state, sizeof(new_state));
}

void print_usage()
Expand Down
5 changes: 1 addition & 4 deletions catkit_core/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,13 @@ class Service
std::atomic_bool m_FailSafe;

std::shared_ptr<TestbedProxy> m_Testbed;
std::shared_ptr<MessageBroker> m_Broker;

std::string m_ServiceId;
std::string m_ServiceType;

nlohmann::json m_Config;

std::shared_ptr<DataStream> m_Heartbeat;
std::shared_ptr<DataStream> m_Safety;
std::shared_ptr<DataStream> m_State;

std::map<std::string, std::pair<PropertyGetter, PropertySetter>> m_Properties;

std::map<std::string, std::shared_ptr<Command>> m_Commands;
Expand Down
35 changes: 16 additions & 19 deletions catkit_core/ServiceProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const double TIMEOUT_SET_PROPERTY = 120; // seconds
const double TIMEOUT_EXECUTE_COMMAND = 120; // seconds

ServiceProxy::ServiceProxy(std::shared_ptr<TestbedProxy> testbed, std::string service_id)
: m_Testbed(testbed), m_ServiceId(service_id), m_Client(nullptr), m_State(nullptr),
: m_Testbed(testbed), m_ServiceId(service_id), m_Client(nullptr),
m_TimeLastConnect(0)
{
// Do a check to see if the service id is correct.
Expand All @@ -31,8 +31,6 @@ ServiceProxy::ServiceProxy(std::shared_ptr<TestbedProxy> testbed, std::string se

auto service_info = testbed->GetServiceInfo(m_ServiceId);

m_State = DataStream::Open(service_info.state_stream_id);

Connect();
}

Expand Down Expand Up @@ -218,16 +216,16 @@ std::shared_ptr<DataStream> ServiceProxy::GetDataStream(const std::string &name,
return m_DataStreams[name];
}

std::shared_ptr<DataStream> ServiceProxy::GetHeartbeat()
{
return m_Heartbeat;
}

ServiceState ServiceProxy::GetState()
{
ServiceState state = ServiceState(m_State->GetLatestFrame().AsArray<std::int8_t>()(0));
auto message = m_Testbed->GetMessageBroker()->GetCurrentMessage(m_ServiceId + "/service_state/get");

if (!message.has_value())
throw std::runtime_error("The service doesn't have a state message.");

return state;
std::int8_t *state = (std::int8_t *) message.value().GetPayload().data;

return ServiceState(*state);
}

bool ServiceProxy::IsRunning()
Expand Down Expand Up @@ -338,9 +336,12 @@ void ServiceProxy::Terminate()
void ServiceProxy::Connect()
{
// Check if the service is running.
// Do an explicit check on the state stream to avoid infinite loop.
auto frame = m_State->GetLatestFrame();
ServiceState state = ServiceState(frame.AsArray<std::int8_t>()(0));
// Do an explicit check because we need the timestamp as well.
auto message = m_Testbed->GetMessageBroker()->GetCurrentMessage(m_ServiceId + "/service_state/get");
if (!message.has_value())
throw std::runtime_error("Service " + m_ServiceId + " didn't have a service state.");

ServiceState state = ServiceState(*((std::int8_t *)message.value().GetPayload().data));

if (state != ServiceState::RUNNING)
{
Expand All @@ -350,7 +351,7 @@ void ServiceProxy::Connect()
}

// Check if we are already connected to the Service.
if (m_TimeLastConnect == frame.m_TimeStamp)
if (m_TimeLastConnect == message.value().GetProducerTimestamp())
return;

// We need to reconnect, so let's disconnect first.
Expand Down Expand Up @@ -379,9 +380,7 @@ void ServiceProxy::Connect()
for (auto& [key, value] : info["datastream_ids"].items())
m_DataStreamIds[key] = value;

m_Heartbeat = DataStream::Open(info["heartbeat_stream_id"].get<std::string>());

m_TimeLastConnect = frame.m_TimeStamp;
m_TimeLastConnect = message.value().GetProducerTimestamp();
LOG_DEBUG("Connected to \"" + m_ServiceId + "\".");
}

Expand All @@ -392,8 +391,6 @@ void ServiceProxy::Disconnect()
m_CommandNames.clear();
m_DataStreamIds.clear();
m_DataStreams.clear();

m_Heartbeat = nullptr;
}

std::vector<std::string> ServiceProxy::GetPropertyNames(void (*error_check)())
Expand Down
4 changes: 0 additions & 4 deletions catkit_core/ServiceProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class ServiceProxy

std::shared_ptr<DataStream> GetDataStream(const std::string &name, void (*error_check)() = nullptr);

std::shared_ptr<DataStream> GetHeartbeat();

ServiceState GetState();
bool IsRunning();
bool IsAlive();
Expand Down Expand Up @@ -62,8 +60,6 @@ class ServiceProxy

std::map<std::string, std::shared_ptr<DataStream>> m_DataStreams;

std::shared_ptr<DataStream> m_Heartbeat;
std::shared_ptr<DataStream> m_State;
std::uint64_t m_TimeLastConnect;
};

Expand Down
Loading
Loading