✨ Periodically inform message broker that message is still in progress #248

Merged: aleksul merged 2 commits into main from keepalive on May 12, 2026

Conversation

@aleksul
Owner

@aleksul aleksul commented May 12, 2026

Change Summary

Adds a keepalive mechanism that periodically informs the message broker that a message is still in progress.

Checklist

  • Unit tests for the changes exist
  • Tests pass on CI and coverage remains at 100%
  • Documentation reflects the changes where applicable
  • My PR is ready to review

@github-actions

Coverage Report

| Name | Stmts | Miss | Cover | Missing |
| --- | --- | --- | --- | --- |
| TOTAL | 8029 | 0 | 100% | |

103 files skipped due to complete coverage.

@aleksul aleksul merged commit 6f0723f into main May 12, 2026
15 checks passed
@aleksul aleksul deleted the keepalive branch May 12, 2026 22:55

Copilot AI left a comment

Pull request overview

This PR introduces a keep-alive mechanism for long-running actor executions so Repid can periodically signal supported brokers that a message is still being processed, reducing the chance of premature redelivery/timeouts.

Changes:

  • Adds a keep-alive loop in the runner and a per-actor keep_alive configuration (enable/disable or override interval).
  • Extends ReceivedMessageT and CapabilitiesT to support keep-alive across brokers, and implements broker-specific keep-alive behavior for Redis, Pub/Sub, NATS, and SQS.
  • Adds/updates unit + integration tests and updates the user guide documentation.
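
The runner-side behavior described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `FakeMessage`, `keep_alive_loop`, `run_with_keep_alive`, and `slow_actor` are hypothetical names, and the sleep-then-ping ordering is an assumption. The idea is that a background task pings the broker on a fixed interval while the actor runs, and is cancelled as soon as the actor finishes.

```python
import asyncio
import contextlib


class FakeMessage:
    """Hypothetical stand-in for a received message; counts keep-alive calls."""

    def __init__(self) -> None:
        self.keep_alive_calls = 0

    async def keep_alive(self) -> None:
        self.keep_alive_calls += 1


async def keep_alive_loop(message: FakeMessage, interval: float) -> None:
    # Assumption: sleep first, then ping, until the task is cancelled.
    while True:
        await asyncio.sleep(interval)
        try:
            await message.keep_alive()
        except Exception:
            # A failed ping should not crash the actor; a real runner would log it.
            pass


async def run_with_keep_alive(message, actor, interval: float):
    # Run the keep-alive loop concurrently with the actor.
    task = asyncio.create_task(keep_alive_loop(message, interval))
    try:
        return await actor()
    finally:
        # Stop pinging once the actor has returned or raised.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


async def slow_actor() -> str:
    await asyncio.sleep(0.35)
    return "done"


async def demo():
    message = FakeMessage()
    result = await run_with_keep_alive(message, slow_actor, interval=0.1)
    return result, message.keep_alive_calls
```

Calling `asyncio.run(demo())` returns the actor result together with the number of keep-alive pings sent while it ran. Cancelling in `finally` means the loop also stops when the actor raises.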

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| tests/unit/test_runner.py | Adds unit tests covering the runner keep-alive loop and keep-alive execution wrapper. |
| tests/unit/test_router.py | Adds propagation test for router-level keep_alive configuration. |
| tests/unit/redis/test_redis_keep_alive.py | Adds Redis keep-alive unit coverage for RedisReceivedMessage. |
| tests/unit/pubsub/protocol/test_received_message.py | Adds Pub/Sub keep-alive interval and behavior tests. |
| tests/integration/test_sqs_specific.py | Adds SQS keep-alive and related edge-case integration tests. |
| tests/integration/test_nats_specific.py | Adds NATS keep-alive tests and subscriber consumer-info coverage. |
| tests/integration/test_kafka_specific.py | Updates capabilities expectation to include supports_keep_alive. |
| repid/test_client.py | Implements keep-alive stubs + capability flag for the in-process test server/message. |
| repid/router.py | Adds keep_alive configuration to Router/actor definitions and propagates via include_router. |
| repid/data/actor.py | Adds keep_alive to ActorData so the runner can apply per-actor behavior. |
| repid/connections/sqs/subscriber.py | Passes configured visibility timeout into SqsReceivedMessage instances. |
| repid/connections/sqs/message.py | Implements SQS message keep-alive via ChangeMessageVisibility + adds keep_alive_interval. |
| repid/connections/sqs/message_broker.py | Adds visibility_timeout config + advertises supports_keep_alive. |
| repid/connections/redis/message_broker.py | Implements Redis message keep-alive via XCLAIM + adds keep-alive interval support + capability flag. |
| repid/connections/pubsub/protocol/subscriber.py | Passes stream ack-deadline config into received message wrapper. |
| repid/connections/pubsub/protocol/received_message.py | Implements Pub/Sub message keep-alive via extend_deadline + exposes keep-alive interval. |
| repid/connections/pubsub/message_broker.py | Advertises supports_keep_alive for Pub/Sub server. |
| repid/connections/nats/message_broker.py | Implements NATS keep-alive via in_progress() + derives keep-alive interval from ack-wait + capability flag. |
| repid/connections/kafka/message.py | Adds keep-alive stubs + interval property for Kafka messages (unsupported). |
| repid/connections/kafka/message_broker.py | Advertises supports_keep_alive: False for Kafka server. |
| repid/connections/in_memory/message_broker.py | Adds keep-alive stubs + capability flag for in-memory server/messages. |
| repid/connections/amqp/message_broker.py | Advertises supports_keep_alive: False for AMQP server. |
| repid/connections/amqp/helpers.py | Adds keep-alive stubs + interval property for AMQP received message wrapper. |
| repid/connections/abc.py | Extends protocols/typed dicts with keep-alive APIs and capability flag. |
| repid/_runner.py | Adds keep-alive loop and wraps actor execution to run it concurrently when supported/enabled. |
| docs/user_guide/actors/execution.md | Documents keep-alive behavior and per-actor configuration examples. |

Comments suppressed due to low confidence (1)

repid/_runner.py:70

  • interval can be 0 (e.g., from a broker-recommended keep_alive_interval computed via integer division) which makes _keep_alive_loop spin with asyncio.sleep(0) and repeatedly call keep_alive(), causing high CPU usage and log spam. Please clamp/validate the interval (e.g., require > 0 and/or use a small minimum like 0.1s/1s) before starting the keepalive task.
    interval = (
        actor.keep_alive
        if isinstance(actor.keep_alive, (int, float)) and not isinstance(actor.keep_alive, bool)
        else message.keep_alive_interval
    )
    if not server.capabilities["supports_keep_alive"] or interval is None:
        return await _actor_execution(message, actor, server, default_serializer)

    keepalive_task = asyncio.create_task(_keep_alive_loop(message, interval))
    try:
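
One possible shape for the clamp suggested in this comment is sketched below. It is an illustration only: `resolve_interval` and `MIN_INTERVAL_SECONDS` are hypothetical names, the 1-second floor is an arbitrary choice, and treating `keep_alive=False` as "disabled" is an assumption about the per-actor setting rather than something the quoted excerpt shows.

```python
from __future__ import annotations

MIN_INTERVAL_SECONDS = 1.0  # assumed floor, not a value taken from the PR


def resolve_interval(
    actor_keep_alive: bool | int | float,
    broker_interval: int | float | None,
) -> float | None:
    """Return a keep-alive interval in seconds, or None to skip keep-alive."""
    if actor_keep_alive is False:
        return None  # assumption: False disables keep-alive for this actor
    # bool is a subclass of int, so True must not be read as a 1-second interval.
    if isinstance(actor_keep_alive, (int, float)) and not isinstance(actor_keep_alive, bool):
        interval = actor_keep_alive
    else:
        interval = broker_interval
    if interval is None:
        return None
    # Clamp so a 0 (or tiny) interval can never turn the loop into a busy spin.
    return max(float(interval), MIN_INTERVAL_SECONDS)
```

With this helper, `resolve_interval(True, 0)` yields the floor instead of `0`, so the keep-alive loop always sleeps for a non-trivial amount of time between pings.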

Comment thread repid/_runner.py
if isinstance(actor.keep_alive, (int, float)) and not isinstance(actor.keep_alive, bool)
else message.keep_alive_interval
)
if not server.capabilities["supports_keep_alive"] or interval is None:
Comment thread repid/_runner.py
Comment on lines +48 to +49
except Exception: # noqa: BLE001
logger.warning("message.keep_alive.error", extra={"message_id": message.message_id})
self._msg = msg
self._action: MessageAction | None = None
self._visibility_timeout = visibility_timeout
self._keep_alive_interval: int = visibility_timeout // 3
Comment on lines +47 to +48
self._keep_alive_interval: int | None = (
int(ack_wait) // 3 if ack_wait is not None and ack_wait > 0 else None
Comment on lines +43 to 45
self._stream_ack_deadline_seconds = stream_ack_deadline_seconds
self._keep_alive_interval: int = stream_ack_deadline_seconds // 3

self._server = server
self._action: MessageAction | None = None
self._consumer_name = consumer_name
self._keep_alive_interval: int | None = min_idle_ms // 3000 if min_idle_ms > 0 else None
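
The interval derivations quoted in these threads (`visibility_timeout // 3`, `int(ack_wait) // 3`, `stream_ack_deadline_seconds // 3`, `min_idle_ms // 3000`) all use integer division, which rounds down to `0` whenever the configured timeout is under 3 seconds. The sketch below shows that arithmetic next to one possible alternative; `integer_third` and `safe_third` are hypothetical helpers, and the 0.5-second floor is an assumption.

```python
from __future__ import annotations


def integer_third(timeout_seconds: int) -> int:
    # Mirrors the `timeout // 3` pattern from the quoted excerpts.
    return timeout_seconds // 3


def safe_third(timeout_seconds: float, minimum: float = 0.5) -> float | None:
    """Hypothetical alternative: true division with a floor, None if unset."""
    if timeout_seconds <= 0:
        return None
    return max(timeout_seconds / 3, minimum)


# Timeouts below 3 seconds collapse to a zero interval with floor division.
small_timeouts = {t: integer_third(t) for t in (1, 2, 3, 30)}
```

Here `small_timeouts` maps 1 and 2 to `0`, while 3 and 30 map to `1` and `10`. The floor in `safe_third` only makes sense when it stays below the timeout itself; for a timeout shorter than the floor, the ping would arrive too late.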
Comment on lines +98 to +104
SqsReceivedMessage(
self._server,
channel,
queue_url,
msg,
self._server._visibility_timeout,
).reject(),