Skip to content

Commit

Permalink
Broaden allowed types
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 24, 2023
1 parent d0d7b09 commit 2e08a10
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
10 changes: 6 additions & 4 deletions src/ClusterServerSocketFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

namespace Amp\Cluster;

use Amp\ByteStream\ReadableBuffer;
use Amp\ByteStream\ResourceStream;
use Amp\ByteStream\StreamChannel;
use Amp\ByteStream\WritableStream;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\SerializationException;
use Amp\Socket\BindContext;
use Amp\Socket\ResourceServerSocket;
use Amp\Socket\ResourceSocket;
use Amp\Socket\ServerSocket;
use Amp\Socket\ServerSocketFactory;
use Amp\Socket\SocketAddress;
Expand All @@ -29,11 +31,11 @@ final class ClusterServerSocketFactory implements ServerSocketFactory
/** @var StreamResourceReceivePipe<null> */
private readonly StreamResourceReceivePipe $pipe;

public function __construct(ResourceSocket $socket)
public function __construct(WritableStream&ResourceStream $stream)
{
$serializer = new NativeSerializer();
$this->channel = new StreamChannel($socket, $socket, $serializer);
$this->pipe = new StreamResourceReceivePipe($socket, $serializer);
$this->channel = new StreamChannel(new ReadableBuffer(), $stream, $serializer);
$this->pipe = new StreamResourceReceivePipe($stream, $serializer);
}

public function __destruct()
Expand Down
10 changes: 6 additions & 4 deletions src/ClusterServerSocketProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

namespace Amp\Cluster;

use Amp\ByteStream\ReadableStream;
use Amp\ByteStream\ResourceStream;
use Amp\ByteStream\StreamChannel;
use Amp\ByteStream\WritableBuffer;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\ForbidCloning;
Expand All @@ -11,7 +14,6 @@
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\Serializer;
use Amp\Socket\BindContext;
use Amp\Socket\ResourceSocket;
use Amp\Socket\SocketAddress;
use Amp\Socket\SocketException;
use Amp\Sync\Channel;
Expand Down Expand Up @@ -44,12 +46,12 @@ public function __construct(BindContext $bindContext = new BindContext())
*
* @throws SocketException
*/
public function provideFor(ResourceSocket $socket, ?Cancellation $cancellation = null): Future
public function provideFor(ReadableStream&ResourceStream $stream, ?Cancellation $cancellation = null): Future
{
/** @var Channel<SocketAddress|null, never> $channel */
$channel = new StreamChannel($socket, $socket, $this->serializer);
$channel = new StreamChannel($stream, new WritableBuffer(), $this->serializer);
/** @var StreamResourceSendPipe<SocketAddress> $pipe */
$pipe = new StreamResourceSendPipe($socket, $this->serializer);
$pipe = new StreamResourceSendPipe($stream, $this->serializer);

$servers = &$this->servers;
$bindContext = $this->bindContext;
Expand Down

0 comments on commit 2e08a10

Please sign in to comment.