Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 261 additions & 1 deletion catkit2/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "Uuid.h"
#include "ArrayView.h"
#include "ProcessStats.h"
#include "Slot.h"
#include "SlotProxy.h"

#include "testbed.pb.h"

Expand Down Expand Up @@ -607,6 +609,45 @@ PYBIND11_MODULE(catkit_bindings, m)
return service.MakeDataStream(stream_name, dtype, dimensions, num_frames_in_buffer);
})
.def("reuse_data_stream", &Service::ReuseDataStream)
.def("make_json_slot", [](Service &service, std::string slot_name, py::object setter)
{
if (setter.is_none())
{
return service.MakeJsonSlot(slot_name);
}
return service.MakeJsonSlot(slot_name,
[setter](const nlohmann::json &value, SlotContext &ctx)
{
py::gil_scoped_acquire acquire;
py::object result = setter(value, ctx);
});
}, py::arg("slot_name"), py::arg("setter") = py::none())
.def("make_raw_slot", [](Service &service, std::string slot_name, py::object setter)
{
if (setter.is_none())
{
return service.MakeRawSlot(slot_name);
}
return service.MakeRawSlot(slot_name,
[setter](std::string_view value, SlotContext &ctx)
{
py::gil_scoped_acquire acquire;
py::object result = setter(py::bytes(value.data(), value.size()), ctx);
});
}, py::arg("slot_name"), py::arg("setter") = py::none())
.def("make_array_slot", [](Service &service, std::string slot_name, py::object setter)
{
if (setter.is_none())
{
return service.MakeArraySlot(slot_name);
}
return service.MakeArraySlot(slot_name,
[setter](ArrayView value, SlotContext &ctx)
{
py::gil_scoped_acquire acquire;
py::object result = setter(ToPython(value), ctx);
});
}, py::arg("slot_name"), py::arg("setter") = py::none())
.def("cleanup_attributes", &Service::CleanupAttributes);

py::enum_<ServiceState>(m, "ServiceState")
Expand Down Expand Up @@ -664,9 +705,17 @@ PYBIND11_MODULE(catkit_bindings, m)
{
return service.GetDataStreamNames(error_check_python);
})
.def_property_readonly("slot_names", [](ServiceProxy &service)
{
return service.GetSlotNames(error_check_python);
})
.def_property_readonly("config", &ServiceProxy::GetConfig)
.def_property_readonly("id", &ServiceProxy::GetId)
.def_property_readonly("testbed", &ServiceProxy::GetTestbed);
.def_property_readonly("testbed", &ServiceProxy::GetTestbed)
.def("get_slot", [](ServiceProxy &proxy, std::string name)
{
return proxy.GetSlot(name, error_check_python);
});

py::class_<TestbedProxy, std::shared_ptr<TestbedProxy>>(m, "TestbedProxy")
.def(py::init<std::string, int>())
Expand Down Expand Up @@ -1292,6 +1341,217 @@ PYBIND11_MODULE(catkit_bindings, m)
.def_property_readonly("memory_usage", &ProcessStats::GetMemoryUsage)
.def_property_readonly("cpu_usage", &ProcessStats::GetCpuUsage);

// Slot bindings
py::enum_<SlotDataType>(m, "SlotDataType")
.value("Json", SlotDataType::Json)
.value("Raw", SlotDataType::Raw)
.value("Array", SlotDataType::Array);

py::class_<SlotContext>(m, "SlotContext")
.def("is_cancelled", &SlotContext::IsCancelled)
.def("publish_json", [](SlotContext &ctx, nlohmann::json data)
{
ctx.Publish(data);
})
.def("publish_raw", [](SlotContext &ctx, py::bytes data)
{
ctx.Publish(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr())));
})
.def("publish_array", [](SlotContext &ctx, py::array array)
{
ArrayInfo info;
auto dtype = array.dtype();
info.data_type = dtype.kind();
info.item_size = dtype.itemsize();
info.byte_order = dtype.byteorder();
info.ndim = array.ndim();
for (size_t i = 0; i < info.ndim; ++i)
{
info.shape[i] = array.shape()[i];
info.strides[i] = array.strides()[i];
}
ArrayView view{info, array.mutable_data()};
ctx.Publish(view);
})
.def_property_readonly("broker", &SlotContext::GetBroker)
.def_property_readonly("trace_id", &SlotContext::GetTraceId);

py::class_<Slot, std::shared_ptr<Slot>>(m, "Slot")
.def_property_readonly("is_read_only", &Slot::IsReadOnly)
.def_property_readonly("data_type", &Slot::GetDataType)
.def_property_readonly("name", &Slot::GetName)
.def("start", &Slot::Start)
.def("stop", &Slot::Stop)
.def("publish", [](Slot &slot, py::object value)
{
switch (slot.GetDataType())
{
case SlotDataType::Json:
slot.Publish(py::cast<nlohmann::json>(value));
break;
case SlotDataType::Raw:
{
py::bytes data = py::cast<py::bytes>(value);
slot.Publish(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr())));
break;
}
case SlotDataType::Array:
{
py::array array = py::cast<py::array>(value);
ArrayInfo info;
auto dtype = array.dtype();
info.data_type = dtype.kind();
info.item_size = dtype.itemsize();
info.byte_order = dtype.byteorder();

if (array.ndim() > MAX_NUM_DIMENSIONS)
throw std::runtime_error("Array dimension is too large.");

info.ndim = array.ndim();

for (size_t i = 0; i < info.ndim; ++i)
{
info.shape[i] = array.shape()[i];
info.strides[i] = array.strides()[i];
}

if (!info.IsCContiguous() && !info.IsFContiguous())
throw std::runtime_error("Array has to be either C or F contiguous.");

const ArrayView array_view{info, array.mutable_data()};
slot.Publish(array_view);
break;
}
default:
throw std::runtime_error("Unknown slot data type");
}
});

