Skip to content

Commit

Permalink
Move resource-based pipes to be internal
Browse files Browse the repository at this point in the history
Added ClientSocketReceivePipe and ClientSocketSendPipe, which use our stream and socket abstractions instead of resources.
  • Loading branch information
trowski committed Dec 27, 2023
1 parent 4744b77 commit df06ba0
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 13 deletions.
69 changes: 69 additions & 0 deletions src/ClientSocketReceivePipe.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php declare(strict_types=1);

namespace Amp\Cluster;

use Amp\ByteStream\ResourceStream;
use Amp\Cancellation;
use Amp\Closable;
use Amp\Cluster\Internal\StreamResourceReceivePipe;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\SerializationException;
use Amp\Serialization\Serializer;
use Amp\Socket\ResourceSocket;
use Amp\Socket\SocketException;

/**
* @template-covariant T
*/
final class ClientSocketReceivePipe implements Closable
{
use ForbidCloning;
use ForbidSerialization;

/** @var StreamResourceReceivePipe<T> */
private readonly StreamResourceReceivePipe $receive;

public function __construct(
ResourceStream $resourceStream,
Serializer $serializer = new NativeSerializer(),
) {
$this->receive = new StreamResourceReceivePipe($resourceStream, $serializer);
}

/**
* @param positive-int $chunkSize
*
* @return TransferredSocket<T>
*
* @throws SerializationException
* @throws SocketException
*/
public function receive(
?Cancellation $cancellation = null,
int $chunkSize = ResourceSocket::DEFAULT_CHUNK_SIZE,
): TransferredSocket {
$received = $this->receive->receive($cancellation);

return new TransferredSocket(
ResourceSocket::fromServerSocket($received->getResource(), $chunkSize),
$received->getData(),
);
}

public function close(): void
{
$this->receive->close();
}

public function isClosed(): bool
{
return $this->receive->isClosed();
}

public function onClose(\Closure $onClose): void
{
$this->receive->onClose($onClose);
}
}
63 changes: 63 additions & 0 deletions src/ClientSocketSendPipe.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php declare(strict_types=1);

namespace Amp\Cluster;

use Amp\ByteStream\ResourceStream;
use Amp\Closable;
use Amp\Cluster\Internal\StreamResourceSendPipe;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\SerializationException;
use Amp\Serialization\Serializer;
use Amp\Socket\SocketException;

/**
* @template T
*/
final class ClientSocketSendPipe implements Closable
{
use ForbidCloning;
use ForbidSerialization;

/** @var StreamResourceSendPipe<T> */
private readonly StreamResourceSendPipe $send;

public function __construct(
ResourceStream $resourceStream,
Serializer $serializer = new NativeSerializer(),
) {
$this->send = new StreamResourceSendPipe($resourceStream, $serializer);
}

/**
* @param T $data
*
* @throws SerializationException
* @throws SocketException
*/
public function send(ResourceStream $resourceStream, mixed $data = null): void
{
$resource = $resourceStream->getResource();
if (!\is_resource($resource)) {
throw new SocketException('The provided socket has already been closed');
}

$this->send->send($resource, $data);
}

public function close(): void
{
$this->send->close();
}

public function isClosed(): bool
{
return $this->send->isClosed();
}

public function onClose(\Closure $onClose): void
{
$this->send->onClose($onClose);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<?php declare(strict_types=1);

namespace Amp\Cluster;
namespace Amp\Cluster\Internal;

use Amp\ByteStream\PendingReadError;
use Amp\ByteStream\ResourceStream;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\Closable;
use Amp\Cluster\Internal;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\SerializationException;
use Amp\Serialization\Serializer;
use Amp\Socket\SocketException;
Expand All @@ -34,7 +34,7 @@ final class StreamResourceReceivePipe implements Closable

public function __construct(
ResourceStream $resourceStream,
private readonly Serializer $serializer = new NativeSerializer(),
private readonly Serializer $serializer,
) {
$this->transferSocket = $transferSocket = new Internal\TransferSocket($resourceStream);
$this->receiveQueue = $receiveQueue = new \SplQueue();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<?php declare(strict_types=1);

namespace Amp\Cluster;
namespace Amp\Cluster\Internal;

use Amp\ByteStream\ResourceStream;
use Amp\Closable;
use Amp\Cluster\Internal;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\SerializationException;
use Amp\Serialization\Serializer;
use Amp\Socket\SocketException;
Expand All @@ -30,7 +30,7 @@ final class StreamResourceSendPipe implements Closable

public function __construct(
ResourceStream $resourceStream,
private readonly Serializer $serializer = new NativeSerializer(),
private readonly Serializer $serializer,
) {
$this->transferSocket = $transferSocket = new Internal\TransferSocket($resourceStream);
$this->transferQueue = $transferQueue = new \SplQueue();
Expand Down
9 changes: 4 additions & 5 deletions src/Internal/TransferSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@

use Amp\ByteStream\ResourceStream;
use Amp\Closable;
use Amp\Cluster\TransferredResource;
use Amp\DeferredFuture;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Socket\SocketException;
use Socket as SocketResource;
use Socket as ExtSocketResource;

/** @internal */
final class TransferSocket implements Closable
{
use ForbidCloning;
use ForbidSerialization;

private readonly SocketResource $socket;
private readonly ExtSocketResource $socket;

private readonly \Closure $errorHandler;

Expand All @@ -35,7 +34,7 @@ public function __construct(ResourceStream $socket)
}

$socketResource = \socket_import_stream($streamResource);
if (!$socketResource instanceof SocketResource) {
if (!$socketResource instanceof ExtSocketResource) {
throw new SocketException('Unable to import transfer socket from stream socket resource');
}

Expand Down Expand Up @@ -112,7 +111,7 @@ public function receiveSocket(): ?TransferredResource
$transferredData = $data["iov"][0];
$transferredSocket = $data["control"][0]["data"][0];

\assert(\is_string($transferredData) && $transferredSocket instanceof SocketResource);
\assert(\is_string($transferredData) && $transferredSocket instanceof ExtSocketResource);

$transferredStream = \socket_export_stream($transferredSocket);
if (!$transferredStream) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php declare(strict_types=1);

namespace Amp\Cluster;
namespace Amp\Cluster\Internal;

use Amp\ForbidCloning;
use Amp\ForbidSerialization;
Expand Down
3 changes: 2 additions & 1 deletion src/ServerSocketPipeFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Amp\ByteStream\ResourceStream;
use Amp\ByteStream\StreamChannel;
use Amp\ByteStream\WritableStream;
use Amp\Cluster\Internal\StreamResourceReceivePipe;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Serialization\NativeSerializer;
Expand Down Expand Up @@ -34,7 +35,7 @@ final class ServerSocketPipeFactory implements ServerSocketFactory
public function __construct(WritableStream&ResourceStream $stream)
{
$serializer = new NativeSerializer();
$this->channel = new StreamChannel(new ReadableBuffer(), $stream, $serializer);
$this->channel = new StreamChannel(new ReadableBuffer(''), $stream, $serializer);
$this->pipe = new StreamResourceReceivePipe($stream, $serializer);
}

Expand Down
1 change: 1 addition & 0 deletions src/ServerSocketPipeProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Amp\ByteStream\WritableBuffer;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\Cluster\Internal\StreamResourceSendPipe;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Future;
Expand Down
38 changes: 38 additions & 0 deletions src/TransferredSocket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php declare(strict_types=1);

namespace Amp\Cluster;

use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Socket\ResourceSocket;

/**
* @template-covariant T
*/
final class TransferredSocket
{
use ForbidCloning;
use ForbidSerialization;

/**
* @param T $data
*/
public function __construct(
private readonly ResourceSocket $socket,
private readonly mixed $data,
) {
}

public function getSocket(): ResourceSocket
{
return $this->socket;
}

/**
* @return T
*/
public function getData(): mixed
{
return $this->data;
}
}

0 comments on commit df06ba0

Please sign in to comment.