From 8d4359efab8e8b0dd4823ca12e9eeec2147f2887 Mon Sep 17 00:00:00 2001 From: Richard Smith Date: Fri, 27 May 2011 11:56:45 +0100 Subject: [PATCH] Added remaining haXe examples for Guide Chapter 2 Added haXe classes for all Guide examples in Chapter 1 and Chapter 2. Also corrected typo in ZMQConnection remoting class. --- org/zeromq/guide/DuraPub.hx | 70 ++++++++++++++++++++ org/zeromq/guide/DuraPub2.hx | 72 +++++++++++++++++++++ org/zeromq/guide/DuraSub.hx | 64 +++++++++++++++++++ org/zeromq/guide/HelloWorldClient.hx | 1 + org/zeromq/guide/HelloWorldServer.hx | 1 + org/zeromq/guide/MTRelay.hx | 87 +++++++++++++++++++++++++ org/zeromq/guide/MTServer.hx | 1 + org/zeromq/guide/PSEnvPub.hx | 61 ++++++++++++++++++ org/zeromq/guide/PSEnvSub.hx | 56 ++++++++++++++++ org/zeromq/guide/RrBroker.hx | 96 ++++++++++++++++++++++++++++ org/zeromq/guide/RrClient.hx | 66 +++++++++++++++++++ org/zeromq/guide/RrServer.hx | 83 ++++++++++++++++++++++++ org/zeromq/guide/Run.hx | 52 ++++++++++++++- org/zeromq/guide/SyncPub.hx | 71 ++++++++++++++++++++ org/zeromq/guide/SyncSub.hx | 78 ++++++++++++++++++++++ org/zeromq/guide/TaskSink.hx | 2 + org/zeromq/guide/TaskSink2.hx | 86 +++++++++++++++++++++++++ org/zeromq/guide/TaskVent.hx | 2 + org/zeromq/guide/TaskWork.hx | 2 + org/zeromq/guide/TaskWork2.hx | 92 ++++++++++++++++++++++++++ org/zeromq/guide/WUClient.hx | 2 + org/zeromq/guide/WUProxy.hx | 86 +++++++++++++++++++++++++ org/zeromq/guide/WUServer.hx | 2 + org/zeromq/remoting/ZMQConnection.hx | 2 +- 24 files changed, 1133 insertions(+), 2 deletions(-) create mode 100644 org/zeromq/guide/DuraPub.hx create mode 100644 org/zeromq/guide/DuraPub2.hx create mode 100644 org/zeromq/guide/DuraSub.hx create mode 100644 org/zeromq/guide/MTRelay.hx create mode 100644 org/zeromq/guide/PSEnvPub.hx create mode 100644 org/zeromq/guide/PSEnvSub.hx create mode 100644 org/zeromq/guide/RrBroker.hx create mode 100644 org/zeromq/guide/RrClient.hx create mode 100644 org/zeromq/guide/RrServer.hx create mode 100644 org/zeromq/guide/SyncPub.hx create mode 100644 org/zeromq/guide/SyncSub.hx create mode 100644 org/zeromq/guide/TaskSink2.hx create mode 100644 org/zeromq/guide/TaskWork2.hx create mode 100644 org/zeromq/guide/WUProxy.hx diff --git a/org/zeromq/guide/DuraPub.hx b/org/zeromq/guide/DuraPub.hx new file mode 100644 index 0000000..ae17fad --- /dev/null +++ b/org/zeromq/guide/DuraPub.hx @@ -0,0 +1,70 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; +import neko.Lib; +import haxe.io.Bytes; +import neko.Sys; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Publisher for durable subscriber + * + * See: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks + * + * Use with DuraSub.hx + */ +class DuraPub +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** DuraPub (see: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks)"); + + // Subscriber tells us when it is ready here + var sync:ZMQSocket = context.socket(ZMQ_PULL); + sync.bind("tcp://*:5564"); + + // We send updates via this socket + var publisher:ZMQSocket = context.socket(ZMQ_PUB); + + // Uncomment next line to see effect of adding a high water mark to the publisher + // publisher.setsockopt(ZMQ_HWM, { hi:0, lo: 2 } ); // Set HWM to 2 + + publisher.bind("tcp://*:5565"); + + // Wait for synchronisation request + sync.recvMsg(); + + for (update_nbr in 0 ... 10) { + var str = "Update " + update_nbr; + Lib.println(str); + publisher.sendMsg(Bytes.ofString(str)); + Sys.sleep(1.0); + } + publisher.sendMsg(Bytes.ofString("END")); + + sync.close(); + publisher.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/DuraPub2.hx b/org/zeromq/guide/DuraPub2.hx new file mode 100644 index 0000000..23137c3 --- /dev/null +++ b/org/zeromq/guide/DuraPub2.hx @@ -0,0 +1,72 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; +import neko.Lib; +import haxe.io.Bytes; +import neko.Sys; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Publisher for durable but cynical subscriber + * + * See: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks + * + * Use with DuraSub.hx + */ +class DuraPub2 +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** DuraPub2 (see: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks)"); + + // Subscriber tells us when it is ready here + var sync:ZMQSocket = context.socket(ZMQ_PULL); + sync.bind("tcp://*:5564"); + + // We send updates via this socket + var publisher:ZMQSocket = context.socket(ZMQ_PUB); + publisher.bind("tcp://*:5565"); + + // Prevent publisher overflowing because of slow subscribers + publisher.setsockopt(ZMQ_HWM, { hi:0, lo: 1 } ); // Set HWM to 1 + + // Specify swap space in bytes, this covers all subscribers + publisher.setsockopt(ZMQ_SWAP, { hi:0, lo: 25000000 } ); + + // Wait for synchronisation request + sync.recvMsg(); + + for (update_nbr in 0 ... 10) { + var str = "Update " + update_nbr; + Lib.println(str); + publisher.sendMsg(Bytes.ofString(str)); + Sys.sleep(1.0); + } + publisher.sendMsg(Bytes.ofString("END")); + + sync.close(); + publisher.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/DuraSub.hx b/org/zeromq/guide/DuraSub.hx new file mode 100644 index 0000000..619bca9 --- /dev/null +++ b/org/zeromq/guide/DuraSub.hx @@ -0,0 +1,64 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; +import haxe.io.Bytes; +import neko.Lib; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Durable subscriber + * + * See: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks + * + * Use with DuraPub.hx and DuraPub2.hx + */ +class DuraSub +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** DuraSub (see: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks)"); + + var subscriber:ZMQSocket = context.socket(ZMQ_SUB); + subscriber.setsockopt(ZMQ_IDENTITY, Bytes.ofString("Hello")); + subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("")); + subscriber.connect("tcp://127.0.0.1:5565"); + + // Synschronise with publisher + var sync:ZMQSocket = context.socket(ZMQ_PUSH); + sync.connect("tcp://127.0.0.1:5564"); + sync.sendMsg(Bytes.ofString("")); + + // Get updates, exit when told to do so + while (true) { + var msgString:String = subscriber.recvMsg().toString(); + Lib.println(msgString + "\n"); + if (msgString == "END") { + break; + } + } + sync.close(); + subscriber.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/HelloWorldClient.hx b/org/zeromq/guide/HelloWorldClient.hx index 94f5ecf..f0f79f5 100644 --- a/org/zeromq/guide/HelloWorldClient.hx +++ b/org/zeromq/guide/HelloWorldClient.hx @@ -28,6 +28,7 @@ import org.zeromq.ZMQSocket; /** * Hello World client in Haxe. + * Use with HelloWorldServer.hx and MTServer.hx */ class HelloWorldClient { diff --git a/org/zeromq/guide/HelloWorldServer.hx b/org/zeromq/guide/HelloWorldServer.hx index 7c076db..ff78243 100644 --- a/org/zeromq/guide/HelloWorldServer.hx +++ b/org/zeromq/guide/HelloWorldServer.hx @@ -30,6 +30,7 @@ import org.zeromq.ZMQSocket; * Hello World server in Haxe * Binds REP to tcp://*:5556 * Expects "Hello" from client, replies with "World" + * Use with HelloWorldClient.hx * */ class HelloWorldServer diff --git a/org/zeromq/guide/MTRelay.hx b/org/zeromq/guide/MTRelay.hx new file mode 100644 index 0000000..9bdba1b --- /dev/null +++ b/org/zeromq/guide/MTRelay.hx @@ -0,0 +1,87 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; + +import haxe.io.Bytes; +import neko.vm.Thread; +import neko.Lib; + +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Multi-threaded relay in haXe + * + */ +class MTRelay +{ + + static function step1() { + var context:ZMQContext = ZMQContext.instance(); + + // Connect to step2 and tell it we are ready + var xmitter:ZMQSocket = context.socket(ZMQ_PAIR); + xmitter.connect("inproc://step2"); + xmitter.sendMsg(Bytes.ofString("READY")); + xmitter.close(); + } + + static function step2() { + var context:ZMQContext = ZMQContext.instance(); + + // Bind inproc socket before starting step 1 + var receiver:ZMQSocket = context.socket(ZMQ_PAIR); + receiver.bind("inproc://step2"); + Thread.create(step1); + + // Wait for signal and pass it on + var msgBytes = receiver.recvMsg(); + receiver.close(); + + // Connect to step3 and tell it we are ready + var xmitter:ZMQSocket = context.socket(ZMQ_PAIR); + xmitter.connect("inproc://step3"); + xmitter.sendMsg(Bytes.ofString("READY")); + xmitter.close(); + } + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println ("** MTRelay (see: http://zguide.zeromq.org/page:all#Signaling-between-Threads)"); + + // This main thread represents Step 3 + + // Bind to inproc: endpoint then start upstream thread + var receiver:ZMQSocket = context.socket(ZMQ_PAIR); + receiver.bind("inproc://step3"); + + // Step2 relays the signal to step 3 + Thread.create(step2); + + // Wait for signal + var msgBytes = receiver.recvMsg(); + receiver.close(); + + trace ("Test successful!"); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/MTServer.hx b/org/zeromq/guide/MTServer.hx index 1ddba5b..cd23944 100644 --- a/org/zeromq/guide/MTServer.hx +++ b/org/zeromq/guide/MTServer.hx @@ -33,6 +33,7 @@ import org.zeromq.ZMQSocket; * Multithreaded Hello World Server * * See: http://zguide.zeromq.org/page:all#Multithreading-with-MQ + * Use with HelloWorldClient.hx */ class MTServer { diff --git a/org/zeromq/guide/PSEnvPub.hx b/org/zeromq/guide/PSEnvPub.hx new file mode 100644 index 0000000..27440d7 --- /dev/null +++ b/org/zeromq/guide/PSEnvPub.hx @@ -0,0 +1,61 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; +import haxe.io.Bytes; +import neko.Lib; +import neko.Sys; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQException; +import org.zeromq.ZMQSocket; + +/** + * Pubsub envelope publisher + * + * See: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes + * + * Use with PSEnvSub + */ +class PSEnvPub +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** PSEnvPub (see: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes)"); + + var publisher:ZMQSocket = context.socket(ZMQ_PUB); + publisher.bind("tcp://*:5563"); + + ZMQ.catchSignals(); + + + while (true) { + publisher.sendMsg(Bytes.ofString("A"), SNDMORE); + publisher.sendMsg(Bytes.ofString("We don't want to see this")); + publisher.sendMsg(Bytes.ofString("B"), SNDMORE); + publisher.sendMsg(Bytes.ofString("We would like to see this")); + Sys.sleep(1.0); + } + // We never get here but clean up anyhow + publisher.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/PSEnvSub.hx b/org/zeromq/guide/PSEnvSub.hx new file mode 100644 index 0000000..230c2bd --- /dev/null +++ b/org/zeromq/guide/PSEnvSub.hx @@ -0,0 +1,56 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; +import haxe.io.Bytes; +import neko.Lib; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Pubsub envelope subscriber + * + * See: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes + * + * Use with PSEnvPub + */ +class PSEnvSub +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** PSEnvSub (see: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes)"); + + var subscriber:ZMQSocket = context.socket(ZMQ_SUB); + subscriber.connect("tcp://127.0.0.1:5563"); + subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("B")); + + while (true) { + var msgAddress:Bytes = subscriber.recvMsg(); + // Read message contents + var msgContent:Bytes = subscriber.recvMsg(); + trace (msgAddress.toString() + " " + msgContent.toString() + "\n"); + } + // We never get here but clean up anyway + subscriber.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/RrBroker.hx b/org/zeromq/guide/RrBroker.hx new file mode 100644 index 0000000..6f0e45d --- /dev/null +++ b/org/zeromq/guide/RrBroker.hx @@ -0,0 +1,96 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; +import haxe.io.Bytes; +import haxe.Stack; +import neko.Lib; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQPoller; +import org.zeromq.ZMQSocket; + +/** + * Simple request-reply broker + * + * Use with RrClient.hx and RrServer.hx + */ +class RrBroker +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** RrBroker (see: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker)"); + + var frontend:ZMQSocket = context.socket(ZMQ_ROUTER); + var backend:ZMQSocket = context.socket(ZMQ_DEALER); + frontend.bind("tcp://*:5559"); + backend.bind("tcp://*:5560"); + + Lib.println("Launch and connect broker."); + + // Initialise poll set + var items:ZMQPoller = context.poller(); + items.registerSocket(frontend, ZMQ.ZMQ_POLLIN()); + items.registerSocket(backend, ZMQ.ZMQ_POLLIN()); + + var more = false; + var msgBytes:Bytes; + + ZMQ.catchSignals(); + + while (true) { + try { + items.poll(); + if (items.pollin(1)) { + while (true) { + // receive message + msgBytes = frontend.recvMsg(); + more = frontend.hasReceiveMore(); + // broker it to backend + backend.sendMsg(msgBytes, { if (more) SNDMORE else null; } ); + if (!more) break; + } + } + + if (items.pollin(2)) { + while (true) { + // receive message + msgBytes = backend.recvMsg(); + more = backend.hasReceiveMore(); + // broker it to frontend + frontend.sendMsg(msgBytes, { if (more) SNDMORE else null; } ); + if (!more) break; + } + } + } catch (e:ZMQException) { + if (ZMQ.isInterrupted()) { + break; + } + // Handle other errors + trace("ZMQException #:" + e.errNo + ", str:" + e.str()); + trace (Stack.toString(Stack.exceptionStack())); + } + } + frontend.close(); + backend.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/RrClient.hx b/org/zeromq/guide/RrClient.hx new file mode 100644 index 0000000..a5e68c3 --- /dev/null +++ b/org/zeromq/guide/RrClient.hx @@ -0,0 +1,66 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; + +import neko.Lib; +import haxe.io.Bytes; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; + +/** + * Hello World Client + * Connects REQ socket to tcp://localhost:5559 + * Sends "Hello" to server, expects "World" back + * + * See: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker + * + * Use with RrServer and RrBroker + */ +class RrClient +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** RrClient (see: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker)"); + + var requester:ZMQSocket = context.socket(ZMQ_REQ); + requester.connect ("tcp://localhost:5559"); + + Lib.println ("Launch and connect client."); + + // Do 10 requests, waiting each time for a response + for (i in 0...10) { + var requestString = "Hello "; + // Send the message + requester.sendMsg(Bytes.ofString(requestString)); + + // Wait for the reply + var msg:Bytes = requester.recvMsg(); + + Lib.println("Received reply " + i + ": [" + msg.toString() + "]"); + + } + + // Shut down socket and context + requester.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/RrServer.hx b/org/zeromq/guide/RrServer.hx new file mode 100644 index 0000000..ce7f698 --- /dev/null +++ b/org/zeromq/guide/RrServer.hx @@ -0,0 +1,83 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; + +import haxe.io.Bytes; +import haxe.Stack; +import neko.Lib; +import neko.Sys; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQException; +import org.zeromq.ZMQSocket; + +/** + * Hello World server in Haxe + * Binds REP to tcp://*:5560 + * Expects "Hello" from client, replies with "World" + * Use with RrClient.hx and RrBroker.hx + * + */ +class RrServer +{ + + public static function main() { + + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** RrServer (see: http://zguide.zeromq.org/page:all#A-Request-Reply-Broker)"); + + // Socket to talk to clients + var responder:ZMQSocket = context.socket(ZMQ_REP); + responder.connect("tcp://localhost:5560"); + + Lib.println("Launch and connect server."); + + ZMQ.catchSignals(); + + while (true) { + + try { + // Wait for next request from client + var request:Bytes = responder.recvMsg(); + + trace ("Received request:" + request.toString()); + + // Do some work + Sys.sleep(1); + + // Send reply back to client + responder.sendMsg(Bytes.ofString("World")); + } catch (e:ZMQException) { + if (ZMQ.isInterrupted()) { + break; + } + // Handle other errors + trace("ZMQException #:" + e.errNo + ", str:" + e.str()); + trace (Stack.toString(Stack.exceptionStack())); + } + + } + responder.close(); + context.term(); + + } + +} \ No newline at end of file diff --git a/org/zeromq/guide/Run.hx b/org/zeromq/guide/Run.hx index ba625f5..6c94f76 100644 --- a/org/zeromq/guide/Run.hx +++ b/org/zeromq/guide/Run.hx @@ -21,6 +21,7 @@ package org.zeromq.guide; import neko.io.File; import neko.io.FileInput; +import neko.io.Process; import neko.Lib; import neko.Sys; @@ -63,7 +64,28 @@ class Run Lib.println(""); Lib.println("11. Interrupt (** Doesn't work on Windows!)"); Lib.println(""); - Lib.println("12. MTServer"); + Lib.println("12. MTServer (use with 1. HelloWorldClient)"); + Lib.println(""); + Lib.println("13. TaskWork2 (use with 5. TaskVent and 14. TaskSink2)"); + Lib.println("14. TaskSink2 (use with 5. TaskVent and 13. TaskWork2)"); + Lib.println(""); + Lib.println("15. WUProxy (use with 4. WUServer)"); + Lib.println(""); + Lib.println("16. RrClient"); + Lib.println("17. RrBroker"); + Lib.println("18. RrServer"); + Lib.println(""); + Lib.println("19. MTRelay"); + Lib.println(""); + Lib.println("20. SyncPub"); + Lib.println("21. SyncSub"); + Lib.println(""); + Lib.println("22. PSEnvPub"); + Lib.println("23. PSEnvSub"); + Lib.println(""); + Lib.println("24. DuraPub"); + Lib.println("25. DuraSub"); + Lib.println("26. DuraPub2"); do { Lib.print("Type number followed by Enter key, or q to quit: "); @@ -97,6 +119,34 @@ class Run Interrupt.main(); case 12: MTServer.main(); + case 13: + TaskWork2.main(); + case 14: + TaskSink2.main(); + case 15: + WUProxy.main(); + case 16: + RrClient.main(); + case 17: + RrBroker.main(); + case 18: + RrServer.main(); + case 19: + MTRelay.main(); + case 20: + SyncPub.main(); + case 21: + SyncSub.main(); + case 22: + PSEnvPub.main(); + case 23: + PSEnvSub.main(); + case 24: + DuraPub.main(); + case 25: + DuraSub.main(); + case 26: + DuraPub2.main(); default: Lib.println ("Unknown program number ... exiting"); } diff --git a/org/zeromq/guide/SyncPub.hx b/org/zeromq/guide/SyncPub.hx new file mode 100644 index 0000000..9c82e8f --- /dev/null +++ b/org/zeromq/guide/SyncPub.hx @@ -0,0 +1,71 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; +import haxe.io.Bytes; +import neko.Lib; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Synchronised publisher + * + * See: http://zguide.zeromq.org/page:all#Node-Coordination + * + * Use with SyncSub.hx + */ +class SyncPub +{ + static inline var SUBSCRIBERS_EXPECTED = 10; + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + Lib.println("** SyncPub (see: http://zguide.zeromq.org/page:all#Node-Coordination)"); + + // Socket to talk to clients + var publisher:ZMQSocket = context.socket(ZMQ_PUB); + publisher.bind("tcp://*:5561"); + + // Socket to receive signals + var syncService:ZMQSocket = context.socket(ZMQ_REP); + syncService.bind("tcp://*:5562"); + + // get synchronisation from subscribers + var subscribers = 0; + while (subscribers < SUBSCRIBERS_EXPECTED) { + // wait for synchronisation request + var msgBytes = syncService.recvMsg(); + + // send synchronisation reply + syncService.sendMsg(Bytes.ofString("")); + subscribers++; + } + + // Now broadcast exactly 1m updates followed by END + for (update_nbr in 0 ... 1000000) { + publisher.sendMsg(Bytes.ofString("Rhubarb")); + } + publisher.sendMsg(Bytes.ofString("END")); + + publisher.close(); + syncService.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/SyncSub.hx b/org/zeromq/guide/SyncSub.hx new file mode 100644 index 0000000..e36b641 --- /dev/null +++ b/org/zeromq/guide/SyncSub.hx @@ -0,0 +1,78 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; + +import neko.Lib; +import haxe.io.Bytes; +import neko.Sys; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Synchronised subscriber + * + * See: http://zguide.zeromq.org/page:all#Node-Coordination + * + * Use with SyncPub.hx + */ +class SyncSub +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** SyncSub (see: http://zguide.zeromq.org/page:all#Node-Coordination)"); + + // First connect our subscriber socket + var subscriber:ZMQSocket = context.socket(ZMQ_SUB); + subscriber.connect("tcp://127.0.0.1:5561"); + subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("")); + + // 0MQ is so fast, we need to wait a little while + Sys.sleep(1.0); + + // Second, synchronise with publisher + var syncClient:ZMQSocket = context.socket(ZMQ_REQ); + syncClient.connect("tcp://127.0.0.1:5562"); + + // Send a synchronisation request + syncClient.sendMsg(Bytes.ofString("")); + + // Wait for a synchronisation reply + var msgBytes:Bytes = syncClient.recvMsg(); + + // Third, get our updates and report how many we got + var update_nbr = 0; + while (true) { + msgBytes = subscriber.recvMsg(); + if (msgBytes.toString() == "END") { + break; + } + msgBytes = null; + update_nbr++; + } + Lib.println("Received " + update_nbr + " updates\n"); + + subscriber.close(); + syncClient.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/TaskSink.hx b/org/zeromq/guide/TaskSink.hx index 273ab43..0426240 100644 --- a/org/zeromq/guide/TaskSink.hx +++ b/org/zeromq/guide/TaskSink.hx @@ -34,6 +34,8 @@ import org.zeromq.ZMQSocket; * See: http://zguide.zeromq.org/page:all#Divide-and-Conquer * * Based on http://zguide.zeromq.org/java:tasksink + * + * Use with TaskVent.hx and TaskWork.hx */ class TaskSink { diff --git a/org/zeromq/guide/TaskSink2.hx b/org/zeromq/guide/TaskSink2.hx new file mode 100644 index 0000000..4d61d56 --- /dev/null +++ b/org/zeromq/guide/TaskSink2.hx @@ -0,0 +1,86 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; + +import haxe.io.Bytes; +import neko.Lib; +import neko.Sys; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Parallel Task sink with kil signalling in Haxe + * Binds PULL request socket to tcp://localhost:5558 + * Collects results from workers via this socket + * + * See: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM + * + * Based on http://zguide.zeromq.org/cs:tasksink2 + * + * Use with TaskVent.hx and TaskWork2.hx + */ +class TaskSink2 +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** TaskSink2 (see: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM)"); + + // Socket to receive messages on + var receiver:ZMQSocket = context.socket(ZMQ_PULL); + receiver.bind("tcp://127.0.0.1:5558"); + + // Socket to send control messages to workers + var controller:ZMQSocket = context.socket(ZMQ_PUB); + controller.bind("tcp://127.0.0.1:5559"); + + // Wait for start of batch + var msgString = StringTools.trim(receiver.recvMsg().toString()); + + // Start our clock now + var tStart = Sys.time(); + + // Process 100 messages + var task_nbr:Int; + for (task_nbr in 0 ... 100) { + msgString = StringTools.trim(receiver.recvMsg().toString()); + if (task_nbr % 10 == 0) { + Lib.println(":"); // Print a ":" every 10 messages + } else { + Lib.print("."); + } + } + + // Calculate and report duation of batch + var tEnd = Sys.time(); + Lib.println("Total elapsed time: " + Math.ceil((tEnd - tStart) * 1000) + " msec"); + + // Send kill signal to workers + controller.sendMsg(Bytes.ofString("KILL")); + Sys.sleep(1.0); // Give 0MQ time to deliver + + // Shut down + receiver.close(); + controller.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/TaskVent.hx b/org/zeromq/guide/TaskVent.hx index 9eb074a..c2e039e 100644 --- a/org/zeromq/guide/TaskVent.hx +++ b/org/zeromq/guide/TaskVent.hx @@ -36,6 +36,8 @@ import org.zeromq.ZMQSocket; * Sends batch of tasks to workers via that socket. * * Based on code from: http://zguide.zeromq.org/java:taskvent + * + * Use with TaskWork.hx and TaskSink.hx */ class TaskVent { diff --git a/org/zeromq/guide/TaskWork.hx b/org/zeromq/guide/TaskWork.hx index 880f06d..113918e 100644 --- a/org/zeromq/guide/TaskWork.hx +++ b/org/zeromq/guide/TaskWork.hx @@ -36,6 +36,8 @@ import org.zeromq.ZMQSocket; * See: http://zguide.zeromq.org/page:all#Divide-and-Conquer * * Based on code from: http://zguide.zeromq.org/java:taskwork + * + * Use with TaskVent.hx and TaskSink.hx */ class TaskWork { diff --git a/org/zeromq/guide/TaskWork2.hx b/org/zeromq/guide/TaskWork2.hx new file mode 100644 index 0000000..c0044a0 --- /dev/null +++ b/org/zeromq/guide/TaskWork2.hx @@ -0,0 +1,92 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; + +import haxe.io.Bytes; +import neko.Lib; +import neko.Sys; +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQPoller; +import org.zeromq.ZMQSocket; + +/** + * Parallel Task worker with kill signalling in Haxe + * Connects PULL socket to tcp://localhost:5557 + * Collects workloads from ventilator via that socket + * Connects PUSH socket to tcp://localhost:5558 + * Sends results to sink via that socket + * + * See: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM + * + * Based on code from: http://zguide.zeromq.org/java:taskwork2 + */ +class TaskWork2 +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + + Lib.println("** TaskWork2 (see: http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM)"); + + // Socket to receive messages on + var receiver:ZMQSocket = context.socket(ZMQ_PULL); + receiver.connect("tcp://127.0.0.1:5557"); + + // Socket to send messages to + var sender:ZMQSocket = context.socket(ZMQ_PUSH); + sender.connect("tcp://127.0.0.1:5558"); + + // Socket to receive controller messages from + var controller:ZMQSocket = context.socket(ZMQ_SUB); + controller.connect("tcp://127.0.0.1:5559"); + controller.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("")); + + var items:ZMQPoller = context.poller(); + items.registerSocket(receiver, ZMQ.ZMQ_POLLIN()); + items.registerSocket(controller, ZMQ.ZMQ_POLLIN()); + + var msgString:String; + + // Process tasks forever + while (true) { + var numSocks = items.poll(); + if (items.pollin(1)) { + // receiver socket has events + msgString = StringTools.trim(receiver.recvMsg().toString()); + var sec:Float = Std.parseFloat(msgString) / 1000.0; + Lib.print(msgString + "."); + + // Do the work + Sys.sleep(sec); + + // Send results to sink + sender.sendMsg(Bytes.ofString("")); + } + if (items.pollin(2)) { + break; // Exit loop + } + } + receiver.close(); + sender.close(); + controller.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/WUClient.hx b/org/zeromq/guide/WUClient.hx index 5130ea7..393f226 100644 --- a/org/zeromq/guide/WUClient.hx +++ b/org/zeromq/guide/WUClient.hx @@ -34,6 +34,8 @@ import org.zeromq.ZMQSocket; * Use optional argument to specify zip code (in range 1 to 100000) * * See: http://zguide.zeromq.org/page:all#Getting-the-Message-Out + * + * Use with WUServer.hx */ class WUClient { diff --git a/org/zeromq/guide/WUProxy.hx b/org/zeromq/guide/WUProxy.hx new file mode 100644 index 0000000..f70642d --- /dev/null +++ b/org/zeromq/guide/WUProxy.hx @@ -0,0 +1,86 @@ +/** + * (c) 2011 Richard J Smith + * + * This file is part of hxzmq + * + * hxzmq is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * hxzmq is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package org.zeromq.guide; + +import haxe.io.Bytes; +import haxe.Stack; + +import neko.Lib; + +import org.zeromq.ZMQ; +import org.zeromq.ZMQContext; +import org.zeromq.ZMQSocket; + +/** + * Weather proxy device. + * + * See: http://zguide.zeromq.org/page:all#A-Publish-Subscribe-Proxy-Server + * + * Use with WUClient and WUServer + */ +class WUProxy +{ + + public static function main() { + var context:ZMQContext = ZMQContext.instance(); + Lib.println("** WUProxy (see: http://zguide.zeromq.org/page:all#A-Publish-Subscribe-Proxy-Server)"); + + // This is where the weather service sits + var frontend:ZMQSocket = context.socket(ZMQ_SUB); + frontend.connect("tcp://localhost:5556"); + + // This is our public endpoint for subscribers + var backend:ZMQSocket = context.socket(ZMQ_PUB); + backend.bind("tcp://10.1.1.0:8100"); + + // Subscribe on everything + frontend.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("")); + + var more = false; + var msgBytes:Bytes; + + ZMQ.catchSignals(); + + var stopped = false; + while (!stopped) { + try { + msgBytes = frontend.recvMsg(); + more = frontend.hasReceiveMore(); + + // proxy it + backend.sendMsg(msgBytes, { if (more) SNDMORE else null; } ); + if (!more) { + stopped = true; + } + } catch (e:ZMQException) { + if (ZMQ.isInterrupted()) { + stopped = true; + } else { + // Handle other errors + trace("ZMQException #:" + e.errNo + ", str:" + e.str()); + trace (Stack.toString(Stack.exceptionStack())); + } + } + } + frontend.close(); + backend.close(); + context.term(); + } +} \ No newline at end of file diff --git a/org/zeromq/guide/WUServer.hx b/org/zeromq/guide/WUServer.hx index a668124..b669284 100644 --- a/org/zeromq/guide/WUServer.hx +++ b/org/zeromq/guide/WUServer.hx @@ -31,6 +31,8 @@ import org.zeromq.ZMQSocket; * Publishes random weather updates * * See: http://zguide.zeromq.org/page:all#Getting-the-Message-Out + * + * Use with WUClient.hx */ class WUServer { diff --git a/org/zeromq/remoting/ZMQConnection.hx b/org/zeromq/remoting/ZMQConnection.hx index 6b42e04..ea0d2b0 100644 --- a/org/zeromq/remoting/ZMQConnection.hx +++ b/org/zeromq/remoting/ZMQConnection.hx @@ -43,7 +43,7 @@ package org.zeromq.remoting; -import org.zeromq.remoting.SocketProtocol; +import org.zeromq.remoting.ZMQSocketProtocol; import haxe.remoting.AsyncConnection; import haxe.remoting.Context;