py::class_<SlotProxy, std::shared_ptr<SlotProxy>>(m, "SlotProxy")
.def("subscribe", &SlotProxy::Subscribe, py::arg("mode") = MessageSubscriptionMode::NewestOnly)
.def_property_readonly("is_read_only", &SlotProxy::IsReadOnly)
.def_property_readonly("data_type", &SlotProxy::GetDataType)
.def_property_readonly("slot_name", &SlotProxy::GetSlotName)
.def("get", [](SlotProxy &proxy) -> py::object
{
try
{
switch (proxy.GetDataType())
{
case SlotDataType::Json:
return proxy.GetJson();
case SlotDataType::Raw:
{
auto raw = proxy.GetRaw();
return py::bytes(raw.data(), raw.size());
}
case SlotDataType::Array:
return ToPython(proxy.GetArray());
default:
throw std::runtime_error("Unknown slot data type");
}
}
catch (const std::runtime_error &)
{
return py::none();
}
})
.def("set", [](SlotProxy &proxy, py::object value, double timeout)
{
if (proxy.IsReadOnly())
{
throw std::runtime_error("Cannot set read-only slot");
}

switch (proxy.GetDataType())
{
case SlotDataType::Json:
proxy.Set(py::cast<nlohmann::json>(value), timeout);
break;
case SlotDataType::Raw:
{
py::bytes data = py::cast<py::bytes>(value);
proxy.Set(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr())), timeout);
break;
}
case SlotDataType::Array:
{
py::array array = py::cast<py::array>(value);
ArrayInfo info;
auto dtype = array.dtype();
info.data_type = dtype.kind();
info.item_size = dtype.itemsize();
info.byte_order = dtype.byteorder();

if (array.ndim() > MAX_NUM_DIMENSIONS)
throw std::runtime_error("Array dimension is too large.");

info.ndim = array.ndim();

for (size_t i = 0; i < info.ndim; ++i)
{
info.shape[i] = array.shape()[i];
info.strides[i] = array.strides()[i];
}

if (!info.IsCContiguous() && !info.IsFContiguous())
throw std::runtime_error("Array has to be either C or F contiguous.");

const ArrayView array_view{info, array.mutable_data()};
proxy.Set(array_view, timeout);
break;
}
default:
throw std::runtime_error("Unknown slot data type");
}
}, py::arg("value"), py::arg("timeout") = 5.0)
.def("set_async", [](SlotProxy &proxy, py::object value) -> Uuid
{
if (proxy.IsReadOnly())
{
throw std::runtime_error("Cannot set read-only slot");
}

switch (proxy.GetDataType())
{
case SlotDataType::Json:
return proxy.SetAsync(py::cast<nlohmann::json>(value));
case SlotDataType::Raw:
{
py::bytes data = py::cast<py::bytes>(value);
return proxy.SetAsync(std::string_view(PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr())));
}
case SlotDataType::Array:
{
py::array array = py::cast<py::array>(value);
ArrayInfo info;
auto dtype = array.dtype();
info.data_type = dtype.kind();
info.item_size = dtype.itemsize();
info.byte_order = dtype.byteorder();

if (array.ndim() > MAX_NUM_DIMENSIONS)
throw std::runtime_error("Array dimension is too large.");

info.ndim = array.ndim();

for (size_t i = 0; i < info.ndim; ++i)
{
info.shape[i] = array.shape()[i];
info.strides[i] = array.strides()[i];
}

if (!info.IsCContiguous() && !info.IsFContiguous())
throw std::runtime_error("Array has to be either C or F contiguous.");

const ArrayView array_view{info, array.mutable_data()};
return proxy.SetAsync(array_view);
}
default:
throw std::runtime_error("Unknown slot data type");
}
});

#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
Expand Down
15 changes: 11 additions & 4 deletions catkit2/testbed/service_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def testbed(self):
return self._testbed

def __getattr__(self, name):
'''Get a property, command or data stream.
'''Get a property, command, data stream, or slot.

Properties
----------
Expand All @@ -40,13 +40,13 @@ def __getattr__(self, name):

Returns
-------
Property or Command or DataStream object
Property or Command or DataStream or SlotProxy object
The attribute.

Raises
------
AttributeError
If the named attribute is not a property, command or data stream.
If the named attribute is not a property, command, data stream, or slot.
'''
if name in self.property_names:
# Return property.
Expand All @@ -59,11 +59,14 @@ def cmd(**kwargs):
elif name in self.data_stream_names:
# Return datastream.
return self.get_data_stream(name)
elif name in self.slot_names:
# Return slot.
return self.get_slot(name)
else:
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'.")

def __setattr__(self, name, value):
'''Set the property.
'''Set the property or slot.

Parameters
----------
Expand All @@ -76,6 +79,7 @@ def __setattr__(self, name, value):
------
AttributeError
If the attribute is a command or datastream (both of which are not settable).
If the attribute is a read-only slot.
If the attribute cannot be found on this service.
'''
if name in self.property_names:
Expand All @@ -85,6 +89,9 @@ def __setattr__(self, name, value):
raise AttributeError('Cannot set a command.')
elif name in self.data_stream_names:
raise AttributeError('Cannot set a data stream. Did you mean .submit_data()?')
elif name in self.slot_names:
# Set slot value if not read-only.
self.get_slot(name).set(value)
else:
super().__setattr__(name, value)

Expand Down
Loading
Loading