Skip to content

feat: wire codec.encode into all producers, add BatchCodecProto for batch-aware encoding#2850

Open
ce1ebrimbor wants to merge 19 commits into
ag2ai:mainfrom
ce1ebrimbor:feat/codec-encode-wiring
Open

feat: wire codec.encode into all producers, add BatchCodecProto for batch-aware encoding#2850
ce1ebrimbor wants to merge 19 commits into
ag2ai:mainfrom
ce1ebrimbor:feat/codec-encode-wiring

Conversation

@ce1ebrimbor
Copy link
Copy Markdown
Contributor

@ce1ebrimbor ce1ebrimbor commented May 1, 2026

Production-Side Codec Encode Wiring

PR3 in the parser/codec unification series (#2841 )

What

Wires codec.encode() into all 6 Producers, replacing direct encode_message() calls. Adds BatchCodecProto for batch-aware codecs. Custom codecs now work symmetrically on both consume and publish sides.

Changes

Codec on producers

  • codec: CodecProto attribute on ProducerProto and all 6 producer implementations
  • codec passed from broker config connect() to producer.connect()
  • DefaultCodec() used when no custom codec is set — zero behavior change

Encode wiring

  • 12 direct encode_message() calls replaced with await self.codec.encode() in Kafka, Confluent, NATS, MQTT producers
  • AioPikaParser.encode_message converted from sync staticmethod to async with codec parameter
  • MessageFormat.build/encode and BinaryMessageFormatV1.encode converted to async with codec parameter

Breaking Changes

  • build_message() in all broker testing modules is now async — callers must await it
  • AioPikaParser.encode_message() is now async — callers must await it
  • MessageFormat.build() and MessageFormat.encode() are now async

What Did NOT Change

  • encode_message() free function in faststream/message/utils.py
  • decode_message() and consumption-side codec wiring (PR2)
  • CodecProto.decode / DefaultCodec.decode
  • _get_parser_and_decoder() subscriber wiring

@github-actions github-actions Bot added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module NATS Issues related to `faststream.nats` module and NATS broker features Redis Issues related to `faststream.redis` module and Redis features MQTT Issues related to `faststream.mqtt` module labels May 1, 2026
Comment thread tests/brokers/base/codec.py
@ce1ebrimbor ce1ebrimbor changed the title Feat/codec encode wiring feat: wire codec.encode into all producers, add BatchCodecProto for batch-aware encoding May 1, 2026
msg = "Cannot use both 'codec' and 'decoder' — 'codec' replaces 'decoder'."
raise ValueError(msg)

is_batch = getattr(self._decoder, "__name__", "") == "decode_batch"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be eventually removed with the decoder= config

@ce1ebrimbor
Copy link
Copy Markdown
Contributor Author

@Lancetnik there are some changes in the internals, mainly the decoding and encoding are async now, (example: codecs for kafka may need to ping schema registry).

Let me know if this works for you.

Documentation is coming in a separate PR.
I suppose you would want to keep this experimental for a while ?

@Lancetnik
Copy link
Copy Markdown
Member

@ce1ebrimbor thank you! Looks good, but I need to dig into - there are some changes I want to look close
And sorry, have no time right now :( I'll try to review this weekends

@Lancetnik
Copy link
Copy Markdown
Member

Give me one more day, please

@ce1ebrimbor
Copy link
Copy Markdown
Contributor Author

Give me one more day, please

No pressure, take your time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AioKafka Issues related to `faststream.kafka` module Confluent Issues related to `faststream.confluent` module MQTT Issues related to `faststream.mqtt` module NATS Issues related to `faststream.nats` module and NATS broker features Redis Issues related to `faststream.redis` module and Redis features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants