Skip to content

Commit

Permalink
Removed deprecated size and change connection security protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
akarashchuk committed Apr 20, 2024
1 parent b60a7d4 commit d9ffc32
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 25 deletions.
11 changes: 7 additions & 4 deletions src/Statement/CreateConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ private function __construct(string $name, string $definition)
public static function kafka(string $name, string ...$brokers): self
{
if (count($brokers) > 1) {
$definition = sprintf('KAFKA (BROKERS (%s))', implode(', ', array_map(static function (string $dsn) {
return sprintf('\'%s\'', $dsn);
}, $brokers)));
$definition = sprintf(
'KAFKA (BROKERS (%s) SECURITY PROTOCOL = \'PLAINTEXT\')',
implode(', ', array_map(static function (string $dsn) {
return sprintf('\'%s\'', $dsn);
}, $brokers))
);
} else {
$definition = sprintf('KAFKA (BROKER \'%s\')', current($brokers));
$definition = sprintf('KAFKA (BROKER \'%s\' SECURITY PROTOCOL = \'PLAINTEXT\')', current($brokers));
}

return new self($name, $definition);
Expand Down
4 changes: 2 additions & 2 deletions src/Statement/CreateSink.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public function __toString(): string
$query .= ' IF NOT EXISTS';
}

$query .= ' %s FROM %s INTO %s WITH (SIZE = \'%s\')';
$query .= ' %s FROM %s INTO %s';

return sprintf($query, $this->name, $this->from, $this->definition, $this->size);
return sprintf($query, $this->name, $this->from, $this->definition);
}
}
21 changes: 2 additions & 19 deletions src/Statement/CreateSource.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ class CreateSource implements Command
*/
private $definition;

/**
* @var string
*/
private $size = '1';

/**
* @var bool
*/
Expand Down Expand Up @@ -71,18 +66,6 @@ public static function postgres(string $name, string $connection, string $public
return new self($name, $definition);
}

/**
* @param string $size
*
* @return self
*/
public function size(string $size): self
{
$this->size = $size;

return $this;
}

/**
* @return self
*/
Expand All @@ -106,8 +89,8 @@ public function __toString(): string
$query .= ' IF NOT EXISTS';
}

$query .= ' %s FROM %s WITH (SIZE = \'%s\')';
$query .= ' %s FROM %s';

return sprintf($query, $this->name, $this->definition, $this->size);
return sprintf($query, $this->name, $this->definition);
}
}

0 comments on commit d9ffc32

Please sign in to comment.