Skip to content

Commit

Permalink
feat(encryptionCheck): introduce new method DecryptionService#hasSame…
Browse files Browse the repository at this point in the history
…EncryptionFlag(..) so kafka consumers can validate the kafka message producer
  • Loading branch information
andrekaplick5678 committed Feb 20, 2024
1 parent 1dfdd2b commit 39e05c2
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 1 deletion.
15 changes: 15 additions & 0 deletions src/main/java/de/otto/kafka/messaging/e2ee/DecryptionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ public String decryptToString(String kafkaTopicName, AesEncryptedPayload encrypt
return new String(decryptToByteArray(kafkaTopicName, encryptedPayload), StandardCharsets.UTF_8);
}

/**
* This method checks if the encryption "flag" of the kafka topic matches the payload. This method
* can be used by a kafka topic consumer the control the kafka producer.
*
* @param kafkaTopicName name of the Kafka Topic the payload is from.
* @param encryptedPayload the (potentially) encrypted payload.
* @return <code>true</code> when the topic is marked as encrypted and the payload is encrypted or
* when the topic is not encrypted as well as the payload.
*/
public boolean hasSameEncryptionFlag(String kafkaTopicName,
AesEncryptedPayload encryptedPayload) {
return encryptionKeyProvider.isEncryptedTopic(kafkaTopicName) == encryptedPayload.isEncrypted();
}


private Key createAesKey(TopicKeyVersion topicKeyVersion) {
String topic = topicKeyVersion.topic();
int keyVersionNumber = topicKeyVersion.keyVersionNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public interface EncryptionKeyProvider {
*/
String retrieveKeyForDecryption(String topic, int version, String encryptionKeyAttributeName);

/**
* @param kafkaTopicName the name of the topic
* @return <code>true</code> when the topic can contain encrypted payloads
*/
default boolean isEncryptedTopic(String kafkaTopicName) {
throw new UnsupportedOperationException(
"This method is not implemented by " + getClass().getName());
}

/**
* base64 and URL-Encoded encoded AES key
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ public String retrieveKeyForDecryption(String topic, int version,
}
}

@Override
public boolean isEncryptedTopic(String kafkaTopicName) {
return realEncryptionKeyProvider.isEncryptedTopic(kafkaTopicName);
}

private String retrieveNewExpiredAtTimestamp() {
return OffsetDateTime.now(clock)
.withOffsetSameInstant(ZoneOffset.UTC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public VaultEncryptionKeyProvider(

@Override
public KeyVersion retrieveKeyForEncryption(String kafkaTopicName) {
if (!config.isEncryptedTopic(kafkaTopicName)) {
if (!isEncryptedTopic(kafkaTopicName)) {
return null;
}

Expand Down Expand Up @@ -86,6 +86,11 @@ public String retrieveKeyForDecryption(String topic, int version,
return extractEncryptionKeyFromResponse(response, usedEncryptionKeyAttributeName);
}

@Override
public boolean isEncryptedTopic(String kafkaTopicName) {
return config.isEncryptedTopic(kafkaTopicName);
}

private void validateResponse(LogicalResponse response, Supplier<String> errorMsgSupplier) {
if (log.isTraceEnabled()) {
log.trace("status = {} / body = {}", response.getRestResponse().getStatus(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,38 @@ void shouldNotDecryptPayloadWhenItIsNotEncrypted() {
String result = decryptionService.decryptToString("someTopic", encryptedPayload);
assertThat(result).isEqualTo("Hello World!");
}

@Test
void testHasSameEncryptionFlag_true() {
EncryptionKeyProvider keyProvider = new DummyEncryptionKeyProvider(
"gZvWT1IN0mM5sK3sK0V2Wfzo9Jmk4tUPt7gxRsuN3LY=", "encryption_key", 3);
DecryptionService decryptionService = new DecryptionService(keyProvider);

int keyVersion = 3;
String encryptionKeyAttributeName = "encryption_key";
String ivBase64 = "2rW2tDnRdwRg87Ta";
byte[] encryptedPayloadByteArray = Base64.getDecoder()
.decode("6ttHpHYw7eYQ1OnvrhZAFi0PPsUGl9NR18hXFQ==");

AesEncryptedPayload encryptedPayload = AesEncryptedPayload.ofEncryptedPayload(
encryptedPayloadByteArray, ivBase64, keyVersion, encryptionKeyAttributeName);

assertThat(decryptionService.hasSameEncryptionFlag("sometopic", encryptedPayload))
.isTrue();
}

@Test
void testHasSameEncryptionFlag_false() {
EncryptionKeyProvider keyProvider = new DummyEncryptionKeyProvider(
"gZvWT1IN0mM5sK3sK0V2Wfzo9Jmk4tUPt7gxRsuN3LY=", "encryption_key", 3);
DecryptionService decryptionService = new DecryptionService(keyProvider);

byte[] plaintextByteArray = "Hello World!".getBytes(StandardCharsets.UTF_8);

AesEncryptedPayload unencryptedPayload = AesEncryptedPayload.ofUnencryptedPayload(
plaintextByteArray);

assertThat(decryptionService.hasSameEncryptionFlag("sometopic", unencryptedPayload))
.isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public String retrieveKeyForDecryption(String topic, int version,
public String retrieveKeyForDecryption(String topic, int version) {
return keyVersion.encodedKey();
}

@Override
public boolean isEncryptedTopic(String kafkaTopicName) {
return true;
}
}

0 comments on commit 39e05c2

Please sign in to comment.