Skip to content

Service slots.#427

Draft
ehpor wants to merge 9 commits intodevelopfrom
feature/slots
Draft

Service slots.#427
ehpor wants to merge 9 commits intodevelopfrom
feature/slots

Conversation

@ehpor
Copy link
Copy Markdown
Collaborator

@ehpor ehpor commented Feb 27, 2026

Summary

This PR introduces Slots, a unified communication primitive that replaces the separate DataStreams and Properties APIs. Slots provide typed, bidirectional communication channels between services with built-in error handling, cancellation support, and confirmation mechanisms.

What are Slots?

Slots are named, typed communication channels that connect services. Unlike the previous API which had separate concepts for:

  • Properties (configuration values)
  • DataStreams (streaming data)

Slots unify these into a single concept:

  • Read-only slots: Service publishes data, Service proxies subscribe (like DataStreams)
  • Read-write slots: Service proxies can set values, service receives them via callbacks (like Properties)

The main difference with properties is that other people can subscribe to updates to that property and can retrieve the property value on demand without requiring the service to tell them what it is. The main difference with data streams is that slots allow for confirmation of sets. Additionally, data streams unidirectional (either service->outside or outside->service) without indicating which; slots are either uni- or bi-directional and has specific access patterns for each direction of communication.

Key Features

1. Type Safety

Slots are typed at creation time with three supported types:

  • Json - None, Integer, Float, Dictionaries, Lists or any combination of those
  • Raw - Binary bytes
  • Array - NumPy arrays

2. Synchronous with Confirmation

The set() method blocks until the service confirms the operation succeeded (or fails). This makes configuration changes reliable.

3. Asynchronous Option

The set_async() method returns immediately with a trace ID for tracking. Useful for high-frequency updates where you don't need immediate confirmation.

4. Cancellation

Long-running setter operations can be cancelled via a dedicated topic. The service can check context.is_cancelled() and abort gracefully. Currently, there is no way to call this functionality but I want to have the backend functionality in place for this.

5. Error Handling

Clear exceptions tell you exactly what went wrong:

  • SlotTimeoutError - Service didn't respond in time
  • SlotSetterError - Service's setter callback threw an exception
  • SlotCancelledError - Operation was cancelled

Usage Examples

Service Side (Python)

from catkit2 import Service
from catkit2.catkit_bindings import SlotDataType

class MyService(Service):
    def open(self):
        # Read-only slot - service publishes, proxies read
        self.temperature = self.make_json_slot('temperature')
        
        # Read-write slot with setter callback
        self.target_temperature = self.make_json_slot(
            'target_temperature',
            self.on_target_set
        )
        
        # Array slot for images
        self.image_slot = self.make_array_slot('camera_image')
    
    def on_target_set(self, target, context):
        """Called when proxy sets the target_temperature slot."""
        # Check if operation was cancelled
        if context.is_cancelled():
            raise RuntimeError("Operation cancelled")
        
        # Process the value
        self.target_temp = target
        
        # Confirm by publishing back to /get topic
        context.publish_json(value)
    
    def main(self):
        while not self.should_shut_down:
            # Publish temperature reading
            temp = read_sensor()
            self.temperature.publish(temp)
            
            # Publish camera frame
            frame = capture_image()
            self.image_slot.publish(frame)

Proxy Side (Python)

# Get slot from service
service = testbed.my_service
temperature_slot = service.temperature  # Automatic attribute access

# Read current value (non-blocking)
current_temp = service.temperature.get()  # Returns jsonable value

# Set target temperature (blocking with confirmation)
try:
    service.target_temperature = 25  # Uses set() internally
    # Or explicitly:
    # service.target_temperature.set(25, timeout=5.0)
except SlotTimeoutError:
    print("Service didn't respond in time")
except SlotSetterError as e:
    print(f"Service rejected the value: {e}")

# Async set - returns immediately
trace_id = service.target_temperature.set_async(25)
# Can use trace_id to track if needed

