Skip to content

Commit

Permalink
Add internal SASL_PLAINTEXT Kafka listener (#1574)
Browse files Browse the repository at this point in the history
Co-authored-by: skarpenko <[email protected]>
Co-authored-by: Igor Yova <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2023
1 parent b81e579 commit 5d3f21c
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
1 change: 1 addition & 0 deletions embedded-kafka/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ By default, to your projects `target` folder. You can configure binding using pr
* `embedded.kafka.toxiproxy.saslPlaintext.brokerList`
* `embedded.kafka.networkAlias`
* `embedded.kafka.internalPort`
* `embedded.kafka.internalSaslPlaintextPort`
* Bean `ToxiproxyContainer.ContainerProxy kafkaPlainTextContainerProxy`
* Bean `ToxiproxyContainer.ContainerProxy kafkaSaslContainerProxy`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public GenericContainer<?> kafka(
int kafkaInternalPort = kafkaProperties.getContainerBrokerPort(); // for access from other containers
int kafkaExternalPort = kafkaProperties.getBrokerPort(); // for access from host
int saslPlaintextKafkaExternalPort = kafkaProperties.getSaslPlaintextBrokerPort();
int saslPlaintextKafkaInternalPort = kafkaProperties.getInternalSaslPlaintextBrokerPort();
int toxiProxyKafkaInternalPort = kafkaProperties.getToxiProxyContainerBrokerPort();
int toxiProxySaslPlaintextKafkaInternalPort = kafkaProperties.getToxiProxySaslPlaintextContainerBrokerPort();

Expand All @@ -143,6 +144,7 @@ public String getBootstrapServers() {
List<String> servers = new ArrayList<>();
servers.add("EXTERNAL_PLAINTEXT://" + getHost() + ":" + getMappedPort(kafkaExternalPort));
servers.add("EXTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + getMappedPort(saslPlaintextKafkaExternalPort));
servers.add("INTERNAL_SASL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + saslPlaintextKafkaInternalPort);
servers.add("INTERNAL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + kafkaInternalPort);

if (plainTextProxy != null) {
Expand All @@ -162,16 +164,18 @@ public String getBootstrapServers() {
//see: https://github.com/wurstmeister/kafka-docker/blob/develop/README.md
// order matters: external then internal since kafka.client.ClientUtils.getPlaintextBrokerEndPoints take first for simple consumers
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"EXTERNAL_PLAINTEXT:PLAINTEXT," +
"EXTERNAL_PLAINTEXT:PLAINTEXT," +
"EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT," +
"INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT," +
"INTERNAL_PLAINTEXT:PLAINTEXT," +
"BROKER:PLAINTEXT," +
"TOXIPROXY_INTERNAL_PLAINTEXT:PLAINTEXT," +
"TOXIPROXY_INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT"
)
.withEnv("KAFKA_LISTENERS",
"EXTERNAL_PLAINTEXT://0.0.0.0:" + kafkaExternalPort + "," +
"EXTERNAL_PLAINTEXT://0.0.0.0:" + kafkaExternalPort + "," +
"EXTERNAL_SASL_PLAINTEXT://0.0.0.0:" + saslPlaintextKafkaExternalPort + "," +
"INTERNAL_SASL_PLAINTEXT://0.0.0.0:" + saslPlaintextKafkaInternalPort + "," +
"INTERNAL_PLAINTEXT://0.0.0.0:" + kafkaInternalPort + "," +
"TOXIPROXY_INTERNAL_PLAINTEXT://0.0.0.0:" + toxiProxyKafkaInternalPort + "," +
"TOXIPROXY_INTERNAL_SASL_PLAINTEXT://0.0.0.0:" + toxiProxySaslPlaintextKafkaInternalPort + "," +
Expand Down Expand Up @@ -255,6 +259,7 @@ private void registerKafkaEnvironment(GenericContainer<?> kafka,
map.put("embedded.kafka.saslPlaintext.password", KafkaConfigurationProperties.KAFKA_PASSWORD);
map.put("embedded.kafka.networkAlias", KAFKA_HOST_NAME);
map.put("embedded.kafka.internalPort", kafkaProperties.getInternalBrokerPort());
map.put("embedded.kafka.internalSaslPlaintextPort", kafkaProperties.getInternalSaslPlaintextBrokerPort());

Integer containerPort = kafkaProperties.getContainerBrokerPort();
String kafkaBrokerListForContainers = format("%s:%d", KAFKA_HOST_NAME, containerPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class KafkaConfigurationProperties extends CommonContainerProperties {
protected int saslPlaintextBrokerPort = 9095;
protected int toxiProxyContainerBrokerPort = 9096;
protected int toxiProxySaslPlaintextContainerBrokerPort = 9097;
protected int internalSaslPlaintextBrokerPort = 9098;
protected int socketTimeoutMs = 5_000;
protected int bufferSize = 64 * 1024;

Expand Down

0 comments on commit 5d3f21c

Please sign in to comment.