Add NATS Transport Unit Tests, related documentation and prefix support for stream names.#2541
Add NATS Transport Unit Tests, related documentation and prefix support for stream names.#2541maxyloon wants to merge 33 commits into
Conversation
for more information, see https://pre-commit.ci
…rt for stream names.
for more information, see https://pre-commit.ci
|
thanks max |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2541 +/- ##
==========================================
+ Coverage 82.66% 82.95% +0.29%
==========================================
Files 79 80 +1
Lines 10231 10559 +328
Branches 1170 1203 +33
==========================================
+ Hits 8457 8759 +302
- Misses 1573 1597 +24
- Partials 201 203 +2 ☔ View full report in Codecov by Sentry. |
|
No problem. Let me know if there is anything else I can do to get it merged. |
There was a problem hiding this comment.
Pull request overview
This PR introduces a new NATS JetStream Kombu transport, along with supporting packaging/docs updates and a thorough unit test suite. It extends Kombu’s virtual transport layer to target JetStream streams/consumers (including configurable naming prefixes) and wires the transport into project tooling and documentation.
Changes:
- Added
kombu.transport.natsimplementing a JetStream-backed virtual transport with configurable stream/consumer name prefixes. - Added NATS unit + integration tests, plus tox/docker integration plumbing.
- Added packaging extras (
kombu[nats]) and documentation references/examples for the new transport.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
tox.ini |
Adds NATS integration env + docker service; installs NATS extra in certain tox envs. |
t/unit/transport/test_nats.py |
Adds unit tests for Message/QoS/Channel/Transport with mocked nats-py IO. |
t/integration/test_nats.py |
Adds integration tests (marked env('nats')) for basic transport operations. |
t/integration/docker/Dockerfile.nats |
Adds a JetStream-enabled NATS server container for tox integration runs. |
setup.py |
Adds nats extra to setup extras mapping. |
requirements/extras/nats.txt |
Defines nats-py[nkeys] extra requirement range. |
README.rst |
Adds NATS JetStream mention + transport comparison row. |
kombu/transport/nats.py |
New NATS JetStream transport implementation. |
kombu/transport/__init__.py |
Registers nats transport alias. |
examples/nats_send.py |
Adds example producer for NATS transport. |
examples/nats_receive.py |
Adds example consumer for NATS transport. |
docs/userguide/connections.rst |
Documents NATS URL examples + transport comparison entry. |
docs/reference/kombu.transport.nats.rst |
Adds API reference page for the NATS transport module. |
docs/reference/index.rst |
Adds NATS transport page to Sphinx reference index. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| class QoS(virtual.QoS): | ||
| """Quality of Service guarantees.""" | ||
|
|
||
| _not_yet_acked = {} | ||
|
|
||
| def can_consume(self): | ||
| """Return true if the channel can be consumed from.""" | ||
| return not self.prefetch_count or len(self._not_yet_acked) < self.prefetch_count | ||
|
|
||
| def can_consume_max_estimate(self): | ||
| if self.prefetch_count: | ||
| return self.prefetch_count - len(self._not_yet_acked) | ||
| return 1 | ||
|
|
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
- QoS._not_yet_acked: moved from class attribute to instance __init__ to prevent shared state across channels - QoS.can_consume_max_estimate: clamp result at 0 via max(0, ...) to avoid returning negative values when _not_yet_acked exceeds prefetch_count - Channel._consumers → _js_consumers: rename to avoid collision with virtual.Channel._consumers which tracks Kombu consumer tags - Channel._open(): use 'hostname or DEFAULT_HOST' fallback so a None hostname doesn't produce 'nats://None:4222' - Channel.close(): remove the asyncio.all_tasks() loop cancellation that would cancel unrelated coroutines in the same event loop - Channel._delete(): always attempt stream deletion (not only when in cache); use _streams.discard() in a finally block so the cache stays consistent even on NotFoundError - Channel._put(): read message expiration property and pass it as a 'Nats-TTL' header (value in '<ms>ms' format) so NATS JetStream expires the message after the requested duration - tox.ini: fix nats deps selector to include pypy3.11-linux and 3.10-linux; remove undefined 3.14t-linux factor - docs/userguide/connections.rst: document all NATS transport options including per-message TTL
for more information, see https://pre-commit.ci
|
So I realized that the implementation uses embedds metadata in the body (like most virtual transports), but nats supports binary payloads and I wanted to make sure protocol binding for use with things like cloudevents would work with some configuration. Therefore I added a clean-body mode. So it uses headers instead for metadata and should avoid nested serialization problems
|
|
isn't it a complete new backend? why should we think about backward compatibility here then? |
| else: | ||
| body_bytes = b"" |
| """Get or create the global event loop.""" | ||
| global _event_loop | ||
| if _event_loop is None: | ||
| _event_loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(_event_loop) |
There was a problem hiding this comment.
Good catch. The transport should not install a global asyncio event loop.
I see Kombu’s virtual transports are synchronous, so the NATS transport should behave as a sync adapter over nats-py’s async API. I’ll change this to keep a private event loop owned by each Channel, avoid asyncio.set_event_loop().
I think this keeps asyncio isolated from the rest of Celery/Kombu and avoids replacing an application’s existing thread-default loop.
| _event_loop: asyncio.AbstractEventLoop | None = None | ||
|
|
||
|
|
||
| def get_event_loop() -> asyncio.AbstractEventLoop: |
There was a problem hiding this comment.
will asyncio work well with current celery and kombu code base properly?
There was a problem hiding this comment.
Have a local branch that works now, will push it shortly. Taking some time verify some setups and sceanarios to make it fully celery compatible. Here is the docs from my working branch. Once I find it stable enough i'll open a PR for celery too.
Not sure exactly what you mean here, but I think the “backward compatibility” point here is less about compatibility with an existing released NATS backend, since this is new, and more about compatibility between the two wire formats introduced by this PR. And no I don't think its a complete new backend, just a question of wireformat which should be transient to the user. My thinking is that; The main thing we should preserve is consumer-side interoperability: a consumer should be able to read both formats regardless of whether the publisher used envelope-in-body mode or clean-body mode (might refactor it to raw for better abstraction). That matters if a queue already contains messages when nats_clean_body is changed, or if different producers are temporarily configured differently. So I agree the publish path can be strict and explicit, but the receive path should remain tolerant: detect Kombu metadata headers → clean-body mode; otherwise treat the payload as the full Kombu JSON envelope. External semantics outside the module will remain the same, but raw/clean mode with skip serializing and deserializing the wireformat, |
that is more clear now and make sense. |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
closes #2103
Implement support for NATS using its built-in persistence layer JetStream.
What's in this PR
Original implementation by @joeriddles. (PR: #2299) This PR builds on that work to bring it up to date with
mainand add the items needed for merge:t/unit/transport/test_nats.py) — 72 tests coveringMessage,QoS,Channel, andTransportwith no live NATS server requiredstream_name_prefixandconsumer_name_prefixtransport options (defaultSTREAM_/CONSUMER_)kombu[nats]extra pointing torequirements/extras/nats.txtkombu.transport.natsadded to the API reference indexmain;nats-py[nkeys]>=2.9.0,<3.0.0confirmed compatible with nats-py 2.14.0Testing
Install the extra and run unit tests (no server needed):
pip install "kombu[nats]" python -m pytest t/unit/transport/test_nats.py -vIntegration tests (requires a running NATS server with JetStream):
Example
You can test using either:
nats-server --jsdemo.nats.ioThe transport can be tested using
examples/nats_receive.pyandexamples/nats_send.py:python -m examples.nats_receive(append--demoif using the demo server)python -m examples.nats_send(append--demoif using the demo server)In the receive window, you should see something like: