Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@
final String message = "my-message-" + i;
Future<MessageId> future = producer.sendAsync(message.getBytes());
futures.add(future);
}

Check failure on line 543 in pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

View workflow job for this annotation

GitHub Actions / CI - Integration - Shade on Java 21

SimpleProducerConsumerTest.testEncryptionConsumerWithoutCryptoReader

this method should never be called

Check failure on line 543 in pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

View workflow job for this annotation

GitHub Actions / CI - Integration - Shade on Java 24

SimpleProducerConsumerTest.testEncryptionConsumerWithoutCryptoReader

this method should never be called

Check failure on line 543 in pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

View workflow job for this annotation

GitHub Actions / CI - Integration - Shade on Java 17

SimpleProducerConsumerTest.testEncryptionConsumerWithoutCryptoReader

this method should never be called

log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
Expand Down Expand Up @@ -3416,22 +3416,8 @@
MessageCrypto<MessageMetadata, MessageMetadata> crypto =
new MessageCryptoBc("test", false);

MessageMetadata messageMetadata = new MessageMetadata()
.setEncryptionParam(encrParam)
.setProducerName("test")
.setSequenceId(123)
.setPublishTime(12333453454L)
.setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType))
.setUncompressedSize(uncompressedSize);
messageMetadata.addEncryptionKey()
.setKey(encryptionKeyName)
.setValue(dataKey);
if (encAlgo != null) {
messageMetadata.setEncryptionAlgo(encAlgo);
}

ByteBuffer decryptedPayload = ByteBuffer.allocate(crypto.getMaxOutputSize(payloadBuf.remaining()));
crypto.decrypt(() -> messageMetadata, payloadBuf, decryptedPayload, reader);
crypto.decrypt(msg.getEncryptionCtx().orElseThrow(), payloadBuf, decryptedPayload, reader);

// try to uncompress
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void testCompactedOutMessages() throws Exception {
// shove it in the sideways
consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null,
batchBuffer, new MessageIdData().setLedgerId(1234).setEntryId(567),
consumer.cnx(), DEFAULT_CONSUMER_EPOCH, false);
consumer.cnx(), DEFAULT_CONSUMER_EPOCH, Optional.empty());
Message<?> m = consumer.receive();
assertEquals(((BatchMessageIdImpl) m.getMessageId()).getLedgerId(), 1234);
assertEquals(((BatchMessageIdImpl) m.getMessageId()).getEntryId(), 567);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand All @@ -32,6 +33,9 @@
@InterfaceStability.Stable
public interface MessageCrypto<MetadataT, BuilderT> {

int DECRYPT_V0 = 0;
int DECRYPT_V1 = 1;

int IV_LEN = 12;

/*
Expand Down Expand Up @@ -92,6 +96,38 @@ void encrypt(Set<String> encKeys, CryptoKeyReader keyReader,
*
* @return true if success, false otherwise
*/

/**
* Implement {@link MessageCrypto#decrypt(EncryptionContext, ByteBuffer, ByteBuffer, CryptoKeyReader)} instead.
* This method is retained only for backward compatibility for existing custom implementations.
*/
@Deprecated
boolean decrypt(Supplier<MetadataT> messageMetadataSupplier, ByteBuffer payload,
ByteBuffer outBuffer, CryptoKeyReader keyReader);

/**
* @return the version of the `decrypt` method to call
* V0: call {@link MessageCrypto#decrypt(Supplier, ByteBuffer, ByteBuffer, CryptoKeyReader)}
* V1: call {@link MessageCrypto#decrypt(EncryptionContext, ByteBuffer, ByteBuffer, CryptoKeyReader)}
* others: the consumer will fail to decrypt
*/
default int decryptApiVersion() {
return DECRYPT_V0;
}

/**
* Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved from msgMetadata
*
* @param context the encryption context
* @param payload Message which needs to be decrypted
* @param keyReader KeyReader implementation to retrieve key value
* @param outBuffer the buffer where to write the encrypted payload. The buffer needs to be have enough space
* to hold the encrypted value. Use #getMaxOutputSize method to discover the max size.
*
* @return true if success, false otherwise
*/
default boolean decrypt(EncryptionContext context, ByteBuffer payload, ByteBuffer outBuffer,
CryptoKeyReader keyReader) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import java.security.Security;
import java.security.spec.AlgorithmParameterSpec;
import java.security.spec.InvalidKeySpecException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -60,8 +58,8 @@
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.bouncycastle.asn1.ASN1ObjectIdentifier;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
Expand Down Expand Up @@ -474,13 +472,14 @@ public synchronized void encrypt(Set<String> encKeys, CryptoKeyReader keyReader,
}
}

private boolean decryptDataKey(String keyName, byte[] encryptedDataKey, List<KeyValue> encKeyMeta,
CryptoKeyReader keyReader) {
@Override
public boolean decrypt(Supplier<MessageMetadata> messageMetadataSupplier, ByteBuffer payload, ByteBuffer outBuffer,
CryptoKeyReader keyReader) {
throw new IllegalStateException("this method should never be called");
}

Map<String, String> keyMeta = new HashMap<String, String>();
encKeyMeta.forEach(kv -> {
keyMeta.put(kv.getKey(), kv.getValue());
});
private boolean decryptDataKey(String keyName, byte[] encryptedDataKey, Map<String, String> keyMeta,
CryptoKeyReader keyReader) {

// Read the private key info using callback
EncryptionKeyInfo keyInfo = keyReader.getPrivateKey(keyName, keyMeta);
Expand Down Expand Up @@ -533,11 +532,11 @@ private boolean decryptDataKey(String keyName, byte[] encryptedDataKey, List<Key
return true;
}

private boolean decryptData(SecretKey dataKeySecret, MessageMetadata msgMetadata,
private boolean decryptData(SecretKey dataKeySecret, EncryptionContext context,
ByteBuffer payload, ByteBuffer targetBuffer) {

// unpack iv and encrypted data
iv = msgMetadata.getEncryptionParam();
iv = context.getParam();

GCMParameterSpec gcmParams = new GCMParameterSpec(tagLen, iv);
try {
Expand All @@ -563,21 +562,19 @@ public int getMaxOutputSize(int inputLen) {
return inputLen + Math.max(inputLen, 512);
}

private boolean getKeyAndDecryptData(MessageMetadata msgMetadata, ByteBuffer payload, ByteBuffer targetBuffer) {
List<EncryptionKeys> encKeys = msgMetadata.getEncryptionKeysList();

private boolean getKeyAndDecryptData(EncryptionContext context, ByteBuffer payload, ByteBuffer targetBuffer) {
// Go through all keys to retrieve data key from cache
for (int i = 0; i < encKeys.size(); i++) {
for (final var entry : context.getKeys().entrySet()) {

byte[] msgDataKey = encKeys.get(i).getValue();
byte[] msgDataKey = entry.getValue().getKeyValue();
byte[] keyDigest = digest.digest(msgDataKey);
SecretKey storedSecretKey = dataKeyCache.getIfPresent(ByteBuffer.wrap(keyDigest));
if (storedSecretKey != null) {

// Taking a small performance hit here if the hash collides. When it
// returns a different key, decryption fails. At this point, we would
// call decryptDataKey to refresh the cache and come here again to decrypt.
if (decryptData(storedSecretKey, msgMetadata, payload, targetBuffer)) {
if (decryptData(storedSecretKey, context, payload, targetBuffer)) {
// If decryption succeeded, we can already return
return true;
}
Expand All @@ -591,45 +588,34 @@ private boolean getKeyAndDecryptData(MessageMetadata msgMetadata, ByteBuffer pay
return false;
}

/*
* Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved from msgMetadata
*
* @param msgMetadata Message Metadata
*
* @param payload Message which needs to be decrypted
*
* @param keyReader KeyReader implementation to retrieve key value
*
* @return true if success, false otherwise
*/
@Override
public boolean decrypt(Supplier<MessageMetadata> messageMetadataSupplier,
ByteBuffer payload, ByteBuffer outBuffer, CryptoKeyReader keyReader) {
public int decryptApiVersion() {
return DECRYPT_V1;
}

@Override
public boolean decrypt(EncryptionContext context, ByteBuffer payload, ByteBuffer outBuffer,
CryptoKeyReader keyReader) {

MessageMetadata msgMetadata = messageMetadataSupplier.get();
// If dataKey is present, attempt to decrypt using the existing key
if (dataKey != null) {
if (getKeyAndDecryptData(msgMetadata, payload, outBuffer)) {
if (getKeyAndDecryptData(context, payload, outBuffer)) {
return true;
}
}

// dataKey is null or decryption failed. Attempt to regenerate data key
List<EncryptionKeys> encKeys = msgMetadata.getEncryptionKeysList();
EncryptionKeys encKeyInfo = encKeys.stream().filter(kbv -> {

byte[] encDataKey = kbv.getValue();
List<KeyValue> encKeyMeta = kbv.getMetadatasList();
return decryptDataKey(kbv.getKey(), encDataKey, encKeyMeta, keyReader);

final var encKeyInfo = context.getKeys().entrySet().stream().filter(entry -> {
byte[] encDataKey = entry.getValue().getKeyValue();
return decryptDataKey(entry.getKey(), encDataKey, entry.getValue().getMetadata(), keyReader);
}).findFirst().orElse(null);

if (encKeyInfo == null || dataKey == null) {
// Unable to decrypt data key
return false;
}

return getKeyAndDecryptData(msgMetadata, payload, outBuffer);
return getKeyAndDecryptData(context, payload, outBuffer);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ protected <V> MessageImpl<V> newSingleMessage(final int index,
final BitSet ackSetInMessageId,
final int redeliveryCount,
final long consumerEpoch,
final boolean isEncrypted) {
final Optional<EncryptionContext> encryptionContext) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index);
}
Expand Down Expand Up @@ -1330,7 +1330,7 @@ protected <V> MessageImpl<V> newSingleMessage(final int index,
final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload;
final MessageImpl<V> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadata, payloadBuffer,
createEncryptionContext(msgMetadata, isEncrypted), cnx(), schema, redeliveryCount,
encryptionContext, cnx(), schema, redeliveryCount,
poolMessages, consumerEpoch);
message.setBrokerEntryMetadata(brokerEntryMetadata);
return message;
Expand All @@ -1351,7 +1351,7 @@ protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
final int redeliveryCount,
final long consumerEpoch) {
return newMessage(messageId, brokerEntryMetadata, messageMetadata, payload, schema, redeliveryCount,
consumerEpoch, false);
consumerEpoch, createEncryptionContext(messageMetadata));
}

protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
Expand All @@ -1361,10 +1361,9 @@ protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
final Schema<V> schema,
final int redeliveryCount,
final long consumerEpoch,
final boolean isEncrypted) {
final Optional<EncryptionContext> encryptionContext) {
final MessageImpl<V> message = MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload,
createEncryptionContext(messageMetadata, isEncrypted), cnx(), schema, redeliveryCount,
poolMessages, consumerEpoch);
encryptionContext, cnx(), schema, redeliveryCount, poolMessages, consumerEpoch);
message.setBrokerEntryMetadata(brokerEntryMetadata);
return message;
}
Expand Down Expand Up @@ -1476,12 +1475,15 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
return;
}

final var encryptionContext = createEncryptionContext(msgMetadata);
DecryptResult decryptResult = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload,
cnx);
cnx, encryptionContext);

if (decryptResult.shouldDiscard()) {
// Message was discarded or CryptoKeyReader isn't implemented
return;
} else if (encryptionContext.isPresent() && !decryptResult.success) {
encryptionContext.get().setEncrypted(true);
}

boolean isMessageUndecryptable = !decryptResult.success;
Expand Down Expand Up @@ -1548,7 +1550,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien

final MessageImpl<T> message =
newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload,
schema, redeliveryCount, consumerEpoch, isMessageUndecryptable);
schema, redeliveryCount, consumerEpoch, encryptionContext);
uncompressedPayload.release();

if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
Expand All @@ -1568,7 +1570,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet,
uncompressedPayload, messageId, cnx, consumerEpoch, isMessageUndecryptable);
uncompressedPayload, messageId, cnx, consumerEpoch, encryptionContext);

uncompressedPayload.release();
}
Expand Down Expand Up @@ -1769,7 +1771,7 @@ private void interceptAndComplete(final Message<T> message, final CompletableFut
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx, long consumerEpoch,
boolean isEncrypted) {
Optional<EncryptionContext> encryptionContext) {
int batchSize = msgMetadata.getNumMessagesInBatch();

// create ack tracker for entry aka batch
Expand All @@ -1792,7 +1794,7 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
for (int i = 0; i < batchSize; ++i) {
final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
singleMessageMetadata, uncompressedPayload, batchMessage, schema, true,
ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch, isEncrypted);
ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch, encryptionContext);
if (message == null) {
// If it is not in ackBitSet, it means Broker does not want to deliver it to the client, and
// did not decrease the permits in the broker-side.
Expand Down Expand Up @@ -2017,9 +2019,10 @@ public static DecryptResult discard() {

private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount,
MessageMetadata msgMetadata,
ByteBuf payload, ClientCnx currentCnx) {
ByteBuf payload, ClientCnx currentCnx,
Optional<EncryptionContext> encryptionContext) {

if (msgMetadata.getEncryptionKeysCount() == 0) {
if (encryptionContext.isEmpty() || encryptionContext.get().getKeys().isEmpty()) {
return DecryptResult.success(payload.retain());
}
int batchSize = msgMetadata.getNumMessagesInBatch();
Expand All @@ -2031,7 +2034,14 @@ private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeli
int maxDecryptedSize = msgCrypto.getMaxOutputSize(payload.readableBytes());
ByteBuf decryptedData = PulsarByteBufAllocator.DEFAULT.buffer(maxDecryptedSize);
ByteBuffer nioDecryptedData = decryptedData.nioBuffer(0, maxDecryptedSize);
if (msgCrypto.decrypt(() -> msgMetadata, payload.nioBuffer(), nioDecryptedData, conf.getCryptoKeyReader())) {
final var decryptResult = switch (msgCrypto.decryptApiVersion()) {
case MessageCrypto.DECRYPT_V0 -> msgCrypto.decrypt(() -> msgMetadata, payload.nioBuffer(), nioDecryptedData,
conf.getCryptoKeyReader());
case MessageCrypto.DECRYPT_V1 -> msgCrypto.decrypt(encryptionContext.get(), payload.nioBuffer(),
nioDecryptedData, conf.getCryptoKeyReader());
default -> false;
};
if (decryptResult) {
decryptedData.writerIndex(nioDecryptedData.limit());
return DecryptResult.success(decryptedData);
}
Expand Down Expand Up @@ -2932,11 +2942,9 @@ private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
* Create EncryptionContext if message payload is encrypted.
*
* @param msgMetadata
* @param isEncrypted
* @return {@link Optional}<{@link EncryptionContext}>
*/
private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgMetadata,
boolean isEncrypted) {
static Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgMetadata) {

EncryptionContext encryptionCtx = null;
if (msgMetadata.getEncryptionKeysCount() > 0) {
Expand All @@ -2959,7 +2967,7 @@ private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgM
.setCompressionType(CompressionCodecProvider.convertFromWireProtocol(msgMetadata.getCompression()));
encryptionCtx.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
encryptionCtx.setBatchSize(batchSize);
encryptionCtx.setEncrypted(isEncrypted);
encryptionCtx.setEncrypted(false);
}
return Optional.ofNullable(encryptionCtx);
}
Expand Down
Loading
Loading