Skip to content

Commit

Permalink
Changes for ZeroMQ libzmq 3.1.0 release
Browse files Browse the repository at this point in the history
Changes made to the hxzmq wrapper classes, mostly in the ZMQ.hx class,
for the changes made in the ZeroMQ 3.1.0 release.

The changes are not backwardly compatible with the previous 2.1.x
libzmq version; this new version of hxzmq will only work alongside a
3.1.x version libzmq native library.

See http://www.zeromq.org/docs:3-1-upgrade for details of the changes
made in the libzmq library, and therefore in hxzmq.
  • Loading branch information
rjsmith committed Mar 6, 2012
1 parent bc0324b commit 753f3e6
Show file tree
Hide file tree
Showing 23 changed files with 453 additions and 196 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ Key files and folders contained in this repository:

## Versions

The current release of hxzmq is 1.4.0, compatable with libzmq-2.1.4 or any later 2.1.x version (latest tested in 2.1.8). The latest released hxzmq package shall also be available in the [haxelib repository] [4], accessable via the [haxelib tool] [5] which is included in the standard haXe distribution.
The current release of hxzmq on the master branch is compatable with libzmq-3.1.0 or any later 3.1.x version (latest tested in 3.1.0). The latest released hxzmq package shall also be available in the [haxelib repository] [4], accessable via the [haxelib tool] [5] which is included in the standard haXe distribution.

This version of hxzmq has also been tested against [php-zmq v0.7.0] [13]
A version of hxzmq compatable with the libzmq 2.1.x version is archived on the 2.1.x branch of the hxzmq git repository.

## Building and Installation

Expand Down
6 changes: 3 additions & 3 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@
<set name="LIB_DIR" value="-libpath:lib" if="windows"/>

<set name="DBG" value="d" if="debug"/>

<!-- Define all cpp files that need to be compiled into the hxzmq.ndll -->
<files id="files">

<!-- Set up include folder paths -->
<compilerflag value = "-Iinclude"/>

<!-- ** Only for Windows, replace with path to your libzmq installation include directory -->
<compilerflag value = "-IC:\zeromq\zeromq-2.1.8\include" if="windows"/>
<compilerflag value = "-IC:\zeromq\zeromq-3.1.0\include" if="windows"/>
<!-- ** Only for Windows, replace with path to your libzmq installation source directory -->
<!-- This include in needed to pick up libzmq's stdint.hpp include file -->
<compilerflag value = "-IC:\zeromq\zeromq-2.1.8\src" if="windows"/>
<compilerflag value = "-IC:\zeromq\zeromq-3.1.0\src" if="windows"/>

