Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unblock queues on shutdown or errors #10

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions source/tristanable/exceptions.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@ import std.conv : to;
*/
public enum ErrorType
{
/**
* Unset
*/
UNSET,

/**
* If the manager has already
* been shutdown
*/
MANAGER_SHUTDOWN,

/**
* If the watcher has failed
* to stay alive
*/
WATCHER_FAILED,

/**
* If the requested queue could not be found
*/
Expand Down
59 changes: 59 additions & 0 deletions source/tristanable/manager/manager.d
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,69 @@ public class Manager
* Stops the management of the socket, resulting
* in ending the updating of queues and closing
* the underlying connection
*
* Calling this will also unblock any calls that
* were blocking whilst doing a `dequeue()`
*/
public void stop()
{
/* Stop with the given reason */
stop(ErrorType.MANAGER_SHUTDOWN);
}

/**
* Only called by the `Watcher` and for
* the purpose of setting a custom error
* type.
*
* Called when the network read fails
*/
void stop_FailedWatcher()
{
/* Stop with the given reason */
stop(ErrorType.WATCHER_FAILED);
}

/**
* Stops the watcher service and then
* unblocks all calls to `dequeue()`
* by shutting down each `Queue`
*
* Params:
* reason = the reason for the
* shutdown
*/
private void stop(ErrorType reason)
{
/* Stop the watcher */
watcher.shutdown();

/* Unblock all `dequeue()` calls */
shutdownAllQueues(reason);
}

/**
* Shuts down all registered queues
*/
protected void shutdownAllQueues(ErrorType reason)
{
/* Lock the queue of queues */
queuesLock.lock();

/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}

// TODO: Shutdown default queue - see mtsafety

/* Shutdown each queue */
foreach(Queue queue; this.queues)
{
queue.shutdownQueue(reason);
}
}

/**
Expand Down
244 changes: 240 additions & 4 deletions source/tristanable/manager/watcher.d
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ public class Watcher : Thread
break;
}
}

version(unittest) { writeln("Exited watcher loop"); }

// NOTE: This will also be run on normal user-initiated `stop()`
// ... but will just try shutdown an alreayd shutdown manager
// ... again and try shut our already-closed river stream
// Shutdown and unblock all `dequeue()` calls

// TODO: A problem is user-initiated could cause this to trugger first and then throw
// ... actually with a WATCHER_FAILED - we should maybe use one error
// ... or find a smart way to have the right flow go off - split up calls
// ... more?
this.manager.stop_FailedWatcher();
}

/**
Expand All @@ -141,17 +154,20 @@ public class Watcher : Thread
}
}

version(unittest)
{
import std.socket;
import std.stdio;
import core.thread;
}

/**
* Set up a server which will send some tagged messages to us (the client),
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
* we then dequeue some messages from both queus. Finally, we shut down the manager.
*/
unittest
{
import std.socket;
import std.stdio;
import core.thread;

Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
Expand Down Expand Up @@ -294,4 +310,224 @@ unittest

/* Stop the manager */
manager.stop();
}

/**
* Setup a `Manager` and then block on a `dequeue()`
* but from another thread shutdown the `Manager`.
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{
writeln("<<<<< Test 3 start >>>>>");

Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);

class ServerThread : Thread
{
this()
{
super(&worker);
}

private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);

Thread.sleep(dur!("seconds")(7));
writeln("Server start");

/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳️
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));

writeln("server send [done]");

sleep(dur!("seconds")(15));

writeln("Server ending");
}
}

ServerThread serverThread = new ServerThread();
serverThread.start();

Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);

writeln(server.localAddress);


Manager manager = new Manager(client);

Queue sixtyNine = new Queue(69);

manager.registerQueue(sixtyNine);


/* Connect our socket to the server */
client.connect(server.localAddress);

/* Start the manager and let it manage the socket */
manager.start();


// The failing exception
TristanableException failingException;

class DequeueThread : Thread
{
private Queue testQueue;

this(Queue testQueue)
{
super(&worker);
this.testQueue = testQueue;
}

public void worker()
{
try
{
writeln("dequeuThread: Before dequeue()");
this.testQueue.dequeue();
writeln("dequeueThread: After dequeue() [should not get here]");
}
catch(TristanableException e)
{
writeln("Got tristanable exception during dequeue(): "~e.toString());

// TODO: Fliup boolean is all cgood and assret it later
failingException = e;
}
}
}

DequeueThread dequeueThread = new DequeueThread(sixtyNine);
dequeueThread.start();

// Stop the manager
manager.stop();
writeln("drop");

// Wait for the dequeueing thread to stop
dequeueThread.join();

// Check condition
assert(failingException !is null);
assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN);
}

/**
* Setup a server which dies (kills its connection to us)
* midway whilst we are doing a `dequeue()`
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{
writeln("<<<<< Test 4 start >>>>>");

Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);

class ServerThread : Thread
{
this()
{
super(&worker);
}

private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);

Thread.sleep(dur!("seconds")(7));
writeln("Server start");

sleep(dur!("seconds")(15));

writeln("Server ending");

// Close the connection
bClient.close();
}
}

ServerThread serverThread = new ServerThread();
serverThread.start();

Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);

writeln(server.localAddress);


Manager manager = new Manager(client);

Queue sixtyNine = new Queue(69);

manager.registerQueue(sixtyNine);


/* Connect our socket to the server */
client.connect(server.localAddress);

/* Start the manager and let it manage the socket */
manager.start();


// The failing exception
TristanableException failingException;

class DequeueThread : Thread
{
private Queue testQueue;

this(Queue testQueue)
{
super(&worker);
this.testQueue = testQueue;
}

public void worker()
{
try
{
writeln("dequeuThread: Before dequeue()");
this.testQueue.dequeue();
writeln("dequeueThread: After dequeue() [should not get here]");
}
catch(TristanableException e)
{
writeln("Got tristanable exception during dequeue(): "~e.toString());

// TODO: Fliup boolean is all cgood and assret it later
failingException = e;
}
}
}

DequeueThread dequeueThread = new DequeueThread(sixtyNine);
dequeueThread.start();

// Wait for the dequeueing thread to stop
dequeueThread.join();

// Check condition
assert(failingException !is null);
assert(failingException.getError() == ErrorType.WATCHER_FAILED);
}
Loading
Loading