diff --git a/src/Statement/CreateConnection.php b/src/Statement/CreateConnection.php index 5f0bedb..3b64d4d 100644 --- a/src/Statement/CreateConnection.php +++ b/src/Statement/CreateConnection.php @@ -42,11 +42,14 @@ private function __construct(string $name, string $definition) public static function kafka(string $name, string ...$brokers): self { if (count($brokers) > 1) { - $definition = sprintf('KAFKA (BROKERS (%s))', implode(', ', array_map(static function (string $dsn) { - return sprintf('\'%s\'', $dsn); - }, $brokers))); + $definition = sprintf( + "KAFKA (BROKERS (%s), SECURITY PROTOCOL = 'PLAINTEXT')", + implode(', ', array_map(static function (string $dsn) { + return sprintf('\'%s\'', $dsn); + }, $brokers)) + ); } else { - $definition = sprintf('KAFKA (BROKER \'%s\')', current($brokers)); + $definition = sprintf("KAFKA (BROKER '%s', SECURITY PROTOCOL = 'PLAINTEXT')", current($brokers)); } return new self($name, $definition); diff --git a/src/Statement/CreateSink.php b/src/Statement/CreateSink.php index b069e46..ccc55a4 100644 --- a/src/Statement/CreateSink.php +++ b/src/Statement/CreateSink.php @@ -109,8 +109,8 @@ public function __toString(): string $query .= ' IF NOT EXISTS'; } - $query .= ' %s FROM %s INTO %s WITH (SIZE = \'%s\')'; + $query .= ' %s FROM %s INTO %s'; - return sprintf($query, $this->name, $this->from, $this->definition, $this->size); + return sprintf($query, $this->name, $this->from, $this->definition); } } diff --git a/src/Statement/CreateSource.php b/src/Statement/CreateSource.php index e2bae28..9b14656 100644 --- a/src/Statement/CreateSource.php +++ b/src/Statement/CreateSource.php @@ -19,11 +19,6 @@ class CreateSource implements Command */ private $definition; - /** - * @var string - */ - private $size = '1'; - /** * @var bool */ @@ -71,18 +66,6 @@ public static function postgres(string $name, string $connection, string $public return new self($name, $definition); } - /** - * @param string $size - * - * @return self - */ - public function size(string $size): self - { - $this->size = $size; - - return $this; - } - /** * @return self */ @@ -106,8 +89,8 @@ public function __toString(): string $query .= ' IF NOT EXISTS'; } - $query .= ' %s FROM %s WITH (SIZE = \'%s\')'; + $query .= ' %s FROM %s'; - return sprintf($query, $this->name, $this->definition, $this->size); + return sprintf($query, $this->name, $this->definition); } }