[refactor][client] Introduce PulsarApiMessageId to access fields of MessageIdData#18890
Closed
BewareMyPower wants to merge 1 commit into
Closed
Conversation
ed8933d to
539b150
Compare
BewareMyPower
commented
Dec 12, 2022
| import org.testng.annotations.Test; | ||
|
|
||
| @Test(groups = "broker-api") | ||
| public class CustomMessageIdTest extends ProducerConsumerBase { |
Contributor
Author
There was a problem hiding this comment.
This test shows another benefit of this design. Now we can extend PulsarApiMessageId to create our own MessageId implementations for seek and acknowledge APIs. So the PulsarApiMessageId interface is very flexiable and scalable. Take this PR (apache/flink#21069) for example, we don't need to use the APIs from pulsar-client module to create a message id that represents a previous message id. /cc @tisonkun @syhily
Currently the `MessageId` interface hiddens all fields of the
`MessageIdData` struct defined in `PulsarApi.proto`. It's usually enough
for application users because they don't need to access the fields. But
for client developers and developers of other Pulsar ecosystems (e.g.
the built-in Kafka connector and the Flink connector in another repo),
the `MessageId` interface is too simple and there is no common used
abstraction. We can see many code usages like:
```java
if (msgId instanceof BatchMessageIdImpl) {
// Do type cast and then access fields like ledger id...
} else if (msgId instanceof MessageIdImpl) {
// Do type cast and then access fields like ledger id...
// NOTE: don't put this else if before the previous one because
// BatchMessageIdImpl is also a MessageIdImpl
} // ...
```
These `MessageId` implementations are used directly. It's a very bad
design because any change to the public APIs of these implementations
could bring breaking changes.
Also, there is a `TopicMessageIdImpl` that each time a
`getInnerMessageId()` method must be used to get the underlying
`MessageId` object, then do the type assertion and cast again. It makes
code unnecessarily complicated.
### Modifications
Introduce the `PulsarApiMessageId` interface into the `pulsar-common`
module. All `MessageId` implementations so far (except `MultiMessageId`)
should extend this interface so we can do the following conversion
safely in client code or other modules:
```java
long ledgerId = ((PulsarApiMessageId) msgId).getLedgerId();
```
Regarding the `ack_set` field, use a `BitSet` instead of the
`BatchMessageAcker` to record if a message in the batch is acknowledged.
Since the `TopicMessageId` is just a proxy of other `MessageId`
implementations, it's stored as key or value in the map directly because
the `compareTo`/`equal`/`hashCode` methods have the same semantics with
the underlying `MessageId`. There is no need to cast the type and call
`getInnerMessageId`.
Remove all other usages and mark the public methods as deprecated to
avoid breaking changes. They could be removed in the next major release.
Add a `CustomMessageIdTest` to verify any valid `MessageId` implementation
works for `seek` and `acknowledge` APIs.
539b150 to
4029416
Compare
Contributor
Author
|
I will open a PIP first. |
Contributor
Author
|
Close this PR and continue the work after PIP-229 is approved. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Currently the
MessageIdinterface hiddens all fields of theMessageIdDatastruct defined inPulsarApi.proto. It's usually enough for application users because they don't need to access the fields. But for client developers and developers of other Pulsar ecosystems (e.g. the built-in Kafka connector and the Flink connector in another repo), theMessageIdinterface is too simple and there is no common used abstraction. We can see many code usages like:These
MessageIdimplementations are used directly. It's a very bad design because any change to the public APIs of these implementations could bring breaking changes.Also, there is a
TopicMessageIdImplthat each time agetInnerMessageId()method must be used to get the underlyingMessageIdobject, then do the type assertion and cast again. It makes code unnecessarily complicated.Modifications
Introduce the
PulsarApiMessageIdinterface into thepulsar-commonmodule. AllMessageIdimplementations so far (exceptMultiMessageId) should extend this interface so we can do the following conversion safely in client code or other modules:Regarding the
ack_setfield, use aBitSetinstead of theBatchMessageAckerto record if a message in the batch is acknowledged.Since the
TopicMessageIdis just a proxy of otherMessageIdimplementations, it's stored as key or value in the map directly because thecompareTo/equal/hashCodemethods have the same semantics with the underlyingMessageId. There is no need to cast the type and callgetInnerMessageId.Remove all other usages and mark the public methods as deprecated to avoid breaking changes. They could be removed in the next major release.
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: BewareMyPower#11