Skip to content

Commit

Permalink
Added remaining haXe examples for Guide Chapter 2
Browse files Browse the repository at this point in the history
Added haXe classes for all Guide examples in Chapter 1 and Chapter 2.
Also corrected typo in ZMQConnection remoting class.
  • Loading branch information
rjsmith committed May 27, 2011
1 parent 73491bd commit 8d4359e
Show file tree
Hide file tree
Showing 24 changed files with 1,133 additions and 2 deletions.
70 changes: 70 additions & 0 deletions org/zeromq/guide/DuraPub.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* (c) 2011 Richard J Smith
*
* This file is part of hxzmq
*
* hxzmq is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* hxzmq is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.zeromq.guide;
import neko.Lib;
import haxe.io.Bytes;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Publisher for durable subscriber
*
* See: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks
*
* Use with DuraSub.hx
*/
class DuraPub
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** DuraPub (see: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks)");

// Subscriber tells us when it is ready here
var sync:ZMQSocket = context.socket(ZMQ_PULL);
sync.bind("tcp://*:5564");

// We send updates via this socket
var publisher:ZMQSocket = context.socket(ZMQ_PUB);

// Uncomment next line to see effect of adding a high water mark to the publisher
// publisher.setsockopt(ZMQ_HWM, { hi:0, lo: 2 } ); // Set HWM to 2

publisher.bind("tcp://*:5565");

// Wait for synchronisation request
sync.recvMsg();

for (update_nbr in 0 ... 10) {
var str = "Update " + update_nbr;
Lib.println(str);
publisher.sendMsg(Bytes.ofString(str));
Sys.sleep(1.0);
}
publisher.sendMsg(Bytes.ofString("END"));

sync.close();
publisher.close();
context.term();
}
}
72 changes: 72 additions & 0 deletions org/zeromq/guide/DuraPub2.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* (c) 2011 Richard J Smith
*
* This file is part of hxzmq
*
* hxzmq is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* hxzmq is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.zeromq.guide;
import neko.Lib;
import haxe.io.Bytes;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Publisher for durable but cynical subscriber
*
* See: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks
*
* Use with DuraSub.hx
*/
class DuraPub2
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** DuraPub2 (see: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks)");

// Subscriber tells us when it is ready here
var sync:ZMQSocket = context.socket(ZMQ_PULL);
sync.bind("tcp://*:5564");

// We send updates via this socket
var publisher:ZMQSocket = context.socket(ZMQ_PUB);
publisher.bind("tcp://*:5565");

// Prevent publisher overflowing because of slow subscribers
publisher.setsockopt(ZMQ_HWM, { hi:0, lo: 1 } ); // Set HWM to 1

// Specify swap space in bytes, this covers all subscribers
publisher.setsockopt(ZMQ_SWAP, { hi:0, lo: 25000000 } );

// Wait for synchronisation request
sync.recvMsg();

for (update_nbr in 0 ... 10) {
var str = "Update " + update_nbr;
Lib.println(str);
publisher.sendMsg(Bytes.ofString(str));
Sys.sleep(1.0);
}
publisher.sendMsg(Bytes.ofString("END"));

sync.close();
publisher.close();
context.term();
}
}
64 changes: 64 additions & 0 deletions org/zeromq/guide/DuraSub.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* (c) 2011 Richard J Smith
*
* This file is part of hxzmq
*
* hxzmq is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* hxzmq is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.zeromq.guide;
import haxe.io.Bytes;
import neko.Lib;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Durable subscriber
*
* See: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks
*
* Use with DuraPub.hx and DuraPub2.hx
*/
class DuraSub
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** DuraSub (see: http://zguide.zeromq.org/page:all#-Semi-Durable-Subscribers-and-High-Water-Marks)");

var subscriber:ZMQSocket = context.socket(ZMQ_SUB);
subscriber.setsockopt(ZMQ_IDENTITY, Bytes.ofString("Hello"));
subscriber.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(""));
subscriber.connect("tcp://127.0.0.1:5565");

// Synschronise with publisher
var sync:ZMQSocket = context.socket(ZMQ_PUSH);
sync.connect("tcp://127.0.0.1:5564");
sync.sendMsg(Bytes.ofString(""));