# Subscribe to streaming updates
subscription = service.temperature.subscribe()
while running:
    msg = subscription.get_next_message(timeout=1.0)
    if msg:
        data = msg.get_payload()
        print(f"Temperature: {data}")

API Reference

Service Methods

# Create read-only slots
make_json_slot(name) -> Slot
make_raw_slot(name) -> Slot  
make_array_slot(name) -> Slot

# Create read-write slots (with setter callback)
make_json_slot(name, setter_callback) -> Slot
make_raw_slot(name, setter_callback) -> Slot
make_array_slot(name, setter_callback) -> Slot

Slot Methods (Service Side)

slot.publish(value)  # Publish to all subscribers

SlotProxy Methods (Proxy Side)

slot.get()  # Get latest value (returns None if no data)
slot.set(value, timeout=5.0)  # Set with confirmation (blocking)
slot.set_async(value)  # Set without waiting (returns trace_id)
slot.subscribe(mode=NewestOnly)  # Subscribe to updates

SlotContext (Available in Setter Callbacks)

context.broker  # Access to message broker
context.trace_id  # Unique ID for this operation
context.is_cancelled()  # Check if operation was cancelled
context.publish_json(data)  # Confirm successful set
context.publish_raw(data)
context.publish_array(data)

Design Decisions

  1. Synchronous by deefault: The set() method blocks until confirmation. This is what most users expect for configuration operations. Use set_async() when you need fire-and-forget.
  2. Generic Python API: Python users just call get() and set() - the type handling is automatic based on the slot's configured type.
  3. Trace IDs for tracking: Every operation gets a unique trace ID. This enables:
    • Matching responses to requests
    • Cancellation of specific operations
    • Debugging and logging

Migration from Old API

Before (Properties):

# Service
self.make_property('gain', getter, setter)

# ServiceProxy
print(service.gain). # Waits for confirmation
service.gain = 10  # Waits for confirmation, raises on error

After (Slots):

# Service
self.make_json_slot('gain', self.on_set_gain)

# ServiceProxy
print(service.gain.get()). # Instant.
service.gain.set(10)  # Waits for confirmation, raises on error
# or
service.gain.set(10, timeout=5)  # Waits for confirmation for 5 seconds, raises on error
# or
service.gain = 10  # Waits for confirmation, raises on error

# Or async:
service.gain.set_async(10).  # Doesn't wait for confirmation, instant.

Before (DataStreams):

# Service
stream = self.make_data_stream('image', 'float64', [1024, 1024], 20)
stream.submit_data(image). # Instant

# ServiceProxy
frame = service.image.get_next_frame()

After (Slots):

# Service
slot = self.make_array_slot('image')
slot.publish(image). # Instant.

# Proxy
image = service.image.get()
# Or for streaming:
subscription = service.image.subscribe()

Testing

Comprehensive tests have been added in tests/test_service.py:

  • Read-write JSON slots with confirmation
  • Read-write raw bytes slots
  • Read-write array slots with NumPy
  • Read-only slot rejection
  • Subscription-based streaming
  • Error handling and timeouts

Future Work

Things to consider for the future:

  • Python futures for async operations
  • Batched slot updates
  • Slot metadata (units, ranges, descriptions)

@ehpor ehpor self-assigned this Feb 27, 2026
@ehpor ehpor added the enhancement New feature or request label Feb 27, 2026
@ehpor
Copy link
Copy Markdown
Collaborator Author

ehpor commented Feb 28, 2026

Question do we like having service.position = 23 as additional write-access, rather than only the direct service.position.set(23)? Since we have print(service.position.get()) as read-access but do not have print(service.position).

@ehpor
Copy link
Copy Markdown
Collaborator Author

ehpor commented Feb 28, 2026

@RemiSoummer @ivalaginja Your opinion on this new Service attribute type?

@ehpor
Copy link
Copy Markdown
Collaborator Author

ehpor commented Mar 1, 2026

I just added documentation as well. This includes a lot of documentation on how to write your own service, adapted and extended from @alexisyslau initial text.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant