From d8e0c7f8968e6efd71f9d68c21cb33015c90e313 Mon Sep 17 00:00:00 2001 From: Alexis Incogito Date: Sat, 23 Apr 2022 23:17:57 +0200 Subject: [PATCH 1/5] - Added: Non-generic versions of PipeServer, PipeClient, SingleConnectionPipeServer, SingleConnectionPipeClient - Added: Tests for the new PipeXXX classes in the form of [DataRow()], see DataTests.cs - Added: FormatterExtensions.cs which implements extensions methods SerializeAsync and DeserializeAsync on objects - Added: New IPipeConnection interface for PipeConnection - Updated: Generic versions of PipeXXX inherit from their non-generic counterpart - Updated: Renamed IPipeConnection to IPipe since all PipeXXX inherit from them (the name seemed more fitting) - Updated: BaseTests.cs to test both the Generic and Non-generic PipeXXX versions - Updated: Made PipeXXX non-sealed - Updated: PipeXXX private methods and properties are now protected and documented --- .../PipeClientExtensions.cs | 30 +-- .../PipeConnectionExtensions.cs | 5 +- .../PipeServerExtensions.cs | 28 ++- src/libs/H.Pipes/Args/ConnectionEventArgs.cs | 7 +- .../Args/ConnectionExceptionEventArgs.cs | 5 +- .../Args/ConnectionMessageEventArgs.cs | 10 +- .../Extensions/ConnectionExtensions.cs | 10 +- .../H.Pipes/Extensions/FormatterExtensions.cs | 60 +++++ src/libs/H.Pipes/IPipe.cs | 53 ++++ src/libs/H.Pipes/IPipeClient.cs | 40 ++- src/libs/H.Pipes/IPipeConnection.cs | 75 ++++-- src/libs/H.Pipes/IPipeServer.cs | 70 +++++- src/libs/H.Pipes/PipeClient.cs | 191 ++++++++++++--- src/libs/H.Pipes/PipeConnection.cs | 178 ++++++++------ src/libs/H.Pipes/PipeServer.cs | 175 ++++++++++--- .../H.Pipes/SingleConnectionPipeClient.cs | 221 +++++++++++++---- .../H.Pipes/SingleConnectionPipeServer.cs | 171 +++++++++++-- src/tests/H.Pipes.Tests/BaseTests.cs | 229 +++++++++++++++--- src/tests/H.Pipes.Tests/DataTests.cs | 138 +++++++---- src/tests/H.Pipes.Tests/PipeClientTests.cs | 8 +- 20 files changed, 1334 insertions(+), 370 deletions(-) create mode 100644 src/libs/H.Pipes/Extensions/FormatterExtensions.cs create mode 100644 src/libs/H.Pipes/IPipe.cs diff --git a/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs b/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs index f8739d1..2c9682c 100644 --- a/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs +++ b/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs @@ -27,33 +27,35 @@ public static void EnableEncryption( Action? exceptionAction = null) { client = client ?? throw new ArgumentNullException(nameof(client)); - client.Connected += async (o, args) => + client.Connected += async (_, connArgs) => { try { - var pipeName = $"{args.Connection.PipeName}_Inferno"; + var pipeName = $"{connArgs.Connection.PipeName}_Inferno"; using var source = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = source.Token; - var client = new SingleConnectionPipeClient(pipeName, args.Connection.ServerName, formatter: args.Connection.Formatter); - client.ExceptionOccurred += (_, args) => + var infClient = new SingleConnectionPipeClient(pipeName, connArgs.Connection.ServerName, formatter: connArgs.Connection.Formatter); + + infClient.ExceptionOccurred += (_, exArgs) => { - Debug.WriteLine($"{nameof(EnableEncryption)} client returns exception: {args.Exception}"); + Debug.WriteLine($"{nameof(EnableEncryption)} client returns exception: {exArgs.Exception}"); - exceptionAction?.Invoke(args.Exception); + exceptionAction?.Invoke(exArgs.Exception); }; - await using (client.ConfigureAwait(false)) + + await using (infClient.ConfigureAwait(false)) { - using var _keyPair = new KeyPair(); - await client.WriteAsync(_keyPair.PublicKey, cancellationToken).ConfigureAwait(false); + using var keyPair = new KeyPair(); + await infClient.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false); - var response = await client.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + var response = await infClient.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); var serverPublicKey = response.Message; - args.Connection.Formatter = new InfernoFormatter( - args.Connection.Formatter, - _keyPair.GenerateSharedKey(serverPublicKey)); + connArgs.Connection.Formatter = new InfernoFormatter( + connArgs.Connection.Formatter, + keyPair.GenerateSharedKey(serverPublicKey)); } } catch (Exception exception) @@ -65,7 +67,7 @@ public static void EnableEncryption( exceptionAction?.Invoke(exception); } }; - client.Disconnected += (o, args) => + client.Disconnected += (_, args) => { if (args.Connection.Formatter is not InfernoFormatter infernoFormatter) { diff --git a/src/libs/H.Formatters.Inferno/PipeConnectionExtensions.cs b/src/libs/H.Formatters.Inferno/PipeConnectionExtensions.cs index 74ccc7e..ed5e491 100644 --- a/src/libs/H.Formatters.Inferno/PipeConnectionExtensions.cs +++ b/src/libs/H.Formatters.Inferno/PipeConnectionExtensions.cs @@ -10,12 +10,11 @@ public static class PipeConnectionExtensions /// /// Waits key exchange. /// - /// /// /// /// - public static async Task WaitExchangeAsync( - this PipeConnection connection, + public static async Task WaitExchangeAsync( + this IPipeConnection connection, CancellationToken cancellationToken = default) { connection = connection ?? throw new ArgumentNullException(nameof(connection)); diff --git a/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs b/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs index 6dd60d0..6893ae6 100644 --- a/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs +++ b/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs @@ -27,42 +27,44 @@ public static void EnableEncryption( Action? exceptionAction = null) { server = server ?? throw new ArgumentNullException(nameof(server)); - server.ClientConnected += async (_, args) => + server.ClientConnected += async (_, connArgs) => { try { using var source = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = source.Token; - var pipeName = $"{args.Connection.PipeName}_Inferno"; - var server = new SingleConnectionPipeServer(pipeName, args.Connection.Formatter); - server.ExceptionOccurred += (_, args) => + var pipeName = $"{connArgs.Connection.PipeName}_Inferno"; + var infServer = new SingleConnectionPipeServer(pipeName, connArgs.Connection.Formatter); + + infServer.ExceptionOccurred += (_, exArgs) => { - Debug.WriteLine($"{nameof(EnableEncryption)} server returns exception: {args.Exception}"); + Debug.WriteLine($"{nameof(EnableEncryption)} server returns exception: {exArgs.Exception}"); - exceptionAction?.Invoke(args.Exception); + exceptionAction?.Invoke(exArgs.Exception); }; - await using (server.ConfigureAwait(false)) + + await using (infServer.ConfigureAwait(false)) { - await server.StartAsync(cancellationToken).ConfigureAwait(false); + await infServer.StartAsync(cancellationToken).ConfigureAwait(false); - var response = await server.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + var response = await infServer.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); var clientPublicKey = response.Message; using var keyPair = new KeyPair(); - args.Connection.Formatter = new InfernoFormatter( - args.Connection.Formatter, + connArgs.Connection.Formatter = new InfernoFormatter( + connArgs.Connection.Formatter, keyPair.GenerateSharedKey(clientPublicKey)); - await server.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false); + await infServer.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false); } } catch (Exception exception) { Debug.WriteLine($"{nameof(EnableEncryption)} returns exception: {exception}"); - await args.Connection.StopAsync().ConfigureAwait(false); + await connArgs.Connection.StopAsync().ConfigureAwait(false); exceptionAction?.Invoke(exception); } diff --git a/src/libs/H.Pipes/Args/ConnectionEventArgs.cs b/src/libs/H.Pipes/Args/ConnectionEventArgs.cs index 57053d9..99cd2a6 100644 --- a/src/libs/H.Pipes/Args/ConnectionEventArgs.cs +++ b/src/libs/H.Pipes/Args/ConnectionEventArgs.cs @@ -3,19 +3,18 @@ /// /// Handles new connections. /// -/// Reference type -public class ConnectionEventArgs : EventArgs +public class ConnectionEventArgs : EventArgs { /// /// Connection /// - public PipeConnection Connection { get; } + public PipeConnection Connection { get; } /// /// /// /// - public ConnectionEventArgs(PipeConnection connection) + public ConnectionEventArgs(PipeConnection connection) { Connection = connection ?? throw new ArgumentNullException(nameof(connection)); } diff --git a/src/libs/H.Pipes/Args/ConnectionExceptionEventArgs.cs b/src/libs/H.Pipes/Args/ConnectionExceptionEventArgs.cs index 9ffad40..d975b82 100644 --- a/src/libs/H.Pipes/Args/ConnectionExceptionEventArgs.cs +++ b/src/libs/H.Pipes/Args/ConnectionExceptionEventArgs.cs @@ -3,8 +3,7 @@ /// /// Handles exceptions thrown during read/write operations. /// -/// Reference type -public class ConnectionExceptionEventArgs : ConnectionEventArgs +public class ConnectionExceptionEventArgs : ConnectionEventArgs { /// /// The exception that was thrown @@ -16,7 +15,7 @@ public class ConnectionExceptionEventArgs : ConnectionEventArgs /// /// /// - public ConnectionExceptionEventArgs(PipeConnection connection, Exception exception) : base(connection) + public ConnectionExceptionEventArgs(PipeConnection connection, Exception exception) : base(connection) { Exception = exception ?? throw new ArgumentNullException(nameof(exception)); } diff --git a/src/libs/H.Pipes/Args/ConnectionMessageEventArgs.cs b/src/libs/H.Pipes/Args/ConnectionMessageEventArgs.cs index c93c72e..5f763a5 100644 --- a/src/libs/H.Pipes/Args/ConnectionMessageEventArgs.cs +++ b/src/libs/H.Pipes/Args/ConnectionMessageEventArgs.cs @@ -1,10 +1,14 @@ -namespace H.Pipes.Args; +using H.Formatters; +using H.Pipes.Extensions; + +namespace H.Pipes.Args; + /// /// Handles messages received from a named pipe. /// /// Reference type -public class ConnectionMessageEventArgs : ConnectionEventArgs +public class ConnectionMessageEventArgs : ConnectionEventArgs { /// /// Message sent by the other end of the pipe @@ -16,7 +20,7 @@ public class ConnectionMessageEventArgs : ConnectionEventArgs /// /// /// - public ConnectionMessageEventArgs(PipeConnection connection, T message) : base(connection) + public ConnectionMessageEventArgs(PipeConnection connection, T message) : base(connection) { Message = message; } diff --git a/src/libs/H.Pipes/Extensions/ConnectionExtensions.cs b/src/libs/H.Pipes/Extensions/ConnectionExtensions.cs index 5e91011..47f7b75 100644 --- a/src/libs/H.Pipes/Extensions/ConnectionExtensions.cs +++ b/src/libs/H.Pipes/Extensions/ConnectionExtensions.cs @@ -17,11 +17,11 @@ public static class ConnectionExtensions /// /// /// - public static async Task> WaitMessageAsync(this IPipeConnection connection, Func? func = null, CancellationToken cancellationToken = default) + public static async Task> WaitMessageAsync(this IPipe connection, Func? func = null, CancellationToken cancellationToken = default) { return await connection.WaitEventAsync>( - func ?? (token => Task.Delay(TimeSpan.Zero, cancellationToken)), - nameof(connection.MessageReceived), + func ?? (_ => Task.Delay(TimeSpan.Zero, cancellationToken)), + nameof(IPipe.MessageReceived), cancellationToken).ConfigureAwait(false); } @@ -35,10 +35,10 @@ public static async Task> WaitMessageAsync(this /// /// /// - public static async Task> WaitMessageAsync(this IPipeConnection connection, TimeSpan timeout, Func? func = null) + public static async Task> WaitMessageAsync(this IPipe connection, TimeSpan timeout, Func? func = null) { using var tokenSource = new CancellationTokenSource(timeout); - return await connection.WaitMessageAsync(func, tokenSource.Token).ConfigureAwait(false); + return await connection.WaitMessageAsync(func, tokenSource.Token).ConfigureAwait(false); } } diff --git a/src/libs/H.Pipes/Extensions/FormatterExtensions.cs b/src/libs/H.Pipes/Extensions/FormatterExtensions.cs new file mode 100644 index 0000000..9c8ae61 --- /dev/null +++ b/src/libs/H.Pipes/Extensions/FormatterExtensions.cs @@ -0,0 +1,60 @@ +using H.Formatters; + +namespace H.Pipes.Extensions; + +/// +/// Class FormatterExtensions. +/// +public static class FormatterExtensions +{ + #region Methods + + /// + /// Uses the to serialize the given object into a byte + /// array. + /// + /// Object type + /// The object instance. + /// The formatter + /// + /// The cancellation token that can be used by other objects or + /// threads to receive notice of cancellation. + /// + /// Serialized object. + public static async Task SerializeAsync( + this T value, + IFormatter formatter, + CancellationToken cancellationToken) + { + if (formatter == null) + throw new ArgumentNullException(nameof(formatter)); + + return formatter is IAsyncFormatter asyncFormatter + ? await asyncFormatter.SerializeAsync(value, cancellationToken).ConfigureAwait(false) + : formatter.Serialize(value); + } + + /// Deserializes the bytes into the specified type using . + /// + /// The bytes. + /// The formatter. + /// + /// The cancellation token that can be used by other objects or + /// threads to receive notice of cancellation. + /// + /// System.Nullable<T>. + public static async Task DeserializeAsync( + this byte[] bytes, + IFormatter formatter, + CancellationToken cancellationToken) + { + if (formatter == null) + throw new ArgumentNullException(nameof(formatter)); + + return formatter is IAsyncFormatter asyncFormatter + ? await asyncFormatter.DeserializeAsync(bytes, cancellationToken).ConfigureAwait(false) + : formatter.Deserialize(bytes); + } + + #endregion +} diff --git a/src/libs/H.Pipes/IPipe.cs b/src/libs/H.Pipes/IPipe.cs new file mode 100644 index 0000000..96d654a --- /dev/null +++ b/src/libs/H.Pipes/IPipe.cs @@ -0,0 +1,53 @@ +using H.Formatters; +using H.Pipes.Args; + +namespace H.Pipes; + +/// +/// Base class of all connections +/// +public interface IPipe : IAsyncDisposable +{ + #region Properties + + /// + /// Used formatter + /// + public IFormatter Formatter { get; } + + #endregion + + #region Events + + /// + /// Invoked whenever a message is received. + /// + event EventHandler>? MessageReceived; + + /// + /// Invoked whenever an exception is thrown during a read or write operation on the named pipe. + /// + event EventHandler? ExceptionOccurred; + + #endregion + + #region Methods + + /// + /// Sends a message over a named pipe.
+ ///
+ /// Message to send + /// + /// + Task WriteAsync(byte[] value, CancellationToken cancellationToken = default); + + /// + /// Sends a message to all connected clients asynchronously. + /// This method returns immediately, possibly before the message has been sent to all clients. + /// + /// + /// + Task WriteAsync(T value, CancellationToken cancellationToken = default); + + #endregion +} diff --git a/src/libs/H.Pipes/IPipeClient.cs b/src/libs/H.Pipes/IPipeClient.cs index fdd8bc7..ba75739 100644 --- a/src/libs/H.Pipes/IPipeClient.cs +++ b/src/libs/H.Pipes/IPipeClient.cs @@ -4,10 +4,40 @@ namespace H.Pipes; /// -/// Wraps a . +/// Specialized version of for communications based +/// on a single type /// /// Reference type to read/write from the named pipe -public interface IPipeClient : IPipeConnection +/// +public interface IPipeClient : IPipeClient +{ + + #region Events + + /// + /// Invoked whenever a message is received. + /// + new event EventHandler>? MessageReceived; + + #endregion + + #region Methods + + /// + /// Sends a message over a named pipe.
+ ///
+ /// Message to send + /// + /// + Task WriteAsync(T value, CancellationToken cancellationToken = default); + + #endregion +} + +/// +/// Wraps a . +/// +public interface IPipeClient : IPipe { #region Properties @@ -46,7 +76,7 @@ public interface IPipeClient : IPipeConnection /// /// Active connection. /// - public PipeConnection? Connection { get; } + public PipeConnection? Connection { get; } #endregion @@ -55,12 +85,12 @@ public interface IPipeClient : IPipeConnection /// /// Invoked after each the client connect to the server (include reconnects). /// - event EventHandler>? Connected; + event EventHandler? Connected; /// /// Invoked when the client disconnects from the server (e.g., the pipe is closed or broken). /// - event EventHandler>? Disconnected; + event EventHandler? Disconnected; #endregion diff --git a/src/libs/H.Pipes/IPipeConnection.cs b/src/libs/H.Pipes/IPipeConnection.cs index d1491ae..d417b9c 100644 --- a/src/libs/H.Pipes/IPipeConnection.cs +++ b/src/libs/H.Pipes/IPipeConnection.cs @@ -1,46 +1,89 @@ -using H.Formatters; +using System.IO.Pipes; +using H.Formatters; using H.Pipes.Args; namespace H.Pipes; /// -/// Base class of all connections +/// Represents a connection between a named pipe client and server. /// -/// Reference type to read/write from the named pipe -public interface IPipeConnection : IAsyncDisposable +public interface IPipeConnection { #region Properties + /// Used formatter + IFormatter Formatter { get; set; } + + /// Gets a value indicating whether the pipe is connected or not. + bool IsConnected { get; } + + /// if started and not disposed. + bool IsStarted { get; } + + /// Gets the connection's pipe name. + string PipeName { get; } + /// - /// Used formatter + /// Raw pipe stream. You can cast it to or + /// . /// - public IFormatter Formatter { get; } + PipeStream PipeStream { get; } + + /// Gets the connection's server name. Only for client connections. + string ServerName { get; } #endregion #region Events - /// - /// Invoked whenever a message is received. - /// - event EventHandler>? MessageReceived; + /// Invoked when the named pipe connection terminates. + event EventHandler? Disconnected; + + /// Invoked whenever a message is received from the other end of the pipe. + event EventHandler? ExceptionOccurred; /// - /// Invoked whenever an exception is thrown during a read or write operation on the named pipe. + /// Invoked when an exception is thrown during any read/write operation over the named + /// pipe. /// - event EventHandler? ExceptionOccurred; + event EventHandler>? MessageReceived; #endregion #region Methods /// - /// Sends a message over a named pipe.
+ /// Begins reading from and writing to the named pipe on a background thread. This method + /// returns immediately. ///
- /// Message to send + void Start(); + + /// Dispose internal resources + Task StopAsync(); + + /// Writes the specified and waits other end reading + /// /// - /// - Task WriteAsync(T value, CancellationToken cancellationToken = default); + Task WriteAsync(byte[] value, CancellationToken cancellationToken = default); + + /// Writes the specified and waits other end reading + /// + /// + Task WriteAsync(T value, CancellationToken cancellationToken = default); + + /// Gets the user name of the client on the other end of the pipe. + /// The user name of the client on the other end of the pipe. + /// + /// is not + /// . + /// + /// No pipe connections have been made yet. + /// The connected pipe has already disconnected. + /// The pipe handle has not been set. + /// The pipe is closed. + /// The pipe connection has been broken. + /// The user name of the client is longer than 19 characters. + string GetImpersonationUserName(); #endregion } diff --git a/src/libs/H.Pipes/IPipeServer.cs b/src/libs/H.Pipes/IPipeServer.cs index 37bc07c..6516263 100644 --- a/src/libs/H.Pipes/IPipeServer.cs +++ b/src/libs/H.Pipes/IPipeServer.cs @@ -4,10 +4,40 @@ namespace H.Pipes; /// -/// +/// Specialized version of for communications based +/// on a single type /// /// Reference type to read/write from the named pipe -public interface IPipeServer : IPipeConnection +/// +public interface IPipeServer : IPipeServer +{ + + #region Events + + /// + /// Invoked whenever a message is received. + /// + new event EventHandler>? MessageReceived; + + #endregion + + #region Methods + + /// + /// Sends a message over a named pipe.
+ ///
+ /// Message to send + /// + /// + Task WriteAsync(T value, CancellationToken cancellationToken = default); + + #endregion +} + +/// +/// +/// +public interface IPipeServer : IPipe { #region Properties @@ -38,12 +68,12 @@ public interface IPipeServer : IPipeConnection /// /// Invoked whenever a client connects to the server. /// - event EventHandler>? ClientConnected; + event EventHandler? ClientConnected; /// /// Invoked whenever a client disconnects from the server. /// - event EventHandler>? ClientDisconnected; + event EventHandler? ClientDisconnected; #endregion @@ -62,5 +92,37 @@ public interface IPipeServer : IPipeConnection /// Task StopAsync(CancellationToken _ = default); + /// + /// Sends a message to all connected clients asynchronously. + /// + /// + /// + /// + Task WriteAsync(byte[] value, Predicate? predicate, CancellationToken cancellationToken = default); + + /// + /// Sends a message to the given client by pipe name. + /// + /// + /// + /// + Task WriteAsync(byte[] value, string pipeName, CancellationToken cancellationToken = default); + + /// + /// Sends a message to all connected clients asynchronously. + /// + /// + /// + /// + Task WriteAsync(T value, Predicate? predicate, CancellationToken cancellationToken = default); + + /// + /// Sends a message to the given client by pipe name. + /// + /// + /// + /// + Task WriteAsync(T value, string pipeName, CancellationToken cancellationToken = default); + #endregion } diff --git a/src/libs/H.Pipes/PipeClient.cs b/src/libs/H.Pipes/PipeClient.cs index 2af7d1d..a76126f 100644 --- a/src/libs/H.Pipes/PipeClient.cs +++ b/src/libs/H.Pipes/PipeClient.cs @@ -8,9 +8,73 @@ namespace H.Pipes; /// /// Wraps a . +/// Specialized version of for communications based on a single type. +/// Implements the +/// Implements the /// /// Reference type to read/write from the named pipe -public sealed class PipeClient : IPipeClient +/// +/// +public class PipeClient : PipeClient, IPipeClient +{ + #region Constructors + + /// + public PipeClient(string pipeName, string serverName = ".", TimeSpan? reconnectionInterval = default, IFormatter? formatter = default) : base(pipeName, serverName, reconnectionInterval, formatter) { } + + #endregion + + #region Events + + /// + public new event EventHandler>? MessageReceived; + + /// + /// Calls the event. + /// + /// The arguments. + protected void OnMessageReceived(ConnectionMessageEventArgs args) + { + MessageReceived?.Invoke(this, args); + } + + #endregion + + #region Public methods + + /// + public Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + return base.WriteAsync(value, cancellationToken); + } + + /// + protected override PipeConnection SetupPipeConnection( + PipeStream dataPipe, string connectionPipeName, IFormatter formatter, string serverName) + { + var connection = new PipeConnection(dataPipe, connectionPipeName, formatter, serverName); + + connection.Disconnected += async (_, args) => + { + await DisconnectInternalAsync().ConfigureAwait(false); + + OnDisconnected(args); + }; + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; + } + + #endregion +} + +/// +/// Wraps a . +/// Implements the +/// +/// +public class PipeClient : IPipeClient { #region Fields @@ -33,7 +97,7 @@ public sealed class PipeClient : IPipeClient public bool IsConnecting { get => _isConnecting; - private set => _isConnecting = value; + protected set => _isConnecting = value; } /// @@ -46,9 +110,13 @@ public bool IsConnecting public string ServerName { get; } /// - public PipeConnection? Connection { get; private set; } + public PipeConnection? Connection { get; protected set; } - private System.Timers.Timer ReconnectionTimer { get; } + /// + /// Gets the reconnection timer. + /// + /// The reconnection timer. + protected System.Timers.Timer ReconnectionTimer { get; } #endregion @@ -57,39 +125,55 @@ public bool IsConnecting /// /// Invoked whenever a message is received from the server. /// - public event EventHandler>? MessageReceived; + public event EventHandler>? MessageReceived; /// /// Invoked when the client disconnects from the server (e.g., the pipe is closed or broken). /// - public event EventHandler>? Disconnected; + public event EventHandler? Disconnected; /// /// Invoked after each the client connect to the server (include reconnects). /// - public event EventHandler>? Connected; + public event EventHandler? Connected; /// /// Invoked whenever an exception is thrown during a read or write operation on the named pipe. /// public event EventHandler? ExceptionOccurred; - private void OnMessageReceived(ConnectionMessageEventArgs args) - { - MessageReceived?.Invoke(this, args); - } - - private void OnDisconnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected void OnDisconnected(ConnectionEventArgs args) { Disconnected?.Invoke(this, args); } - private void OnConnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected void OnConnected(ConnectionEventArgs args) { Connected?.Invoke(this, args); } + + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected void OnMessageReceived(ConnectionMessageEventArgs args) + { + MessageReceived?.Invoke(this, args); + } - private void OnExceptionOccurred(Exception exception) + /// + /// Calls the event. + /// + /// The exception. + protected void OnExceptionOccurred(Exception exception) { ExceptionOccurred?.Invoke(this, new ExceptionEventArgs(exception)); } @@ -179,18 +263,10 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default) #pragma warning restore CA2000 // Dispose objects before losing scope .ConfigureAwait(false); - Connection = new PipeConnection(dataPipe, connectionPipeName, Formatter, ServerName); - Connection.Disconnected += async (_, args) => - { - await DisconnectInternalAsync().ConfigureAwait(false); - - OnDisconnected(args); - }; - Connection.MessageReceived += (_, args) => OnMessageReceived(args); - Connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + Connection = SetupPipeConnection(dataPipe, connectionPipeName, Formatter, ServerName); Connection.Start(); - OnConnected(new ConnectionEventArgs(Connection)); + OnConnected(new ConnectionEventArgs(Connection)); } catch (Exception) { @@ -216,7 +292,11 @@ public async Task DisconnectAsync(CancellationToken _ = default) await DisconnectInternalAsync().ConfigureAwait(false); } - private async Task DisconnectInternalAsync() + /// + /// Disconnects from the server. Does not stop . + /// + /// A Task representing the asynchronous operation. + protected async Task DisconnectInternalAsync() { if (Connection == null) { @@ -228,6 +308,31 @@ private async Task DisconnectInternalAsync() Connection = null; } + /// + /// Instantiates and sets up the pipe connection (event handlers, etc.). + /// + /// The pipe stream. + /// Name of the connection pipe. + /// The formatter. + /// + /// PipeConnection. + protected virtual PipeConnection SetupPipeConnection( + PipeStream dataPipe, string connectionPipeName, IFormatter formatter, string serverName) + { + var connection = new PipeConnection(dataPipe, connectionPipeName, formatter, serverName); + + connection.Disconnected += async (_, args) => + { + await DisconnectInternalAsync().ConfigureAwait(false); + + OnDisconnected(args); + }; + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; + } + /// /// Sends a message to the server over a named pipe.
/// If client is not connected, is occurred @@ -235,18 +340,43 @@ private async Task DisconnectInternalAsync() /// Message to send to the server. /// /// - public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + public async Task WriteAsync(byte[] value, CancellationToken cancellationToken = default) + { + await ReconnectOrThrow(cancellationToken).ConfigureAwait(false); + + await Connection!.WriteAsync(value, cancellationToken).ConfigureAwait(false); + } + + /// + /// Sends a message to the server over a named pipe.
+ /// If client is not connected, is occurred + ///
+ /// Message to send to the server. + /// + /// + public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + await ReconnectOrThrow(cancellationToken).ConfigureAwait(false); + + await Connection!.WriteAsync(value, cancellationToken).ConfigureAwait(false); + } + + /// + /// Reconnects the client if needed and throws an exception when failed. + /// + /// The cancellation token that can be used by other objects or threads to receive notice of cancellation. + /// Client is not connected + protected async Task ReconnectOrThrow(CancellationToken cancellationToken = default) { if (!IsConnected && AutoReconnect) { await ConnectAsync(cancellationToken).ConfigureAwait(false); } + if (Connection == null) { throw new InvalidOperationException("Client is not connected"); } - - await Connection.WriteAsync(value, cancellationToken).ConfigureAwait(false); } #endregion @@ -261,6 +391,8 @@ public async ValueTask DisposeAsync() ReconnectionTimer.Dispose(); await DisconnectInternalAsync().ConfigureAwait(false); + + GC.SuppressFinalize(this); } #endregion @@ -286,6 +418,7 @@ private async Task GetConnectionPipeName(CancellationToken cancellationT #endif { var bytes = await handshake.ReadAsync(cancellationToken).ConfigureAwait(false); + if (bytes == null) { throw new InvalidOperationException("Connection failed: Returned by server pipeName is null"); diff --git a/src/libs/H.Pipes/PipeConnection.cs b/src/libs/H.Pipes/PipeConnection.cs index 3aa4d96..b0b4d4a 100644 --- a/src/libs/H.Pipes/PipeConnection.cs +++ b/src/libs/H.Pipes/PipeConnection.cs @@ -1,47 +1,73 @@ using System.IO.Pipes; using H.Formatters; using H.Pipes.Args; +using H.Pipes.Extensions; using H.Pipes.IO; using H.Pipes.Utilities; namespace H.Pipes; -/// -/// Represents a connection between a named pipe client and server. -/// +/// /// Reference type to read/write from the named pipe -public sealed class PipeConnection : IAsyncDisposable +public class PipeConnection : PipeConnection { - #region Properties + #region Constructors - /// - /// Gets the connection's pipe name. - /// - public string PipeName { get; } + /// + internal PipeConnection(PipeStream stream, string pipeName, IFormatter formatter, string serverName = "") + : base(stream, pipeName, formatter, serverName) { } - /// - /// Gets the connection's server name. Only for client connections. - /// - public string ServerName { get; } + #endregion + #region Events + /// - /// Gets a value indicating whether the pipe is connected or not. + /// Invoked when an exception is thrown during any read/write operation over the named + /// pipe. /// - public bool IsConnected => PipeStreamWrapper.IsConnected; + public new event EventHandler>? MessageReceived; - /// - /// if started and not disposed. - /// - public bool IsStarted => ReadWorker != null; + /// + protected override async Task OnMessageReceived(byte[]? message, CancellationToken cancellationToken) + { + T? obj = default; - /// - /// Raw pipe stream. You can cast it to or . - /// + if (message != null) + obj = await message.DeserializeAsync(Formatter, cancellationToken).ConfigureAwait(false); + + MessageReceived?.Invoke(this, new ConnectionMessageEventArgs(this, obj)); + } + + #endregion +} + +/// +/// Represents a connection between a named pipe client and server. Implements the +/// Implements the +/// +/// +/// +/// +public class PipeConnection : IPipeConnection, IAsyncDisposable +{ + #region Properties + + /// + public string PipeName { get; } + + /// + public string ServerName { get; } + + /// + public bool IsConnected => PipeStreamWrapper.IsConnected; + + /// + public bool IsStarted => ReadWorker != null; + + /// public PipeStream PipeStream { get; } - /// - /// Used formatter. - /// + /// public IFormatter Formatter { get; set; } private PipeStreamWrapper PipeStreamWrapper { get; } @@ -50,35 +76,43 @@ public sealed class PipeConnection : IAsyncDisposable #endregion #region Events + + /// + public event EventHandler? Disconnected; + + /// + public event EventHandler>? MessageReceived; + + /// + public event EventHandler? ExceptionOccurred; /// - /// Invoked when the named pipe connection terminates. - /// - public event EventHandler>? Disconnected; - - /// - /// Invoked whenever a message is received from the other end of the pipe. + /// Calls the event. /// - public event EventHandler>? MessageReceived; + protected virtual void OnDisconnected() + { + Disconnected?.Invoke(this, new ConnectionEventArgs(this)); + } /// - /// Invoked when an exception is thrown during any read/write operation over the named pipe. + /// Calls the event. /// - public event EventHandler>? ExceptionOccurred; - - private void OnDisconnected() + /// The message. + /// + protected virtual Task OnMessageReceived(byte[]? message, CancellationToken cancellationToken) { - Disconnected?.Invoke(this, new ConnectionEventArgs(this)); - } + MessageReceived?.Invoke(this, new ConnectionMessageEventArgs(this, message)); - private void OnMessageReceived(T? message) - { - MessageReceived?.Invoke(this, new ConnectionMessageEventArgs(this, message)); + return Task.CompletedTask; } - private void OnExceptionOccurred(Exception exception) + /// + /// Calls the event. + /// + /// The exception. + protected virtual void OnExceptionOccurred(Exception exception) { - ExceptionOccurred?.Invoke(this, new ConnectionExceptionEventArgs(this, exception)); + ExceptionOccurred?.Invoke(this, new ConnectionExceptionEventArgs(this, exception)); } #endregion @@ -98,10 +132,7 @@ internal PipeConnection(PipeStream stream, string pipeName, IFormatter formatter #region Public methods - /// - /// Begins reading from and writing to the named pipe on a background thread. - /// This method returns immediately. - /// + /// public void Start() { if (IsStarted) @@ -116,16 +147,14 @@ public void Start() try { var bytes = await PipeStreamWrapper.ReadAsync(cancellationToken).ConfigureAwait(false); + + // We accept zero-length messages if (bytes == null && !IsConnected) { break; } - var obj = Formatter is IAsyncFormatter asyncFormatter - ? await asyncFormatter.DeserializeAsync(bytes, cancellationToken).ConfigureAwait(false) - : Formatter.Deserialize(bytes); - - OnMessageReceived(obj); + await OnMessageReceived(bytes, cancellationToken).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -141,29 +170,32 @@ public void Start() OnDisconnected(); }, OnExceptionOccurred); } + + /// + public async Task WriteAsync(byte[] value, CancellationToken cancellationToken = default) + { + if (!IsConnected || !PipeStreamWrapper.CanWrite) + { + throw new InvalidOperationException("Client is not connected"); + } - /// - /// Writes the specified and waits other end reading - /// - /// - /// - public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + await PipeStreamWrapper.WriteAsync(value, cancellationToken).ConfigureAwait(false); + } + + /// + public async Task WriteAsync(T value, CancellationToken cancellationToken = default) { if (!IsConnected || !PipeStreamWrapper.CanWrite) { throw new InvalidOperationException("Client is not connected"); } - var bytes = Formatter is IAsyncFormatter asyncFormatter - ? await asyncFormatter.SerializeAsync(value, cancellationToken).ConfigureAwait(false) - : Formatter.Serialize(value); + var bytes = await value.SerializeAsync(Formatter, cancellationToken).ConfigureAwait(false); await PipeStreamWrapper.WriteAsync(bytes, cancellationToken).ConfigureAwait(false); } - - /// - /// Dispose internal resources - /// + + /// public async Task StopAsync() { if (ReadWorker != null) @@ -175,18 +207,8 @@ public async Task StopAsync() await PipeStreamWrapper.StopAsync().ConfigureAwait(false); } - - /// - /// Gets the user name of the client on the other end of the pipe. - /// - /// The user name of the client on the other end of the pipe. - /// is not . - /// No pipe connections have been made yet. - /// The connected pipe has already disconnected. - /// The pipe handle has not been set. - /// The pipe is closed. - /// The pipe connection has been broken. - /// The user name of the client is longer than 19 characters. + + /// public string GetImpersonationUserName() { if (PipeStream is not NamedPipeServerStream serverStream) @@ -207,6 +229,8 @@ public string GetImpersonationUserName() public async ValueTask DisposeAsync() { await StopAsync().ConfigureAwait(false); + + GC.SuppressFinalize(this); } #endregion diff --git a/src/libs/H.Pipes/PipeServer.cs b/src/libs/H.Pipes/PipeServer.cs index bd252a9..d8cff0a 100644 --- a/src/libs/H.Pipes/PipeServer.cs +++ b/src/libs/H.Pipes/PipeServer.cs @@ -10,9 +10,64 @@ namespace H.Pipes; /// /// Wraps a and provides multiple simultaneous client connection handling. +/// Specialized version of for communications based on a single type. /// /// Reference type to read/write from the named pipe -public sealed class PipeServer : IPipeServer +/// +/// +public class PipeServer : PipeServer, IPipeServer +{ + #region Constructors + + /// + public PipeServer(string pipeName, IFormatter? formatter = default) + : base(pipeName, formatter) { } + + #endregion + + #region Events + + /// + public new event EventHandler>? MessageReceived; + + /// + /// Calls the event. + /// + /// The arguments. + protected void OnMessageReceived(ConnectionMessageEventArgs args) + { + MessageReceived?.Invoke(this, args); + } + + #endregion + + #region Public methods + + /// + public Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + return base.WriteAsync(value, cancellationToken); + } + + /// + protected override PipeConnection SetupPipeConnection(NamedPipeServerStream connectionStream, string connectionPipeName, IFormatter formatter) + { + var connection = new PipeConnection(connectionStream, connectionPipeName, Formatter); + + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.Disconnected += (_, args) => OnClientDisconnected(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; + } + + #endregion +} + +/// +/// Wraps a and provides multiple simultaneous client connection handling. +/// +public class PipeServer : IPipeServer { #region Properties @@ -44,12 +99,12 @@ public sealed class PipeServer : IPipeServer /// /// All connections(include disconnected clients) /// - private List> Connections { get; } = new(); + private List Connections { get; } = new(); /// /// Connected clients /// - public IReadOnlyCollection> ConnectedClients => Connections + public IReadOnlyCollection ConnectedClients => Connections .Where(connection => connection.IsConnected) .ToList(); @@ -70,39 +125,55 @@ public sealed class PipeServer : IPipeServer /// /// Invoked whenever a client connects to the server. /// - public event EventHandler>? ClientConnected; + public event EventHandler? ClientConnected; /// /// Invoked whenever a client disconnects from the server. /// - public event EventHandler>? ClientDisconnected; + public event EventHandler? ClientDisconnected; /// /// Invoked whenever a client sends a message to the server. /// - public event EventHandler>? MessageReceived; + public event EventHandler>? MessageReceived; /// /// Invoked whenever an exception is thrown during a read or write operation. /// public event EventHandler? ExceptionOccurred; - private void OnClientConnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected virtual void OnClientConnected(ConnectionEventArgs args) { ClientConnected?.Invoke(this, args); } - private void OnClientDisconnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected virtual void OnClientDisconnected(ConnectionEventArgs args) { ClientDisconnected?.Invoke(this, args); } - private void OnMessageReceived(ConnectionMessageEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected virtual void OnMessageReceived(ConnectionMessageEventArgs args) { MessageReceived?.Invoke(this, args); } - private void OnExceptionOccurred(Exception exception) + /// + /// Calls the event. + /// + /// The exception. + protected virtual void OnExceptionOccurred(Exception exception) { ExceptionOccurred?.Invoke(this, new ExceptionEventArgs(exception)); } @@ -218,15 +289,12 @@ await handshakeWrapper.WriteAsync(Encoding.UTF8.GetBytes(connectionPipeName), to } // Add the client's connection to the list of connections - var connection = new PipeConnection(connectionStream, connectionPipeName, Formatter); - connection.MessageReceived += (_, args) => OnMessageReceived(args); - connection.Disconnected += (_, args) => OnClientDisconnected(args); - connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + var connection = SetupPipeConnection(connectionStream, connectionPipeName, Formatter); connection.Start(); Connections.Add(connection); - OnClientConnected(new ConnectionEventArgs(connection)); + OnClientConnected(new ConnectionEventArgs(connection)); } catch (OperationCanceledException) { @@ -258,42 +326,69 @@ await handshakeWrapper.WriteAsync(Encoding.UTF8.GetBytes(connectionPipeName), to } /// - /// Sends a message to all connected clients asynchronously. - /// This method returns immediately, possibly before the message has been sent to all clients. + /// Instantiates and sets up the pipe connection (event handlers, etc.). /// - /// - /// - public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + /// The connection stream. + /// Name of the connection pipe. + /// The formatter. + /// PipeConnection. + protected virtual PipeConnection SetupPipeConnection( + NamedPipeServerStream connectionStream, string connectionPipeName, IFormatter formatter) { - await WriteAsync(value, predicate: null, cancellationToken).ConfigureAwait(false); + var connection = new PipeConnection(connectionStream, connectionPipeName, Formatter); + + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.Disconnected += (_, args) => OnClientDisconnected(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; } - /// - /// Sends a message to all connected clients asynchronously. - /// - /// - /// - /// - public async Task WriteAsync(T value, Predicate>? predicate, CancellationToken cancellationToken = default) + /// + public async Task WriteAsync(byte[] value, CancellationToken cancellationToken = default) + { + await WriteAsync(value, predicate: null, cancellationToken).ConfigureAwait(false); + } + + /// + public async Task WriteAsync(byte[] value, string pipeName, CancellationToken cancellationToken = default) + { + await WriteAsync(value, connection => connection.PipeName == pipeName, cancellationToken).ConfigureAwait(false); + } + + /// + public async Task WriteAsync(byte[] value, Predicate? predicate, CancellationToken cancellationToken = default) { var tasks = Connections - .Where(connection => connection.IsConnected && (predicate == null || predicate(connection))) - .Select(connection => connection.WriteAsync(value, cancellationToken)) - .ToList(); + .Where(connection => connection.IsConnected && (predicate == null || predicate(connection))) + .Select(connection => connection.WriteAsync(value, cancellationToken)) + .ToList(); await Task.WhenAll(tasks).ConfigureAwait(false); } - - /// - /// Sends a message to the given client by pipe name. - /// - /// - /// - /// - public async Task WriteAsync(T value, string pipeName, CancellationToken cancellationToken = default) + + /// + public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + await WriteAsync(value, predicate: null, cancellationToken).ConfigureAwait(false); + } + + /// + public async Task WriteAsync(T value, string pipeName, CancellationToken cancellationToken = default) { await WriteAsync(value, connection => connection.PipeName == pipeName, cancellationToken).ConfigureAwait(false); } + + /// + public async Task WriteAsync(T value, Predicate? predicate, CancellationToken cancellationToken = default) + { + var tasks = Connections + .Where(connection => connection.IsConnected && (predicate == null || predicate(connection))) + .Select(connection => connection.WriteAsync(value, cancellationToken)) + .ToList(); + + await Task.WhenAll(tasks).ConfigureAwait(false); + } /// /// Closes all open client connections and stops listening for new ones. @@ -333,6 +428,8 @@ public async ValueTask DisposeAsync() _isDisposed = true; await StopAsync().ConfigureAwait(false); + + GC.SuppressFinalize(this); } #endregion diff --git a/src/libs/H.Pipes/SingleConnectionPipeClient.cs b/src/libs/H.Pipes/SingleConnectionPipeClient.cs index b6ac264..b158614 100644 --- a/src/libs/H.Pipes/SingleConnectionPipeClient.cs +++ b/src/libs/H.Pipes/SingleConnectionPipeClient.cs @@ -7,9 +7,74 @@ namespace H.Pipes; /// /// Wraps a . +/// Specialized version of for communications based on a single type. +/// Implements the +/// Implements the /// /// Reference type to read/write from the named pipe -public sealed class SingleConnectionPipeClient : IPipeClient +/// +/// +public class SingleConnectionPipeClient : SingleConnectionPipeClient, IPipeClient +{ + #region Constructors + + /// + public SingleConnectionPipeClient(string pipeName, string serverName = ".", TimeSpan? reconnectionInterval = default, IFormatter? formatter = default) + : base(pipeName, serverName, reconnectionInterval, formatter) { } + + #endregion + + #region Events + + /// + public new event EventHandler>? MessageReceived; + + /// + /// Calls the event. + /// + /// The arguments. + protected void OnMessageReceived(ConnectionMessageEventArgs args) + { + MessageReceived?.Invoke(this, args); + } + + #endregion + + #region Public methods + + /// + public Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + return base.WriteAsync(value, cancellationToken); + } + + /// + protected override PipeConnection SetupPipeConnection( + PipeStream dataPipe, string connectionPipeName, IFormatter formatter, string serverName) + { + var connection = new PipeConnection(dataPipe, connectionPipeName, formatter, serverName); + + connection.Disconnected += async (_, args) => + { + await DisconnectInternalAsync().ConfigureAwait(false); + + OnDisconnected(args); + }; + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; + } + + #endregion +} + +/// +/// Wraps a . +/// Implements the +/// +/// +public class SingleConnectionPipeClient : IPipeClient { #region Fields @@ -32,7 +97,7 @@ public sealed class SingleConnectionPipeClient : IPipeClient public bool IsConnecting { get => _isConnecting; - private set => _isConnecting = value; + protected set => _isConnecting = value; } /// @@ -45,9 +110,13 @@ public bool IsConnecting public string ServerName { get; } /// - public PipeConnection? Connection { get; private set; } + public PipeConnection? Connection { get; protected set; } - private System.Timers.Timer ReconnectionTimer { get; } + /// + /// Gets the reconnection timer. + /// + /// The reconnection timer. + protected System.Timers.Timer ReconnectionTimer { get; } #endregion @@ -56,39 +125,55 @@ public bool IsConnecting /// /// Invoked whenever a message is received from the server. /// - public event EventHandler>? MessageReceived; + public event EventHandler>? MessageReceived; /// /// Invoked when the client disconnects from the server (e.g., the pipe is closed or broken). /// - public event EventHandler>? Disconnected; + public event EventHandler? Disconnected; /// /// Invoked after each the client connect to the server (include reconnects). /// - public event EventHandler>? Connected; + public event EventHandler? Connected; /// /// Invoked whenever an exception is thrown during a read or write operation on the named pipe. /// public event EventHandler? ExceptionOccurred; - private void OnMessageReceived(ConnectionMessageEventArgs args) - { - MessageReceived?.Invoke(this, args); - } - - private void OnDisconnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected void OnDisconnected(ConnectionEventArgs args) { Disconnected?.Invoke(this, args); } - private void OnConnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected void OnConnected(ConnectionEventArgs args) { Connected?.Invoke(this, args); } + + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected void OnMessageReceived(ConnectionMessageEventArgs args) + { + MessageReceived?.Invoke(this, args); + } - private void OnExceptionOccurred(Exception exception) + /// + /// Calls the event. + /// + /// The exception. + protected void OnExceptionOccurred(Exception exception) { ExceptionOccurred?.Invoke(this, new ExceptionEventArgs(exception)); } @@ -113,7 +198,7 @@ public SingleConnectionPipeClient(string pipeName, string serverName = ".", Time ReconnectionInterval = reconnectionInterval ?? TimeSpan.FromMilliseconds(100); ReconnectionTimer = new System.Timers.Timer(ReconnectionInterval.TotalMilliseconds); - ReconnectionTimer.Elapsed += async (sender, args) => + ReconnectionTimer.Elapsed += async (_, _) => { try { @@ -151,6 +236,16 @@ public SingleConnectionPipeClient(string pipeName, string serverName = ".", Time /// public async Task ConnectAsync(CancellationToken cancellationToken = default) { + while (IsConnecting) + { + await Task.Delay(TimeSpan.FromMilliseconds(1), cancellationToken).ConfigureAwait(false); + } + + if (IsConnected) + { + return; + } + try { IsConnecting = true; @@ -159,29 +254,17 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default) { ReconnectionTimer.Start(); } - if (IsConnected) - { - throw new InvalidOperationException("Already connected"); - } #pragma warning disable CA2000 // Dispose objects before losing scope var dataPipe = await PipeClientFactory .CreateAndConnectAsync(PipeName, ServerName, cancellationToken) #pragma warning restore CA2000 // Dispose objects before losing scope .ConfigureAwait(false); - - Connection = new PipeConnection(dataPipe, PipeName, Formatter, ServerName); - Connection.Disconnected += async (sender, args) => - { - await DisconnectInternalAsync().ConfigureAwait(false); - - OnDisconnected(args); - }; - Connection.MessageReceived += (sender, args) => OnMessageReceived(args); - Connection.ExceptionOccurred += (sender, args) => OnExceptionOccurred(args.Exception); + + Connection = SetupPipeConnection(dataPipe, PipeName, Formatter, ServerName); Connection.Start(); - OnConnected(new ConnectionEventArgs(Connection)); + OnConnected(new ConnectionEventArgs(Connection)); } catch (Exception) { @@ -206,19 +289,48 @@ public async Task DisconnectAsync(CancellationToken _ = default) await DisconnectInternalAsync().ConfigureAwait(false); } - - private async Task DisconnectInternalAsync() + + /// + /// Disconnects from the server. Does not stop . + /// + /// A Task representing the asynchronous operation. + protected async Task DisconnectInternalAsync() { if (Connection == null) // nullable detection system is not very smart { return; } - await Connection.StopAsync().ConfigureAwait(false); + await Connection.DisposeAsync().ConfigureAwait(false); Connection = null; } + /// + /// Instantiates and sets up the pipe connection (event handlers, etc.). + /// + /// The pipe stream. + /// Name of the connection pipe. + /// The formatter. + /// + /// PipeConnection. + protected virtual PipeConnection SetupPipeConnection( + PipeStream dataPipe, string connectionPipeName, IFormatter formatter, string serverName) + { + var connection = new PipeConnection(dataPipe, connectionPipeName, formatter, serverName); + + connection.Disconnected += async (_, args) => + { + await DisconnectInternalAsync().ConfigureAwait(false); + + OnDisconnected(args); + }; + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; + } + /// /// Sends a message to the server over a named pipe.
/// If client is not connected, is occurred @@ -226,22 +338,43 @@ private async Task DisconnectInternalAsync() /// Message to send to the server. /// /// - public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + public async Task WriteAsync(byte[] value, CancellationToken cancellationToken = default) + { + await ReconnectOrThrow(cancellationToken).ConfigureAwait(false); + + await Connection!.WriteAsync(value, cancellationToken).ConfigureAwait(false); + } + + /// + /// Sends a message to the server over a named pipe.
+ /// If client is not connected, is occurred + ///
+ /// Message to send to the server. + /// + /// + public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + await ReconnectOrThrow(cancellationToken).ConfigureAwait(false); + + await Connection!.WriteAsync(value, cancellationToken).ConfigureAwait(false); + } + + /// + /// Reconnects the client if needed and throws an exception when failed. + /// + /// The cancellation token that can be used by other objects or threads to receive notice of cancellation. + /// Client is not connected + protected async Task ReconnectOrThrow(CancellationToken cancellationToken = default) { - if (!IsConnected && AutoReconnect && !IsConnecting) + if (!IsConnected && AutoReconnect) { await ConnectAsync(cancellationToken).ConfigureAwait(false); } - while (IsConnecting) - { - await Task.Delay(TimeSpan.FromMilliseconds(1), cancellationToken).ConfigureAwait(false); - } - if (Connection == null) // nullable detection system is not very smart + + if (Connection == null) { throw new InvalidOperationException("Client is not connected"); } - - await Connection.WriteAsync(value, cancellationToken).ConfigureAwait(false); } #endregion @@ -256,6 +389,8 @@ public async ValueTask DisposeAsync() ReconnectionTimer.Dispose(); await DisconnectInternalAsync().ConfigureAwait(false); + + GC.SuppressFinalize(this); } #endregion diff --git a/src/libs/H.Pipes/SingleConnectionPipeServer.cs b/src/libs/H.Pipes/SingleConnectionPipeServer.cs index 0fef4ee..03fb9ed 100644 --- a/src/libs/H.Pipes/SingleConnectionPipeServer.cs +++ b/src/libs/H.Pipes/SingleConnectionPipeServer.cs @@ -6,11 +6,68 @@ namespace H.Pipes; + /// /// Wraps a and optimized for one connection. +/// Specialized version of for communications based on a single type. /// /// Reference type to read/write from the named pipe -public sealed class SingleConnectionPipeServer : IPipeServer +/// +/// +public class SingleConnectionPipeServer : SingleConnectionPipeServer, IPipeServer +{ + #region Constructors + + /// + public SingleConnectionPipeServer(string pipeName, IFormatter? formatter = default) + : base(pipeName, formatter) { } + + #endregion + + #region Events + + /// + public new event EventHandler>? MessageReceived; + + /// + /// Calls the event. + /// + /// The arguments. + protected void OnMessageReceived(ConnectionMessageEventArgs args) + { + MessageReceived?.Invoke(this, args); + } + + #endregion + + #region Public methods + + /// + public Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + return base.WriteAsync(value, cancellationToken); + } + + /// + protected override PipeConnection SetupPipeConnection(NamedPipeServerStream connectionStream, string connectionPipeName, IFormatter formatter) + { + var connection = new PipeConnection(connectionStream, connectionPipeName, Formatter); + + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.Disconnected += (_, args) => OnClientDisconnected(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; + } + + #endregion +} + + +/// +/// Wraps a and optimized for one connection. +/// +public class SingleConnectionPipeServer : IPipeServer { #region Properties @@ -42,7 +99,7 @@ public sealed class SingleConnectionPipeServer : IPipeServer /// /// Connection /// - public PipeConnection? Connection { get; private set; } + public PipeConnection? Connection { get; private set; } /// /// IsStarted @@ -61,39 +118,55 @@ public sealed class SingleConnectionPipeServer : IPipeServer /// /// Invoked whenever a client connects to the server. /// - public event EventHandler>? ClientConnected; + public event EventHandler? ClientConnected; /// /// Invoked whenever a client disconnects from the server. /// - public event EventHandler>? ClientDisconnected; + public event EventHandler? ClientDisconnected; /// /// Invoked whenever a client sends a message to the server. /// - public event EventHandler>? MessageReceived; + public event EventHandler>? MessageReceived; /// /// Invoked whenever an exception is thrown during a read or write operation. /// public event EventHandler? ExceptionOccurred; - private void OnClientConnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected virtual void OnClientConnected(ConnectionEventArgs args) { ClientConnected?.Invoke(this, args); } - private void OnClientDisconnected(ConnectionEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected virtual void OnClientDisconnected(ConnectionEventArgs args) { ClientDisconnected?.Invoke(this, args); } - private void OnMessageReceived(ConnectionMessageEventArgs args) + /// + /// Calls the event. + /// + /// The instance containing the event data. + protected virtual void OnMessageReceived(ConnectionMessageEventArgs args) { MessageReceived?.Invoke(this, args); } - private void OnExceptionOccurred(Exception exception) + /// + /// Calls the event. + /// + /// The exception. + protected virtual void OnExceptionOccurred(Exception exception) { ExceptionOccurred?.Invoke(this, new ExceptionEventArgs(exception)); } @@ -141,7 +214,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) { try { - if (Connection != null && Connection.IsConnected) + if (Connection is { IsConnected: true }) { await Task.Delay(TimeSpan.FromMilliseconds(1), cancellationToken).ConfigureAwait(false); continue; @@ -176,12 +249,10 @@ public async Task StartAsync(CancellationToken cancellationToken = default) throw; } - var connection = new PipeConnection(connectionStream, PipeName, Formatter); + var connection = SetupPipeConnection(connectionStream, PipeName, Formatter); + try { - connection.MessageReceived += (sender, args) => OnMessageReceived(args); - connection.Disconnected += (sender, args) => OnClientDisconnected(args); - connection.ExceptionOccurred += (sender, args) => OnExceptionOccurred(args.Exception); connection.Start(); } catch @@ -193,7 +264,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) Connection = connection; - OnClientConnected(new ConnectionEventArgs(connection)); + OnClientConnected(new ConnectionEventArgs(connection)); } catch (OperationCanceledException) { @@ -237,19 +308,77 @@ public async Task StartAsync(CancellationToken cancellationToken = default) } /// - /// Sends a message to all connected clients asynchronously. + /// Instantiates and sets up the pipe connection (event handlers, etc.). /// - /// - /// - public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + /// The connection stream. + /// Name of the connection pipe. + /// The formatter. + /// PipeConnection. + protected virtual PipeConnection SetupPipeConnection( + NamedPipeServerStream connectionStream, string connectionPipeName, IFormatter formatter) { - if (Connection == null || !Connection.IsConnected) + var connection = new PipeConnection(connectionStream, connectionPipeName, Formatter); + + connection.MessageReceived += (_, args) => OnMessageReceived(args); + connection.Disconnected += (_, args) => OnClientDisconnected(args); + connection.ExceptionOccurred += (_, args) => OnExceptionOccurred(args.Exception); + + return connection; + } + + /// + public async Task WriteAsync(byte[] value, CancellationToken cancellationToken = default) + { + if (Connection is not { IsConnected: true }) { return; } await Connection.WriteAsync(value, cancellationToken).ConfigureAwait(false); } + + /// + /// + [Obsolete("Cannot filter connections on a single connection server.", true)] + public Task WriteAsync(byte[] value, string pipeName, CancellationToken cancellationToken = default) + { + throw new InvalidOperationException("Cannot filter connections on a single connection server."); + } + + /// + /// + [Obsolete("Cannot filter connections on a single connection server.", true)] + public Task WriteAsync(byte[] value, Predicate? predicate, CancellationToken cancellationToken = default) + { + throw new InvalidOperationException("Cannot filter connections on a single connection server."); + } + + /// + public async Task WriteAsync(T value, CancellationToken cancellationToken = default) + { + if (Connection is not { IsConnected: true }) + { + return; + } + + await Connection.WriteAsync(value, cancellationToken).ConfigureAwait(false); + } + + /// + /// + [Obsolete("Cannot filter connections on a single connection server.", true)] + public Task WriteAsync(T value, string pipeName, CancellationToken cancellationToken = default) + { + throw new InvalidOperationException("Cannot filter connections on a single connection server."); + } + + /// + /// + [Obsolete("Cannot filter connections on a single connection server.", true)] + public Task WriteAsync(T value, Predicate? predicate, CancellationToken cancellationToken = default) + { + throw new InvalidOperationException("Cannot filter connections on a single connection server."); + } /// /// Closes all open client connections and stops listening for new ones. @@ -288,6 +417,8 @@ public async ValueTask DisposeAsync() _isDisposed = true; await StopAsync().ConfigureAwait(false); + + GC.SuppressFinalize(this); } #endregion diff --git a/src/tests/H.Pipes.Tests/BaseTests.cs b/src/tests/H.Pipes.Tests/BaseTests.cs index 22a8bcb..c531aa8 100644 --- a/src/tests/H.Pipes.Tests/BaseTests.cs +++ b/src/tests/H.Pipes.Tests/BaseTests.cs @@ -1,22 +1,99 @@ using System.Diagnostics; using System.Text; using H.Formatters; +using H.Pipes.Extensions; namespace H.Pipes.Tests; public static class BaseTests { - public static async Task DataTestAsync(IPipeServer server, IPipeClient client, List values, Func? hashFunc = null, CancellationToken cancellationToken = default) + public static void SetupMessageReceived( + IPipeServer server, + IPipeClient client, + Action setActualHashFunc, + Func> getTcsFunc, + Func? hashFunc, + CancellationToken cancellationToken) + { + server.MessageReceived += async (_, args) => + { + Trace.WriteLine($"Server_OnMessageReceived: {args.Message}"); + + T? value = default; + + if (args.Message != null) + value = await args.Message.DeserializeAsync(server.Formatter, cancellationToken); + + var actualHash = hashFunc?.Invoke(value); + setActualHashFunc(actualHash); + Trace.WriteLine($"ActualHash: {actualHash}"); + + // ReSharper disable once AccessToModifiedClosure + _ = getTcsFunc().TrySetResult(true); + }; + + client.MessageReceived += (_, args) => Trace.WriteLine($"Client_OnMessageReceived: {args.Message}"); + } + + public static void SetupMessageReceived( + IPipeServer server, + IPipeClient client, + Action setActualHashFunc, + Func> getTcsFunc, + Func? hashFunc) + { + server.MessageReceived += (_, args) => + { + Trace.WriteLine($"Server_OnMessageReceived: {args.Message}"); + + var actualHash = hashFunc?.Invoke(args.Message); + + setActualHashFunc(actualHash); + Trace.WriteLine($"ActualHash: {actualHash}"); + + // ReSharper disable once AccessToModifiedClosure + _ = getTcsFunc().TrySetResult(true); + }; + + client.MessageReceived += (_, args) => Trace.WriteLine($"Client_OnMessageReceived: {args.Message}"); + } + + public static async Task DataTestAsync( + IPipeServer server, + IPipeClient client, + List values, + Func? hashFunc = null, + CancellationToken cancellationToken = default, + bool useGeneric = false) { Trace.WriteLine("Setting up test..."); var completionSource = new TaskCompletionSource(false); + // ReSharper disable once AccessToModifiedClosure using var registration = cancellationToken.Register(() => completionSource.TrySetCanceled(cancellationToken)); - var actualHash = (string?)null; + var actualHash = (string?)null; var clientDisconnected = false; + + // + // Shared client/server setup + + if (useGeneric) + SetupMessageReceived( + (IPipeServer)server, (IPipeClient)client, + h => actualHash = h, () => completionSource, hashFunc); + + else + SetupMessageReceived( + server, client, + h => actualHash = h, () => completionSource, hashFunc, cancellationToken); + + + // + // Setup the server + server.ClientConnected += (_, _) => { Trace.WriteLine("Client connected"); @@ -27,52 +104,59 @@ public static async Task DataTestAsync(IPipeServer server, IPipeClient clientDisconnected = true; // ReSharper disable once AccessToModifiedClosure - completionSource.TrySetResult(true); - }; - server.MessageReceived += (_, args) => - { - Trace.WriteLine($"Server_OnMessageReceived: {args.Message}"); - actualHash = hashFunc?.Invoke(args.Message); - Trace.WriteLine($"ActualHash: {actualHash}"); - - // ReSharper disable once AccessToModifiedClosure - completionSource.TrySetResult(true); + _ = completionSource.TrySetResult(true); }; server.ExceptionOccurred += (_, args) => { Trace.WriteLine($"Server exception occurred: {args.Exception}"); // ReSharper disable once AccessToModifiedClosure - completionSource.TrySetException(args.Exception); + _ = completionSource.TrySetException(args.Exception); }; - client.Connected += (_, _) => Trace.WriteLine("Client_OnConnected"); + + + // + // Setup the client + + client.Connected += (_, _) => Trace.WriteLine("Client_OnConnected"); client.Disconnected += (_, _) => Trace.WriteLine("Client_OnDisconnected"); - client.MessageReceived += (_, args) => Trace.WriteLine($"Client_OnMessageReceived: {args.Message}"); client.ExceptionOccurred += (_, args) => { Trace.WriteLine($"Client exception occurred: {args.Exception}"); // ReSharper disable once AccessToModifiedClosure - completionSource.TrySetException(args.Exception); + _ = completionSource.TrySetException(args.Exception); }; + + + // + // Setup exception handling + AppDomain.CurrentDomain.UnhandledException += (_, args) => { if (args.ExceptionObject is Exception exception) { // ReSharper disable once AccessToModifiedClosure - completionSource.TrySetException(exception); + _ = completionSource.TrySetException(exception); } }; - server.ExceptionOccurred += (_, args) => Trace.WriteLine(args.Exception.ToString()); client.ExceptionOccurred += (_, args) => Trace.WriteLine(args.Exception.ToString()); + + // + // Start up the server and client + await server.StartAsync(cancellationToken).ConfigureAwait(false); await client.ConnectAsync(cancellationToken).ConfigureAwait(false); Trace.WriteLine("Client and server started"); Trace.WriteLine("---"); + + // + // Begin testing + var watcher = Stopwatch.StartNew(); foreach (var value in values) @@ -82,7 +166,7 @@ public static async Task DataTestAsync(IPipeServer server, IPipeClient await client.WriteAsync(value, cancellationToken).ConfigureAwait(false); - await completionSource.Task.ConfigureAwait(false); + _ = await completionSource.Task.ConfigureAwait(false); if (hashFunc != null) { @@ -101,45 +185,112 @@ public static async Task DataTestAsync(IPipeServer server, IPipeClient Trace.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~"); } - public static async Task DataTestAsync(List values, Func? hashFunc = null, IFormatter? formatter = default, TimeSpan? timeout = default) + private static PipeServer CreateServer( + string pipeName, + IFormatter? formatter, + bool useGeneric) + { + return useGeneric + ? new PipeServer(pipeName, formatter) + : new PipeServer(pipeName, formatter ?? new BinaryFormatter()); + } + + private static SingleConnectionPipeServer CreateSingleConnectionServer( + string pipeName, + IFormatter? formatter, + bool useGeneric) + { + return useGeneric + ? new SingleConnectionPipeServer(pipeName, formatter) + : new SingleConnectionPipeServer(pipeName, formatter ?? new BinaryFormatter()); + } + + private static PipeClient CreateClient( + string pipeName, + IFormatter? formatter, + bool useGeneric) + { + return useGeneric + ? new PipeClient(pipeName, formatter: formatter) + : new PipeClient(pipeName, formatter: formatter); + } + + private static SingleConnectionPipeClient CreateSingleConnectionClient( + string pipeName, + IFormatter? formatter, + bool useGeneric) { + return useGeneric + ? new SingleConnectionPipeClient(pipeName, formatter: formatter) + : new SingleConnectionPipeClient(pipeName, formatter: formatter); + } + + public static async Task DataTestAsync( + List values, + Func? hashFunc = null, + IFormatter? formatter = default, + TimeSpan? timeout = default, + bool useGeneric = false) + { + formatter ??= new BinaryFormatter(); + using var cancellationTokenSource = new CancellationTokenSource(timeout ?? TimeSpan.FromMinutes(1)); const string pipeName = "data_test_pipe"; - await using var server = new PipeServer(pipeName, formatter ?? new BinaryFormatter()) - { + await using var server = CreateServer(pipeName, formatter, useGeneric); + #if NET48 - // https://github.com/HavenDV/H.Pipes/issues/6 - WaitFreePipe = true, + // https://github.com/HavenDV/H.Pipes/issues/6 + server.WaitFreePipe = true; #endif - }; - await using var client = new PipeClient(pipeName, formatter: formatter ?? new BinaryFormatter()); - await DataTestAsync(server, client, values, hashFunc, cancellationTokenSource.Token); + await using var client = CreateClient(pipeName, formatter, useGeneric); + + await DataTestAsync(server, client, values, hashFunc, cancellationTokenSource.Token, useGeneric); } - public static async Task DataSingleTestAsync(List values, Func? hashFunc = null, IFormatter? formatter = default, TimeSpan? timeout = default) + public static async Task DataSingleTestAsync( + List values, + Func? hashFunc = null, + IFormatter? formatter = default, + TimeSpan? timeout = default, + bool useGeneric = false) { + formatter ??= new BinaryFormatter(); + using var cancellationTokenSource = new CancellationTokenSource(timeout ?? TimeSpan.FromMinutes(1)); - const string pipeName = "data_test_pipe"; - await using var server = new SingleConnectionPipeServer(pipeName, formatter ?? new BinaryFormatter()) - { - WaitFreePipe = true - }; - await using var client = new SingleConnectionPipeClient(pipeName, formatter: formatter ?? new BinaryFormatter()); + const string pipeName = "data_test_pipe"; + await using var server = CreateSingleConnectionServer(pipeName, formatter, useGeneric); + +#if NET48 + // https://github.com/HavenDV/H.Pipes/issues/6 + //server.WaitFreePipe = true; +#endif + + await using var client = CreateSingleConnectionClient(pipeName, formatter, useGeneric); - await DataTestAsync(server, client, values, hashFunc, cancellationTokenSource.Token); + await DataTestAsync(server, client, values, hashFunc, cancellationTokenSource.Token, useGeneric); } - public static async Task BinaryDataTestAsync(int numBytes, int count = 1, IFormatter? formatter = default, TimeSpan? timeout = default) + public static async Task BinaryDataTestAsync( + int numBytes, + int count = 1, + IFormatter? formatter = default, + TimeSpan? timeout = default, + bool useGeneric = false) { - await DataTestAsync(GenerateData(numBytes, count), Hash, formatter, timeout); + await DataTestAsync(GenerateData(numBytes, count), Hash, formatter, timeout, useGeneric); } - public static async Task BinaryDataSingleTestAsync(int numBytes, int count = 1, IFormatter? formatter = default, TimeSpan? timeout = default) + public static async Task BinaryDataSingleTestAsync( + int numBytes, + int count = 1, + IFormatter? formatter = default, + TimeSpan? timeout = default, + bool useGeneric = false) { - await DataSingleTestAsync(GenerateData(numBytes, count), Hash, formatter, timeout); + await DataSingleTestAsync(GenerateData(numBytes, count), Hash, formatter, timeout, useGeneric); } #region Helper methods diff --git a/src/tests/H.Pipes.Tests/DataTests.cs b/src/tests/H.Pipes.Tests/DataTests.cs index bbda622..a63e409 100644 --- a/src/tests/H.Pipes.Tests/DataTests.cs +++ b/src/tests/H.Pipes.Tests/DataTests.cs @@ -6,38 +6,44 @@ namespace H.Pipes.Tests; public class DataTests { [TestMethod] - public async Task NullTest() + [DataRow(true)] + [DataRow(false)] + public async Task NullTest(bool useGeneric) { var values = new List { null }; static string HashFunc(string? value) => value ?? "null"; - await BaseTests.DataSingleTestAsync(values, HashFunc); - await BaseTests.DataSingleTestAsync(values, HashFunc, new NewtonsoftJsonFormatter()); - await BaseTests.DataSingleTestAsync(values, HashFunc, new SystemTextJsonFormatter()); - await BaseTests.DataSingleTestAsync(values, HashFunc, new CerasFormatter()); + await BaseTests.DataSingleTestAsync(values, HashFunc, useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new NewtonsoftJsonFormatter(), useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new SystemTextJsonFormatter(), useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new CerasFormatter(), useGeneric: useGeneric); } [TestMethod] - public async Task EmptyArrayTest() + [DataRow(true)] + [DataRow(false)] + public async Task EmptyArrayTest(bool useGeneric) { var values = new List { Array.Empty() }; static string HashFunc(byte[]? value) => value?.Length.ToString() ?? "null"; - await BaseTests.DataSingleTestAsync(values, HashFunc); - await BaseTests.DataSingleTestAsync(values, HashFunc, new NewtonsoftJsonFormatter()); - await BaseTests.DataSingleTestAsync(values, HashFunc, new SystemTextJsonFormatter()); - await BaseTests.DataSingleTestAsync(values, HashFunc, new CerasFormatter()); + await BaseTests.DataSingleTestAsync(values, HashFunc, useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new NewtonsoftJsonFormatter(), useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new SystemTextJsonFormatter(), useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new CerasFormatter(), useGeneric: useGeneric); values = new List { null }; - await BaseTests.DataSingleTestAsync(values, HashFunc); - await BaseTests.DataSingleTestAsync(values, HashFunc, new NewtonsoftJsonFormatter()); - await BaseTests.DataSingleTestAsync(values, HashFunc, new SystemTextJsonFormatter()); - await BaseTests.DataSingleTestAsync(values, HashFunc, new CerasFormatter()); + await BaseTests.DataSingleTestAsync(values, HashFunc, useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new NewtonsoftJsonFormatter(), useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new SystemTextJsonFormatter(), useGeneric: useGeneric); + await BaseTests.DataSingleTestAsync(values, HashFunc, new CerasFormatter(), useGeneric: useGeneric); } [TestMethod] - public async Task EmptyArrayParallelTest() + [DataRow(true)] + [DataRow(false)] + public async Task EmptyArrayParallelTest(bool useGeneric) { using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(1)); var cancellationToken = cancellationTokenSource.Token; @@ -61,105 +67,139 @@ public async Task EmptyArrayParallelTest() } [TestMethod] - public async Task TestEmptyMessageDoesNotDisconnectClient() + [DataRow(true)] + [DataRow(false)] + public async Task TestEmptyMessageDoesNotDisconnectClient(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(0); + await BaseTests.BinaryDataTestAsync(0, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize1B() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize1B(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(1); + await BaseTests.BinaryDataTestAsync(1, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize2B() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize2B(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(2); + await BaseTests.BinaryDataTestAsync(2, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize3B() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize3B(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(3); + await BaseTests.BinaryDataTestAsync(3, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize9B() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize9B(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(9); + await BaseTests.BinaryDataTestAsync(9, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize33B() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize33B(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(33); + await BaseTests.BinaryDataTestAsync(33, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize1Kx3_NewtonsoftJson() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize1Kx3_NewtonsoftJson(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(1025, 3, new NewtonsoftJsonFormatter()); + await BaseTests.BinaryDataTestAsync(1025, 3, new NewtonsoftJsonFormatter(), useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize1Kx3_SystemTextJson() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize1Kx3_SystemTextJson(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(1025, 3, new SystemTextJsonFormatter()); + await BaseTests.BinaryDataTestAsync(1025, 3, new SystemTextJsonFormatter(), useGeneric: useGeneric); } //[TestMethod] - //public async Task TestMessageSize1Kx3_Ceras() + //[DataRow(true)] + //[DataRow(false)] + //public async Task TestMessageSize1Kx3_Ceras(bool useGeneric) //{ - // await BaseTests.BinaryDataTestAsync(1025, 3, new CerasFormatter()); + // await BaseTests.BinaryDataTestAsync(1025, 3, new CerasFormatter(), useGeneric: useGeneric); //} [TestMethod] - public async Task TestMessageSize129B() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize129B(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(129); + await BaseTests.BinaryDataTestAsync(129, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize1K() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize1K(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(1025); + await BaseTests.BinaryDataTestAsync(1025, useGeneric: useGeneric); } [TestMethod] - public async Task TestMessageSize1M() + [DataRow(true)] + [DataRow(false)] + public async Task TestMessageSize1M(bool useGeneric) { - await BaseTests.BinaryDataTestAsync(1024 * 1024 + 1); + await BaseTests.BinaryDataTestAsync(1024 * 1024 + 1, useGeneric: useGeneric); } [TestMethod] - public async Task Single_TestEmptyMessageDoesNotDisconnectClient() + [DataRow(true)] + [DataRow(false)] + public async Task Single_TestEmptyMessageDoesNotDisconnectClient(bool useGeneric) { - await BaseTests.BinaryDataSingleTestAsync(0); + await BaseTests.BinaryDataSingleTestAsync(0, useGeneric: useGeneric); } [TestMethod] - public async Task Single_TestMessageSize1B() + [DataRow(true)] + [DataRow(false)] + public async Task Single_TestMessageSize1B(bool useGeneric) { - await BaseTests.BinaryDataSingleTestAsync(1); + await BaseTests.BinaryDataSingleTestAsync(1, useGeneric: useGeneric); } [TestMethod] - public async Task Single_TestMessageSize1Kx3_NewtonsoftJson() + [DataRow(true)] + [DataRow(false)] + public async Task Single_TestMessageSize1Kx3_NewtonsoftJson(bool useGeneric) { - await BaseTests.BinaryDataSingleTestAsync(1025, 3, new NewtonsoftJsonFormatter()); + await BaseTests.BinaryDataSingleTestAsync(1025, 3, new NewtonsoftJsonFormatter(), useGeneric: useGeneric); } [TestMethod] - public async Task Single_TestMessageSize1Kx3_SystemTextJson() + [DataRow(true)] + [DataRow(false)] + public async Task Single_TestMessageSize1Kx3_SystemTextJson(bool useGeneric) { - await BaseTests.BinaryDataSingleTestAsync(1025, 3, new SystemTextJsonFormatter()); + await BaseTests.BinaryDataSingleTestAsync(1025, 3, new SystemTextJsonFormatter(), useGeneric: useGeneric); } //[TestMethod] - //public async Task Single_TestMessageSize1Kx3_Ceras() + //[DataRow(true)] + //[DataRow(false)] + //public async Task Single_TestMessageSize1Kx3_Ceras(bool useGeneric) //{ - // await BaseTests.BinaryDataSingleTestAsync(1025, 3, new CerasFormatter()); + // await BaseTests.BinaryDataSingleTestAsync(1025, 3, new CerasFormatter(), useGeneric: useGeneric); //} [TestMethod] diff --git a/src/tests/H.Pipes.Tests/PipeClientTests.cs b/src/tests/H.Pipes.Tests/PipeClientTests.cs index 7ccb52a..6f9df11 100644 --- a/src/tests/H.Pipes.Tests/PipeClientTests.cs +++ b/src/tests/H.Pipes.Tests/PipeClientTests.cs @@ -4,17 +4,17 @@ public class PipeClientTests { [TestMethod] - public async Task ConnectTest() + public async Task ConnectCancellationTest() { - using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(1)); - await using var client = new PipeClient("this_pipe_100%_is_not_exists"); + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + await using var client = new PipeClient("this_pipe_100%_is_not_exists"); await Assert.ThrowsExceptionAsync( async () => await client.ConnectAsync(cancellationTokenSource.Token)); } [TestMethod] - public async Task WriteAsyncTest() + public async Task WriteAsyncCancellationTest() { using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(15)); var cancellationToken = cancellationTokenSource.Token; From d079bae3456b3551dee95c6491e0d7ccf5c2a21c Mon Sep 17 00:00:00 2001 From: Alexis Incogito Date: Sun, 24 Apr 2022 08:23:03 +0200 Subject: [PATCH 2/5] Update src/libs/H.Formatters.Inferno/PipeClientExtensions.cs Co-authored-by: Konstantin S. <3002068+HavenDV@users.noreply.github.com> --- src/libs/H.Formatters.Inferno/PipeClientExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs b/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs index 2c9682c..c400948 100644 --- a/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs +++ b/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs @@ -27,7 +27,7 @@ public static void EnableEncryption( Action? exceptionAction = null) { client = client ?? throw new ArgumentNullException(nameof(client)); - client.Connected += async (_, connArgs) => + client.Connected += static async (_, args) => { try { From 603d38ad9995cf4f0eabac1e3c99159ae67eeb3f Mon Sep 17 00:00:00 2001 From: Alexis Incogito Date: Sun, 24 Apr 2022 10:51:49 +0200 Subject: [PATCH 3/5] - Updated: Reverted the names of arguments in the lamba functions --- .../PipeClientExtensions.cs | 22 ++++++++-------- .../PipeServerExtensions.cs | 26 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs b/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs index c400948..a24e7dd 100644 --- a/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs +++ b/src/libs/H.Formatters.Inferno/PipeClientExtensions.cs @@ -27,34 +27,34 @@ public static void EnableEncryption( Action? exceptionAction = null) { client = client ?? throw new ArgumentNullException(nameof(client)); - client.Connected += static async (_, args) => + client.Connected += async (_, args) => { try { - var pipeName = $"{connArgs.Connection.PipeName}_Inferno"; + var pipeName = $"{args.Connection.PipeName}_Inferno"; using var source = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = source.Token; - var infClient = new SingleConnectionPipeClient(pipeName, connArgs.Connection.ServerName, formatter: connArgs.Connection.Formatter); + var client = new SingleConnectionPipeClient(pipeName, args.Connection.ServerName, formatter: args.Connection.Formatter); - infClient.ExceptionOccurred += (_, exArgs) => + client.ExceptionOccurred += (_, args) => { - Debug.WriteLine($"{nameof(EnableEncryption)} client returns exception: {exArgs.Exception}"); + Debug.WriteLine($"{nameof(EnableEncryption)} client returns exception: {args.Exception}"); - exceptionAction?.Invoke(exArgs.Exception); + exceptionAction?.Invoke(args.Exception); }; - await using (infClient.ConfigureAwait(false)) + await using (client.ConfigureAwait(false)) { using var keyPair = new KeyPair(); - await infClient.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false); + await client.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false); - var response = await infClient.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + var response = await client.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); var serverPublicKey = response.Message; - connArgs.Connection.Formatter = new InfernoFormatter( - connArgs.Connection.Formatter, + args.Connection.Formatter = new InfernoFormatter( + args.Connection.Formatter, keyPair.GenerateSharedKey(serverPublicKey)); } } diff --git a/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs b/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs index 6893ae6..b314f9f 100644 --- a/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs +++ b/src/libs/H.Formatters.Inferno/PipeServerExtensions.cs @@ -27,44 +27,44 @@ public static void EnableEncryption( Action? exceptionAction = null) { server = server ?? throw new ArgumentNullException(nameof(server)); - server.ClientConnected += async (_, connArgs) => + server.ClientConnected += async (_, args) => { try { using var source = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = source.Token; - var pipeName = $"{connArgs.Connection.PipeName}_Inferno"; - var infServer = new SingleConnectionPipeServer(pipeName, connArgs.Connection.Formatter); + var pipeName = $"{args.Connection.PipeName}_Inferno"; + var server = new SingleConnectionPipeServer(pipeName, args.Connection.Formatter); - infServer.ExceptionOccurred += (_, exArgs) => + server.ExceptionOccurred += (_, args) => { - Debug.WriteLine($"{nameof(EnableEncryption)} server returns exception: {exArgs.Exception}"); + Debug.WriteLine($"{nameof(EnableEncryption)} server returns exception: {args.Exception}"); - exceptionAction?.Invoke(exArgs.Exception); + exceptionAction?.Invoke(args.Exception); }; - await using (infServer.ConfigureAwait(false)) + await using (server.ConfigureAwait(false)) { - await infServer.StartAsync(cancellationToken).ConfigureAwait(false); + await server.StartAsync(cancellationToken).ConfigureAwait(false); - var response = await infServer.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + var response = await server.WaitMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); var clientPublicKey = response.Message; using var keyPair = new KeyPair(); - connArgs.Connection.Formatter = new InfernoFormatter( - connArgs.Connection.Formatter, + args.Connection.Formatter = new InfernoFormatter( + args.Connection.Formatter, keyPair.GenerateSharedKey(clientPublicKey)); - await infServer.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false); + await server.WriteAsync(keyPair.PublicKey, cancellationToken).ConfigureAwait(false); } } catch (Exception exception) { Debug.WriteLine($"{nameof(EnableEncryption)} returns exception: {exception}"); - await connArgs.Connection.StopAsync().ConfigureAwait(false); + await args.Connection.StopAsync().ConfigureAwait(false); exceptionAction?.Invoke(exception); } From e2ff0a2893ed20ab1b94b6883333d558ea24d587 Mon Sep 17 00:00:00 2001 From: Konstantin S <3002068+HavenDV@users.noreply.github.com> Date: Fri, 29 Apr 2022 15:47:29 +1000 Subject: [PATCH 4/5] docs: Fixed xml doc warning. --- src/libs/H.Pipes.AccessControl/PipeServerExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libs/H.Pipes.AccessControl/PipeServerExtensions.cs b/src/libs/H.Pipes.AccessControl/PipeServerExtensions.cs index 36cd69f..6ca49b8 100644 --- a/src/libs/H.Pipes.AccessControl/PipeServerExtensions.cs +++ b/src/libs/H.Pipes.AccessControl/PipeServerExtensions.cs @@ -11,7 +11,7 @@ public static class PipeServerExtensions { /// /// Sets 's for each that will be created by
- /// Overrides + /// Overrides ///
/// /// From e66360e506f1bb45be6445e4bffe2ac2e966108e Mon Sep 17 00:00:00 2001 From: Konstantin S <3002068+HavenDV@users.noreply.github.com> Date: Fri, 29 Apr 2022 15:56:18 +1000 Subject: [PATCH 5/5] refactor: Changed FormatterExtensions to be IFormatter extensions. --- src/libs/H.Pipes/Extensions/FormatterExtensions.cs | 8 ++++---- src/libs/H.Pipes/PipeConnection.cs | 4 ++-- src/tests/H.Pipes.Tests/BaseTests.cs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/libs/H.Pipes/Extensions/FormatterExtensions.cs b/src/libs/H.Pipes/Extensions/FormatterExtensions.cs index 9c8ae61..8e55b43 100644 --- a/src/libs/H.Pipes/Extensions/FormatterExtensions.cs +++ b/src/libs/H.Pipes/Extensions/FormatterExtensions.cs @@ -22,8 +22,8 @@ public static class FormatterExtensions /// /// Serialized object. public static async Task SerializeAsync( - this T value, - IFormatter formatter, + this IFormatter formatter, + T value, CancellationToken cancellationToken) { if (formatter == null) @@ -44,8 +44,8 @@ public static async Task SerializeAsync( /// /// System.Nullable<T>. public static async Task DeserializeAsync( - this byte[] bytes, - IFormatter formatter, + this IFormatter formatter, + byte[] bytes, CancellationToken cancellationToken) { if (formatter == null) diff --git a/src/libs/H.Pipes/PipeConnection.cs b/src/libs/H.Pipes/PipeConnection.cs index b0b4d4a..2c2105f 100644 --- a/src/libs/H.Pipes/PipeConnection.cs +++ b/src/libs/H.Pipes/PipeConnection.cs @@ -33,7 +33,7 @@ protected override async Task OnMessageReceived(byte[]? message, CancellationTok T? obj = default; if (message != null) - obj = await message.DeserializeAsync(Formatter, cancellationToken).ConfigureAwait(false); + obj = await Formatter.DeserializeAsync(message, cancellationToken).ConfigureAwait(false); MessageReceived?.Invoke(this, new ConnectionMessageEventArgs(this, obj)); } @@ -190,7 +190,7 @@ public async Task WriteAsync(T value, CancellationToken cancellationToken = d throw new InvalidOperationException("Client is not connected"); } - var bytes = await value.SerializeAsync(Formatter, cancellationToken).ConfigureAwait(false); + var bytes = await Formatter.SerializeAsync(value, cancellationToken).ConfigureAwait(false); await PipeStreamWrapper.WriteAsync(bytes, cancellationToken).ConfigureAwait(false); } diff --git a/src/tests/H.Pipes.Tests/BaseTests.cs b/src/tests/H.Pipes.Tests/BaseTests.cs index c531aa8..e3640ee 100644 --- a/src/tests/H.Pipes.Tests/BaseTests.cs +++ b/src/tests/H.Pipes.Tests/BaseTests.cs @@ -22,7 +22,7 @@ public static void SetupMessageReceived( T? value = default; if (args.Message != null) - value = await args.Message.DeserializeAsync(server.Formatter, cancellationToken); + value = await server.Formatter.DeserializeAsync(args.Message, cancellationToken); var actualHash = hashFunc?.Invoke(value); setActualHashFunc(actualHash);