// Get updates, exit when told to do so
while (true) {
var msgString:String = subscriber.recvMsg().toString();
Lib.println(msgString + "\n");
if (msgString == "END") {
break;
}
}
sync.close();
subscriber.close();
context.term();
}
}
1 change: 1 addition & 0 deletions org/zeromq/guide/HelloWorldClient.hx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.zeromq.ZMQSocket;

/**
* Hello World client in Haxe.
* Use with HelloWorldServer.hx and MTServer.hx
*/
class HelloWorldClient
{
Expand Down
1 change: 1 addition & 0 deletions org/zeromq/guide/HelloWorldServer.hx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.zeromq.ZMQSocket;
* Hello World server in Haxe
* Binds REP to tcp://*:5556
* Expects "Hello" from client, replies with "World"
* Use with HelloWorldClient.hx
*
*/
class HelloWorldServer
Expand Down
87 changes: 87 additions & 0 deletions org/zeromq/guide/MTRelay.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* (c) 2011 Richard J Smith
*
* This file is part of hxzmq
*
* hxzmq is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* hxzmq is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.zeromq.guide;

import haxe.io.Bytes;
import neko.vm.Thread;
import neko.Lib;

import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQSocket;

/**
* Multi-threaded relay in haXe
*
*/
class MTRelay
{

static function step1() {
var context:ZMQContext = ZMQContext.instance();

// Connect to step2 and tell it we are ready
var xmitter:ZMQSocket = context.socket(ZMQ_PAIR);
xmitter.connect("inproc://step2");
xmitter.sendMsg(Bytes.ofString("READY"));
xmitter.close();
}

static function step2() {
var context:ZMQContext = ZMQContext.instance();

// Bind inproc socket before starting step 1
var receiver:ZMQSocket = context.socket(ZMQ_PAIR);
receiver.bind("inproc://step2");
Thread.create(step1);

// Wait for signal and pass it on
var msgBytes = receiver.recvMsg();
receiver.close();

// Connect to step3 and tell it we are ready
var xmitter:ZMQSocket = context.socket(ZMQ_PAIR);
xmitter.connect("inproc://step3");
xmitter.sendMsg(Bytes.ofString("READY"));
xmitter.close();
}

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println ("** MTRelay (see: http://zguide.zeromq.org/page:all#Signaling-between-Threads)");

// This main thread represents Step 3

// Bind to inproc: endpoint then start upstream thread
var receiver:ZMQSocket = context.socket(ZMQ_PAIR);
receiver.bind("inproc://step3");

// Step2 relays the signal to step 3
Thread.create(step2);

// Wait for signal
var msgBytes = receiver.recvMsg();
receiver.close();

trace ("Test successful!");
context.term();
}
}
1 change: 1 addition & 0 deletions org/zeromq/guide/MTServer.hx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.zeromq.ZMQSocket;
* Multithreaded Hello World Server
*
* See: http://zguide.zeromq.org/page:all#Multithreading-with-MQ
* Use with HelloWorldClient.hx
*/
class MTServer
{
Expand Down
61 changes: 61 additions & 0 deletions org/zeromq/guide/PSEnvPub.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* (c) 2011 Richard J Smith
*
* This file is part of hxzmq
*
* hxzmq is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* hxzmq is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.zeromq.guide;
import haxe.io.Bytes;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;

/**
* Pubsub envelope publisher
*
* See: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes
*
* Use with PSEnvSub
*/
class PSEnvPub
{

public static function main() {
var context:ZMQContext = ZMQContext.instance();

Lib.println("** PSEnvPub (see: http://zguide.zeromq.org/page:all#Pub-sub-Message-Envelopes)");

var publisher:ZMQSocket = context.socket(ZMQ_PUB);
publisher.bind("tcp://*:5563");

ZMQ.catchSignals();


while (true) {
publisher.sendMsg(Bytes.ofString("A"), SNDMORE);
publisher.sendMsg(Bytes.ofString("We don't want to see this"));
publisher.sendMsg(Bytes.ofString("B"), SNDMORE);
publisher.sendMsg(Bytes.ofString("We would like to see this"));
Sys.sleep(1.0);
}
// We never get here but clean up anyhow
publisher.close();
context.term();
}
}
Loading

0 comments on commit 8d4359e

Please sign in to comment.