diff --git a/source/tristanable/manager/watcher.d b/source/tristanable/manager/watcher.d index 624e015..82b6893 100644 --- a/source/tristanable/manager/watcher.d +++ b/source/tristanable/manager/watcher.d @@ -129,10 +129,13 @@ public class Watcher : Thread } } - // TODO: Unblock all `dequeue()`'s here - // TODO: Get a reason for exiting (either cause of error OR shutdoiwn (see below (which in turn is called by the Manager))) version(unittest) { writeln("Exited watcher loop"); } - // this.manager.stop(); + + // NOTE: This will also be run on normal user-initiated `stop()` + // ... but will just try shutdown an alreayd shutdown manager + // ... again and try shut our already-closed river stream + // Shutdown and unblock all `dequeue()` calls + this.manager.stop_FailedWatcher(); } /** @@ -312,18 +315,6 @@ unittest * for such a case */ unittest -{ - -} - -/** - * Setup a `Manager` and then block on a `dequeue()` - * but from another thread shutdown the `Manager`. - * - * This is to test the exception triggering mechanism - * for such a case - */ -unittest { writeln("<<<<< Test 4 start >>>>>"); @@ -430,4 +421,108 @@ unittest // Check condition assert(failingException !is null); assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN); +} + +/** + * Setup a `Manager` and then block on a `dequeue()` + * but from another thread shutdown the `Manager`. + * + * This is to test the exception triggering mechanism + * for such a case + */ +unittest +{ + writeln("<<<<< Test 3 start >>>>>"); + + Address serverAddress = parseAddress("::1", 0); + Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + server.bind(serverAddress); + server.listen(0); + + class ServerThread : Thread + { + this() + { + super(&worker); + } + + private void worker() + { + Socket clientSocket = server.accept(); + BClient bClient = new BClient(clientSocket); + + Thread.sleep(dur!("seconds")(7)); + writeln("Server start"); + + sleep(dur!("seconds")(15)); + + writeln("Server ending"); + + // Close the connection + bClient.close(); + } + } + + ServerThread serverThread = new ServerThread(); + serverThread.start(); + + Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + + writeln(server.localAddress); + + + Manager manager = new Manager(client); + + Queue sixtyNine = new Queue(69); + + manager.registerQueue(sixtyNine); + + + /* Connect our socket to the server */ + client.connect(server.localAddress); + + /* Start the manager and let it manage the socket */ + manager.start(); + + + // The failing exception + TristanableException failingException; + + class DequeueThread : Thread + { + private Queue testQueue; + + this(Queue testQueue) + { + super(&worker); + this.testQueue = testQueue; + } + + public void worker() + { + try + { + writeln("dequeuThread: Before dequeue()"); + this.testQueue.dequeue(); + writeln("dequeueThread: After dequeue() [should not get here]"); + } + catch(TristanableException e) + { + writeln("Got tristanable exception during dequeue(): "~e.toString()); + + // TODO: Fliup boolean is all cgood and assret it later + failingException = e; + } + } + } + + DequeueThread dequeueThread = new DequeueThread(sixtyNine); + dequeueThread.start(); + + // Wait for the dequeueing thread to stop + dequeueThread.join(); + + // Check condition + assert(failingException !is null); + assert(failingException.getError() == ErrorType.WATCHER_FAILED); } \ No newline at end of file