<!-- Special flags for compiling on MacOSX 64 bit. May need for linux 64bit too? -->
<compilerflag value = "-D_FILE_OFFSET_BITS=64" if="macos"/>
Expand Down
2 changes: 1 addition & 1 deletion haxelib.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
<tag v="neko"/>
<tag v="php"/>
<description>Haxe language binding for the ZeroMQ socket library</description>
<version name="1.4.0">Re-built ndll against v2.1.8 of the libzmq library</version>
<version name="1.5.0">Added experimental support for ZMQ v3.x.x</version>
</project>
Binary file modified ndll/Linux/hxzmq.ndll
Binary file not shown.
Binary file modified ndll/Mac64/hxzmq.ndll
Binary file not shown.
Binary file modified ndll/Windows/hxzmq.ndll
Binary file not shown.
2 changes: 1 addition & 1 deletion org/zeromq/ZLoop.hx
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class ZLoop
rebuildPollset();
}
try {
rc = poller.poll(ticklessTimer() * 1000);
rc = poller.poll(ticklessTimer() * ZMQ.ZMQ_POLL_MSEC());
} catch (e:ZMQException) {
#if !php
if (ZMQ.isInterrupted()) {
Expand Down
146 changes: 81 additions & 65 deletions org/zeromq/ZMQ.hx
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ enum SocketType {
ZMQ_SUB;
ZMQ_REQ;
ZMQ_REP;
ZMQ_ROUTER; // Replaces XREQ in 2.1.4
ZMQ_DEALER; // Replaces XREP in 2.1.4
ZMQ_ROUTER;
ZMQ_DEALER;
ZMQ_PULL;
ZMQ_PUSH;
ZMQ_XPUB;
ZMQ_XSUB;
}

/**
Expand Down Expand Up @@ -75,16 +77,20 @@ enum SendReceiveFlagType {
* See: http://api.zeromq.org/master:zmq-setsockopt
*/
enum SocketOptionsType {
ZMQ_HWM; // Set high water mark
ZMQ_SWAP; // Set disk offload size

ZMQ_SNDHWM; // Set send buffer high water mark
ZMQ_RCVHWM; // Set receive buffer high water mark
ZMQ_MAXMSGSIZE; // Limits the size of the inbound message
ZMQ_MULTICAST_HOPS; // Sets the time-to-live field in every multicast packet sent from this socket
ZMQ_RCVTIMEO; // Sets the timeout for receive operation on the socket (msecs)
ZMQ_SNDTIMEO; // Sets the timeout for send operation on the socket (msecs)
ZMQ_HWM; // Set high water mark (eumulated in ZMQ v3+)
ZMQ_AFFINITY; // Set I/O thread affinity
ZMQ_IDENTITY; // Set socket identity
ZMQ_SUBSCRIBE; // Establish message filter
ZMQ_UNSUBSCRIBE; // Remove message filter
ZMQ_RATE; // Set multicast data rate
ZMQ_RECOVERY_IVL; // Set multicast recovery interval
ZMQ_RECOVERY_IVL_MSEC; // Set multicast recovery interval in milliseconds
ZMQ_MCAST_LOOP; // Control multicast loop-back
ZMQ_SNDBUF; // Set kernel transmit buffer size
ZMQ_RCVBUF; // Set kernel receive buffer size
ZMQ_LINGER; // Set linger period for socket shutdown
Expand All @@ -95,12 +101,7 @@ enum SocketOptionsType {
ZMQ_FD; // Retrieve file descriptor associated with the socket
ZMQ_EVENTS; // Retrieve socket event state (bitmask use ZMQ_POLLIN and ZMQ_POLLOUT)
ZMQ_TYPE; // Retrieves type of socket
}

enum DeviceType {
ZMQ_QUEUE;
ZMQ_FORWARDER;
ZMQ_STREAMER;
}

/**
Expand All @@ -116,29 +117,32 @@ class ZMQ {
// Values for flags in ZMQSocket's send and recv functions.
public static var bytesSocketOptionTypes:Array<SocketOptionsType> =
[
ZMQ_IDENTITY,
ZMQ_SUBSCRIBE,
ZMQ_UNSUBSCRIBE
ZMQ_UNSUBSCRIBE,
ZMQ_IDENTITY
];
public static var int64SocketOptionTypes:Array<SocketOptionsType> =
[
ZMQ_MAXMSGSIZE,
ZMQ_AFFINITY
];

public static var intSocketOptionTypes:Array<SocketOptionsType> =
[
ZMQ_SNDHWM,
ZMQ_RCVHWM,
ZMQ_MULTICAST_HOPS,
ZMQ_RCVTIMEO,
ZMQ_SNDTIMEO,
ZMQ_HWM,
ZMQ_SWAP,
ZMQ_AFFINITY,
ZMQ_RATE,
ZMQ_RECOVERY_IVL,
ZMQ_RECOVERY_IVL_MSEC,
ZMQ_MCAST_LOOP,
ZMQ_SNDBUF,
ZMQ_RCVBUF,
ZMQ_RCVMORE
];

public static var intSocketOptionTypes:Array<SocketOptionsType> =
[
ZMQ_LINGER,
ZMQ_RCVMORE,
ZMQ_RECONNECT_IVL,
ZMQ_RECONNECT_IVL_MAX,
ZMQ_LINGER,
ZMQ_BACKLOG,
ZMQ_FD, // Only int on POSIX systems
ZMQ_EVENTS,
Expand All @@ -159,6 +163,12 @@ class ZMQ {
return _hx_zmq_ZMQ_POLLERR();
}

// Multiplying factor for Poller.poll() method calls to handle
// units change in ZMQ3 from microsecs to millisecs
public static inline function ZMQ_POLL_MSEC():Int {
return _hx_zmq_ZMQ_POLL_MSEC();
}

/**
* Gets complete 0MQ library version
* @return 0MQ library version in form MMmmpp (MM=major, mm=minor, pp=patch)
Expand Down Expand Up @@ -248,6 +258,10 @@ class ZMQ {
_hx_zmq_ZMQ_PULL();
case ZMQ_PUSH:
_hx_zmq_ZMQ_PUSH();
case ZMQ_XPUB:
_hx_zmq_ZMQ_XPUB();
case ZMQ_XSUB:
_hx_zmq_ZMQ_XSUB();
default:
null;
}
Expand All @@ -262,12 +276,20 @@ class ZMQ {
public static function socketOptionTypeNo(option:SocketOptionsType):Int {
return {
switch(option) {
case ZMQ_SNDHWM:
_hx_zmq_ZMQ_SNDHWM();
case ZMQ_RCVHWM:
_hx_zmq_ZMQ_RCVHWM();
case ZMQ_MAXMSGSIZE:
_hx_zmq_ZMQ_MAXMSGSIZE();
case ZMQ_MULTICAST_HOPS:
_hx_zmq_ZMQ_MULTICAST_HOPS();
case ZMQ_SNDTIMEO:
_hx_zmq_ZMQ_SNDTIMEO();
case ZMQ_RCVTIMEO:
_hx_zmq_ZMQ_RCVTIMEO();
case ZMQ_LINGER:
_hx_zmq_ZMQ_LINGER();
case ZMQ_HWM:
_hx_zmq_ZMQ_HWM();
case ZMQ_SWAP:
_hx_zmq_ZMQ_SWAP();
case ZMQ_AFFINITY:
_hx_zmq_ZMQ_AFFINITY();
case ZMQ_IDENTITY:
Expand All @@ -280,10 +302,6 @@ class ZMQ {
_hx_zmq_ZMQ_RATE();
case ZMQ_RECOVERY_IVL:
_hx_zmq_ZMQ_RECOVERY_IVL();
case ZMQ_RECOVERY_IVL_MSEC:
_hx_zmq_ZMQ_RECOVERY_IVL_MSEC();
case ZMQ_MCAST_LOOP:
_hx_zmq_ZMQ_MCAST_LOOP();
case ZMQ_SNDBUF:
_hx_zmq_ZMQ_SNDBUF();
case ZMQ_RCVBUF:
Expand Down Expand Up @@ -419,27 +437,6 @@ class ZMQ {
}
}

/**
* Converts a ZMQ DeviceType enum value into underlying device number.
* Used for call to zmq_device
* @param device
* @return
*/
public static function deviceTypeToDevice(device: DeviceType):Int {
return {
switch (device) {
case ZMQ_QUEUE:
_hx_zmq_ZMQ_QUEUE();
case ZMQ_FORWARDER:
_hx_zmq_ZMQ_FORWARDER();
case ZMQ_STREAMER:
_hx_zmq_ZMQ_STREAMER();
default:
0;
}
}
}

/**
* Sets up interrupt signal handling.
* Use isInterrupted() to subsequently test for interruption
Expand Down Expand Up @@ -472,6 +469,10 @@ class ZMQ {
private static var _hx_zmq_catch_signals = Lib.load("hxzmq", "hx_zmq_catch_signals", 0);
private static var _hx_zmq_interrupted = Lib.load("hxzmq", "hx_zmq_interrupted", 0);

private static var _hx_zmq_ZMQ_POLL_MSEC = Lib.load("hxzmq", "hx_zmq_ZMQ_POLL_MSEC", 0);
#end

#if (neko || cpp)
private static var _hx_zmq_ZMQ_PUB = Lib.load("hxzmq", "hx_zmq_ZMQ_PUB", 0);
private static var _hx_zmq_ZMQ_SUB = Lib.load("hxzmq", "hx_zmq_ZMQ_SUB", 0);
private static var _hx_zmq_ZMQ_PAIR = Lib.load("hxzmq", "hx_zmq_ZMQ_PAIR", 0);
Expand All @@ -481,20 +482,29 @@ class ZMQ {
private static var _hx_zmq_ZMQ_ROUTER = Lib.load("hxzmq", "hx_zmq_ZMQ_ROUTER", 0);
private static var _hx_zmq_ZMQ_PULL = Lib.load("hxzmq", "hx_zmq_ZMQ_PULL", 0);
private static var _hx_zmq_ZMQ_PUSH = Lib.load("hxzmq", "hx_zmq_ZMQ_PUSH", 0);
private static var _hx_zmq_ZMQ_XREQ = Lib.load("hxzmq", "hx_zmq_ZMQ_XREQ", 0);
private static var _hx_zmq_ZMQ_XREP = Lib.load("hxzmq", "hx_zmq_ZMQ_XREP", 0);
private static var _hx_zmq_ZMQ_XPUB = Lib.load("hxzmq", "hx_zmq_ZMQ_XPUB", 0);
private static var _hx_zmq_ZMQ_XSUB = Lib.load("hxzmq", "hx_zmq_ZMQ_XSUB", 0);

private static var _hx_zmq_ZMQ_SNDHWM = Lib.load("hxzmq", "hx_zmq_ZMQ_SNDHWM", 0);
private static var _hx_zmq_ZMQ_RCVHWM = Lib.load("hxzmq", "hx_zmq_ZMQ_RCVHWM", 0);
private static var _hx_zmq_ZMQ_MAXMSGSIZE = Lib.load("hxzmq", "hx_zmq_ZMQ_MAXMSGSIZE", 0);
private static var _hx_zmq_ZMQ_MULTICAST_HOPS = Lib.load("hxzmq", "hx_zmq_ZMQ_MULTICAST_HOPS", 0);
private static var _hx_zmq_ZMQ_SNDTIMEO = Lib.load("hxzmq", "hx_zmq_ZMQ_SNDTIMEO", 0);
private static var _hx_zmq_ZMQ_RCVTIMEO = Lib.load("hxzmq", "hx_zmq_ZMQ_RCVTIMEO", 0);
#end


#if (neko || cpp)
private static var _hx_zmq_ZMQ_LINGER = Lib.load("hxzmq", "hx_zmq_ZMQ_LINGER", 0);
private static var _hx_zmq_ZMQ_HWM = Lib.load("hxzmq", "hx_zmq_ZMQ_HWM", 0);
private static var _hx_zmq_ZMQ_RCVMORE = Lib.load("hxzmq", "hx_zmq_ZMQ_RCVMORE", 0);
private static var _hx_zmq_ZMQ_SUBSCRIBE = Lib.load("hxzmq", "hx_zmq_ZMQ_SUBSCRIBE", 0);
private static var _hx_zmq_ZMQ_UNSUBSCRIBE = Lib.load("hxzmq", "hx_zmq_ZMQ_UNSUBSCRIBE", 0);
private static var _hx_zmq_ZMQ_SWAP = Lib.load("hxzmq", "hx_zmq_ZMQ_SWAP", 0);
private static var _hx_zmq_ZMQ_AFFINITY = Lib.load("hxzmq", "hx_zmq_ZMQ_AFFINITY", 0);
private static var _hx_zmq_ZMQ_IDENTITY = Lib.load("hxzmq", "hx_zmq_ZMQ_IDENTITY", 0);

private static var _hx_zmq_ZMQ_RATE = Lib.load("hxzmq", "hx_zmq_ZMQ_RATE", 0);
private static var _hx_zmq_ZMQ_RECOVERY_IVL = Lib.load("hxzmq", "hx_zmq_ZMQ_RECOVERY_IVL", 0);
private static var _hx_zmq_ZMQ_RECOVERY_IVL_MSEC = Lib.load("hxzmq", "hx_zmq_ZMQ_RECOVERY_IVL_MSEC", 0);
private static var _hx_zmq_ZMQ_MCAST_LOOP = Lib.load("hxzmq", "hx_zmq_ZMQ_MCAST_LOOP", 0);
private static var _hx_zmq_ZMQ_SNDBUF = Lib.load("hxzmq", "hx_zmq_ZMQ_SNDBUF", 0);
private static var _hx_zmq_ZMQ_RCVBUF = Lib.load("hxzmq", "hx_zmq_ZMQ_RCVBUF", 0);
private static var _hx_zmq_ZMQ_RECONNECT_IVL = Lib.load("hxzmq", "hx_zmq_ZMQ_RECONNECT_IVL", 0);
Expand All @@ -503,7 +513,7 @@ class ZMQ {
private static var _hx_zmq_ZMQ_FD = Lib.load("hxzmq", "hx_zmq_ZMQ_FD", 0);
private static var _hx_zmq_ZMQ_EVENTS = Lib.load("hxzmq", "hx_zmq_ZMQ_EVENTS", 0);
private static var _hx_zmq_ZMQ_TYPE = Lib.load("hxzmq", "hx_zmq_ZMQ_TYPE", 0);

private static var _hx_zmq_ZMQ_POLLIN = Lib.load("hxzmq", "hx_zmq_ZMQ_POLLIN", 0);
private static var _hx_zmq_ZMQ_POLLOUT = Lib.load("hxzmq", "hx_zmq_ZMQ_POLLOUT", 0);
private static var _hx_zmq_ZMQ_POLLERR = Lib.load("hxzmq", "hx_zmq_ZMQ_POLLERR", 0);
Expand All @@ -527,12 +537,10 @@ class ZMQ {
private static var _hx_zmq_EFSM = Lib.load("hxzmq", "hx_zmq_EFSM", 0);
private static var _hx_zmq_ENOCOMPATPROTO = Lib.load("hxzmq", "hx_zmq_ENOCOMPATPROTO", 0);
private static var _hx_zmq_ETERM = Lib.load("hxzmq", "hx_zmq_ETERM", 0);

private static var _hx_zmq_ZMQ_QUEUE = Lib.load("hxzmq", "hx_zmq_ZMQ_QUEUE", 0);
private static var _hx_zmq_ZMQ_FORWARDER = Lib.load("hxzmq", "hx_zmq_ZMQ_FORWARDER", 0);
private static var _hx_zmq_ZMQ_STREAMER = Lib.load("hxzmq", "hx_zmq_ZMQ_STREAMER", 0);

#elseif php
#end


#if php
// Load functions and constants from php-zmq


Expand Down Expand Up @@ -583,6 +591,14 @@ class ZMQ {
private static function _hx_zmq_ZMQ_PULL():Int {return untyped __php__('ZMQ::SOCKET_PULL');}
private static function _hx_zmq_ZMQ_PUSH():Int {return untyped __php__('ZMQ::SOCKET_PUSH');}

// TODO: Replace stubs with ZMQ3 PHP bindings
private static var _hx_zmq_ZMQ_SNDHWM():Int {return untyped __php__('ZMQ::SOCKET_PUB');};
private static var _hx_zmq_ZMQ_RCVHWM():Int {return untyped __php__('ZMQ::SOCKET_PUB');};
private static var _hx_zmq_ZMQ_MAXMSGSIZE():Int {return untyped __php__('ZMQ::SOCKET_PUB');};
private static var _hx_zmq_ZMQ_MULTICAST_HOPS():Int {return untyped __php__('ZMQ::SOCKET_PUB');};
private static var _hx_zmq_ZMQ_SNDTIMEO():Int {return untyped __php__('ZMQ::SOCKET_PUB');};
private static var _hx_zmq_ZMQ_RCVTIMEO():Int {return untyped __php__('ZMQ::SOCKET_PUB');};


private static function _hx_zmq_ZMQ_LINGER():Int {return untyped __php__('ZMQ::SOCKOPT_LINGER');}
private static function _hx_zmq_ZMQ_HWM():Int {return untyped __php__('ZMQ::SOCKOPT_HWM');}
Expand Down Expand Up @@ -655,6 +671,6 @@ class ZMQ {
private static inline function _hx_zmq_ZMQ_FORWARDER():Int { return untyped __php__('ZMQ::DEVICE_FORWARDER'); }
private static inline function _hx_zmq_ZMQ_STREAMER():Int { return untyped __php__('ZMQ::DEVICE_STREAMER'); }

#end
#end
}

47 changes: 6 additions & 41 deletions org/zeromq/ZMQDevice.hx
Original file line number Diff line number Diff line change
Expand Up @@ -25,50 +25,15 @@ import org.zeromq.ZMQ;

/**
* Wraps ZMQ zmq_device method call.
* Creates in-built ZMQ devices that run in the current thread of execution
* Creates in-built ZMQ devices that run in the current thread of execution.
*
* DEPRECATED in 3.1.x branch
*/

class ZMQDevice
{

/**
* Constructor.
* Creates a new ZMQ device and immediately starts its in-built loop.
* Will continue unless process is interrupted, when it returns a ETERM ZMQ_Exception.
* @param type A valid DeviceType
* @param frontend Front end socket, bound or connected to by clients
* @param backend Back end socket, bound or connected to workers
*/
public function new(type:DeviceType, frontend:ZMQSocket, backend:ZMQSocket)
{
if (frontend == null || frontend.closed) {
throw new ZMQException(EINVAL);
}
if (backend == null || backend.closed) {
throw new ZMQException(EINVAL);
}
if (type == null) {
throw new ZMQException(EINVAL);
}
#if (neko || cpp)
try {
// This will continue to execute until current thread or process is terminated
var rc = _hx_zmq_device(ZMQ.deviceTypeToDevice(type), frontend._socketHandle, backend._socketHandle);
} catch (e:Int) {
throw new ZMQException(ZMQ.errNoToErrorType(e));
} catch (e:Dynamic) {
Lib.rethrow(e);
}
#elseif php
var _typenum = ZMQ.deviceTypeToDevice(type);
var _frontend_handle = frontend._socketHandle;
var _backend_handle = backend._socketHandle;
var r = untyped __php__('new ZMQDevice($_typenum, $_frontend_handle, $_backend_handle)');
#end

public function new() {
throw new ZMQException(ENOTSUP);
}

#if (neko || cpp)
private static var _hx_zmq_device = Lib.load("hxzmq", "hx_zmq_device", 3);

#end
}
Loading

0 comments on commit 753f3e6

Please sign in to comment.