zeromq3-haskell-0.3.1: Bindings to ZeroMQ 3.x

Portability: non-portable
Stability: experimental
Maintainer: Toralf Wittner <tw@dtex.org>
Safe Haskell: None

System.ZMQ3

Description

0MQ Haskell binding. The API closely follows the C API of 0MQ, with the main difference that sockets are typed. The documentation of the individual socket types is copied from 0MQ's man pages, authored by Martin Sustrik. For details, please refer to http://api.zeromq.org

Differences to zeromq-haskell 2.x

Socket Types

Socket Options

Instead of a single SocketOption data-type, getter and setter functions are provided, e.g. one would write: affinity sock instead of getOption sock (Affinity 0)
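
The getter/setter style can be sketched as follows. This is only an illustration: the Req socket type and the affinity mask of 1 are arbitrary choices, and affinityRoundTrip is a hypothetical helper name.

```haskell
import Data.Word (Word64)
import System.ZMQ3

-- Set the affinity mask on a fresh socket and read it back.
affinityRoundTrip :: IO Word64
affinityRoundTrip =
    withContext $ \ctx ->
    withSocket ctx Req $ \sock -> do
        setAffinity 1 sock   -- setter: value first, then the socket
        affinity sock        -- getter: takes only the socket

main :: IO ()
main = affinityRoundTrip >>= print
```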

Restrictions

Many option setters use a Restriction to further constrain the range of possible values of their integral types. For example, the maximum message size can be given as -1, which means no limit, or as a non-negative value, which denotes the message size in bytes. The type of setMaxMessageSize is therefore:

setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()

which means that any integral value in the range of -1 to (maxBound :: Int64) can be given. To create a restricted value from a plain value, use toRestricted or restrict.
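
A minimal sketch of passing a plain Int to setMaxMessageSize. The helper name limitMessageSize, the Pull socket and the inproc-free setup are assumptions made for illustration; falling back to "no limit" on an out-of-range value is a design choice of this sketch, not something the binding prescribes.

```haskell
import System.ZMQ3

-- Lift a plain Int into the restricted argument of setMaxMessageSize.
limitMessageSize :: Socket a -> Int -> IO ()
limitMessageSize sock bytes =
    case toRestricted bytes of
        -- Value lies within -1 .. maxBound :: Int64: use it unchanged.
        Just size -> setMaxMessageSize size sock
        -- Value is out of range (e.g. -5): fall back to -1, i.e. no limit.
        Nothing   -> setMaxMessageSize (restrict (-1 :: Int)) sock

-- Apply a limit and read the option back with the matching getter.
limitDemo :: Int -> IO Integer
limitDemo bytes =
    withContext $ \ctx ->
    withSocket ctx Pull $ \sock -> do
        limitMessageSize sock bytes
        fmap toInteger (maxMessageSize sock)

main :: IO ()
main = limitDemo 65536 >>= print
```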

Devices

Devices are no longer present in 0MQ 3.x and consequently have been removed from this binding as well.

Error Handling

The type ZMQError is introduced, together with the inspection functions errno, source and message. zmq_strerror is used underneath to retrieve the correct error message. ZMQError is thrown when a native 0MQ procedure returns an error status, and it can be caught as an Exception.
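
Catching a ZMQError might look like the sketch below. It assumes that binding to a deliberately malformed endpoint makes zmq_bind fail; the endpoint string and the helper name describeBindFailure are invented for this example.

```haskell
import Control.Exception (try)
import System.ZMQ3

-- Try to bind to a malformed endpoint and describe what happened.
describeBindFailure :: IO String
describeBindFailure =
    withContext $ \ctx ->
    withSocket ctx Rep $ \sock -> do
        result <- try (bind sock "no-such-transport://nowhere")
                      :: IO (Either ZMQError ())
        return $ case result of
            Left err ->
                source err ++ ": " ++ message err
                    ++ " (errno " ++ show (errno err) ++ ")"
            Right () -> "bind unexpectedly succeeded"

main :: IO ()
main = describeBindFailure >>= putStrLn
```

Using try with an explicit ZMQError annotation keeps other exception types propagating as usual.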

Type Definitions

type Size = Word

data Context

A 0MQ context representation.

data Socket a

A 0MQ Socket.

data Flag

Flags to apply on send operations (cf. man zmq_send)

Constructors

SendMore

ZMQ_SNDMORE

data Switch

Configuration switch

Constructors

Default

Use default setting

On

Activate setting

Off

De-activate setting

type Timeout = Int64

data Event

Socket events.

Constructors

In

ZMQ_POLLIN (incoming messages)

Out

ZMQ_POLLOUT (outgoing messages, i.e. at least 1 byte can be written)

Err

ZMQ_POLLERR

data Poll m where

A descriptor that poll waits on (either a 0MQ socket or a file descriptor), together with the events to wait for.

Constructors

Sock :: Socket s -> [Event] -> Maybe ([Event] -> m ()) -> Poll m 
File :: Fd -> [Event] -> Maybe ([Event] -> m ()) -> Poll m 

Type Classes

class Subscriber a

Sockets which can subscribe.

Socket Types

data Pair

Socket to communicate with a single peer. Allows for only a single connect or a single bind. There's no message routing or message filtering involved. Compatible peer sockets: Pair.

Constructors

Pair 

data Pub

Socket to distribute data. The receive function is not implemented for this socket type. Messages are distributed in fan-out fashion to all peers. Compatible peer sockets: Sub.

Constructors

Pub 

data Sub

Socket to subscribe to data. The send function is not implemented for this socket type. Initially, the socket is subscribed to no messages. Use subscribe to specify which messages to subscribe to. Compatible peer sockets: Pub.

Constructors

Sub 

data XPub

Same as Pub except that you can receive subscriptions from the peers in form of incoming messages. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Compatible peer sockets: Sub, XSub.

Constructors

XPub 

data XSub

Same as Sub except that you subscribe by sending subscription messages to the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Compatible peer sockets: Pub, XPub.

Constructors

XSub 
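
The subscription frame layout described for XPub and XSub (one flag byte followed by the subscription body) can be sketched in pure code. The SubscriptionMsg type and both function names are hypothetical and not part of System.ZMQ3.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC

-- A decoded subscription frame (hypothetical type for this sketch).
data SubscriptionMsg
    = Subscribe B.ByteString
    | Unsubscribe B.ByteString
    deriving (Eq, Show)

-- Build the frame an XSub socket would send: flag byte, then the body.
encodeSubscription :: SubscriptionMsg -> B.ByteString
encodeSubscription (Subscribe topic)   = B.cons 1 topic
encodeSubscription (Unsubscribe topic) = B.cons 0 topic

-- Parse a frame received on an XPub socket; Nothing for any other layout.
decodeSubscription :: B.ByteString -> Maybe SubscriptionMsg
decodeSubscription frame =
    case B.uncons frame of
        Just (1, topic) -> Just (Subscribe topic)
        Just (0, topic) -> Just (Unsubscribe topic)
        _               -> Nothing

main :: IO ()
main = print (decodeSubscription (encodeSubscription (Subscribe (BC.pack "weather."))))
```

An encoded frame would be passed to send on an XSub socket, and a frame obtained from receive on an XPub socket would be passed to decodeSubscription.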

data Req

Socket to send requests and receive replies. Requests are load-balanced among all peers. This socket type allows only an alternating sequence of sends and receives. Compatible peer sockets: Rep, Router.

Constructors

Req 

data Rep

Socket to receive requests and send replies. This socket type allows only an alternating sequence of receives and sends. Each send is routed to the peer that issued the last received request. Compatible peer sockets: Req, Dealer.

Constructors

Rep 

data Dealer

Each message sent is round-robined among all connected peers, and each message received is fair-queued from all connected peers. Compatible peer sockets: Router, Req, Rep.

Constructors

Dealer 

data Router

When receiving messages a Router socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a Router socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to. If the peer does not exist anymore the message shall be silently discarded. Compatible peer sockets: Dealer, Req, Rep.

Constructors

Router 
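
The identity handling of Router can be observed with a Dealer peer in the same process. This is a sketch under assumptions: the inproc endpoint name and the helper routerEcho are invented, and the exchange relies on a Dealer sending its payload without an empty delimiter part.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import System.ZMQ3

-- Send one message from a Dealer to a Router and echo it back.
routerEcho :: IO ByteString
routerEcho =
    withContext $ \ctx ->
    withSocket ctx Router $ \router ->
    withSocket ctx Dealer $ \dealer -> do
        bind router "inproc://router-demo"
        connect dealer "inproc://router-demo"
        send dealer [] "hello"
        -- The Router prepends the identity of the originating peer.
        parts <- receiveMulti router
        case parts of
            -- Sending the identity first routes the reply to that peer.
            [peer, payload] -> sendMulti router [peer, payload]
            _               -> ioError (userError "unexpected envelope")
        receive dealer

main :: IO ()
main = routerEcho >>= print
```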

type XReq = Dealer

Deprecated: Use Dealer

Deprecated Alias

type XRep = Router

Deprecated: Use Router

Deprecated Alias

data Pull

A socket of type Pull is used by a pipeline node to receive messages from upstream pipeline nodes. Messages are fair-queued from among all connected upstream nodes. The zmq_send() function is not implemented for this socket type.

Constructors

Pull 

data Push

A socket of type Push is used by a pipeline node to send messages to downstream pipeline nodes. Messages are load-balanced to all connected downstream nodes. The zmq_recv() function is not implemented for this socket type.

When a Push socket enters an exceptional state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then any zmq_send(3) operations on the socket shall block until the exceptional state ends or at least one downstream node becomes available for sending; messages are not discarded.

Constructors

Push 

General Operations

withContext :: (Context -> IO a) -> IO a

Run an action with a 0MQ context. The Context supplied to your action will not be valid after the action either returns or throws an exception.

withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b

Run an action with a 0MQ socket. The socket will be closed after running the supplied action even if an error occurs. The socket supplied to your action will not be valid after the action terminates.

bind :: Socket a -> String -> IO ()

Bind the socket to the given address (cf. zmq_bind)

unbind :: Socket a -> String -> IO ()

Unbind the socket from the given address (cf. zmq_unbind)

connect :: Socket a -> String -> IO ()

Connect the socket to the given address (cf. zmq_connect).

send :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()

Send the given ByteString over the socket (cf. zmq_sendmsg).

Note: This function always calls zmq_sendmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as it is used by default. send nevertheless blocks the calling thread, using GHC's threadWaitWrite, for as long as the message cannot be queued on the socket.

send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()

Send the given lazy ByteString over the socket (cf. zmq_sendmsg). This is operationally identical to send socket flags (Strict.concat (Lazy.toChunks lbs)) but may be more efficient.

Note: This function always calls zmq_sendmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as it is used by default. send' nevertheless blocks the calling thread, using GHC's threadWaitWrite, for as long as the message cannot be queued on the socket.

sendMulti :: Sender a => Socket a -> [ByteString] -> IO ()

Send a multi-part message. This function applies the SendMore Flag between all message parts. 0MQ guarantees atomic delivery of a multi-part message (cf. zmq_sendmsg for details).

receive :: Receiver a => Socket a -> IO ByteString

Receive a ByteString from socket (cf. zmq_recvmsg).

Note: This function always calls zmq_recvmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as it is used by default. receive nevertheless blocks the calling thread, using GHC's threadWaitRead, for as long as no data is available.
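
A single request/reply exchange with send and receive might be sketched as follows. Both sockets live in one process and one thread purely to keep the example short; the endpoint name and the helper echoOnce are assumptions of this sketch.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import System.ZMQ3

-- One request/reply exchange between a Req and a Rep socket.
echoOnce :: IO ByteString
echoOnce =
    withContext $ \ctx ->
    withSocket ctx Rep $ \server ->
    withSocket ctx Req $ \client -> do
        bind server "inproc://echo-demo"      -- bind before connecting
        connect client "inproc://echo-demo"
        send client [] "ping"                 -- a Req socket sends first
        request <- receive server             -- a Rep socket receives first
        send server [] request                -- reply goes to the requester
        receive client

main :: IO ()
main = echoOnce >>= print
```

In a real application the two sockets would normally live in different threads or processes and use a transport such as tcp.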

receiveMulti :: Receiver a => Socket a -> IO [ByteString]

Receive a multi-part message. This function collects all message parts sent via sendMulti.
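
As an illustration of sendMulti and receiveMulti working together, the following sketch passes a three-part message between two Pair sockets in one process. The endpoint name, the part contents and the helper multiPartDemo are made up for the example.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import System.ZMQ3

-- Send three parts as one message and collect them on the other side.
multiPartDemo :: IO [ByteString]
multiPartDemo =
    withContext $ \ctx ->
    withSocket ctx Pair $ \receiver ->
    withSocket ctx Pair $ \sender -> do
        bind receiver "inproc://parts-demo"
        connect sender "inproc://parts-demo"
        -- SendMore is applied between the parts by sendMulti itself.
        sendMulti sender ["header", "body", "trailer"]
        -- All parts of the message arrive together, in order.
        receiveMulti receiver

main :: IO ()
main = multiPartDemo >>= print
```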

version :: IO (Int, Int, Int)

Return the runtime version of the underlying 0MQ library as a (major, minor, patch) triple.

monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))

Monitor socket events. This function returns a function which can be invoked to retrieve the next socket event, potentially blocking until the next one becomes available. When applied to False, monitoring will terminate, i.e. internal monitoring resources will be disposed. Consequently after monitor has been invoked, the returned function must be applied once to False.
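
Because the returned function must be applied to False exactly once, a small wrapper can make the cleanup automatic. The sketch below is hypothetical: withMonitor and the module name are not part of the binding, and the type signature is left to inference because EventType and EventMsg are not described in this section.

```haskell
module MonitorSketch (withMonitor) where

import Control.Exception (finally)
import System.ZMQ3

-- Start monitoring, hand the caller an action that fetches the next event
-- (the returned function applied to True), and always terminate monitoring
-- afterwards by applying the same function to False.
withMonitor evts ctx sock act = do
    next <- monitor evts ctx sock
    act (next True) `finally` next False
```

A caller would pass the list of event types to watch and an action that invokes the supplied event fetcher as often as needed.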

poll :: MonadIO m => Timeout -> [Poll m] -> m [[Event]]

Polls for events on the given Poll descriptors (cf. zmq_poll). As the result type indicates, one list of Events is returned per descriptor, in the order in which the descriptors were given; a descriptor that has seen no activity is expected to yield an empty list.
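
A minimal sketch of polling a single socket for readability. It assumes the timeout is in milliseconds, as for zmq_poll in 0MQ 3.x; readableWithin, idlePoll and the endpoint name are hypothetical.

```haskell
import System.ZMQ3

-- True if the socket reports an In event before the timeout expires.
readableWithin :: Timeout -> Socket a -> IO Bool
readableWithin ms sock = do
    -- No callback is registered (Nothing); only the result list is used.
    results <- poll ms [Sock sock [In] Nothing]
    return (any (any isIn) results)
  where
    isIn In = True
    isIn _  = False

-- Poll an idle Pull socket that has no peers.
idlePoll :: IO Bool
idlePoll =
    withContext $ \ctx ->
    withSocket ctx Pull $ \sock -> do
        bind sock "inproc://poll-demo"
        readableWithin 50 sock

main :: IO ()
main = idlePoll >>= print
```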

subscribe :: Subscriber a => Socket a -> ByteString -> IO ()

Subscribe Socket to given subscription.

unsubscribe :: Subscriber a => Socket a -> ByteString -> IO ()

Unsubscribe Socket from given subscription.
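
A publisher and a prefix-filtered subscriber could be sketched as follows. The port, the topic prefix, the message text and the "pub" command-line switch are all assumptions of this example; the two roles are meant to run in separate processes.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import qualified Data.ByteString.Char8 as BC
import System.Environment (getArgs)
import System.ZMQ3

-- Publish one message per second.
publisher :: Context -> IO ()
publisher ctx = withSocket ctx Pub $ \pub -> do
    bind pub "tcp://*:5556"
    forever $ do
        send pub [] "weather.berlin 21C"
        threadDelay 1000000

-- Print every message whose content starts with the subscribed prefix.
subscriber :: Context -> IO ()
subscriber ctx = withSocket ctx Sub $ \sub -> do
    connect sub "tcp://127.0.0.1:5556"
    subscribe sub "weather."   -- without this call nothing is delivered
    forever (receive sub >>= BC.putStrLn)

-- Run with the argument "pub" for the publisher, anything else subscribes.
main :: IO ()
main = do
    args <- getArgs
    withContext (if args == ["pub"] then publisher else subscriber)
```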

Context Options (Read)

ioThreads :: Context -> IO Word

Cf. zmq_ctx_get ZMQ_IO_THREADS

maxSockets :: Context -> IO Word

Cf. zmq_ctx_get ZMQ_MAX_SOCKETS

Context Options (Write)

setIoThreads :: Word -> Context -> IO ()

Cf. zmq_ctx_set ZMQ_IO_THREADS

setMaxSockets :: Word -> Context -> IO ()

Cf. zmq_ctx_set ZMQ_MAX_SOCKETS
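
Context options are plain getters and setters on the Context. The sketch below sets both options before any socket exists, on the assumption (following zmq_ctx_set) that this is when they take effect; the values 2 and 512 and the name contextSettings are arbitrary.

```haskell
import Data.Word (Word)
import System.ZMQ3

-- Configure a context and read the settings back.
contextSettings :: IO (Word, Word)
contextSettings =
    withContext $ \ctx -> do
        setIoThreads 2 ctx
        setMaxSockets 512 ctx
        threads <- ioThreads ctx
        sockets <- maxSockets ctx
        return (threads, sockets)

main :: IO ()
main = contextSettings >>= print
```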

Socket Options (Read)

affinity :: Socket a -> IO Word64

Cf. zmq_getsockopt ZMQ_AFFINITY

backlog :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_BACKLOG

delayAttachOnConnect :: Socket a -> IO Bool

Cf. zmq_getsockopt ZMQ_DELAY_ATTACH_ON_CONNECT

events :: Socket a -> IO [Event]

Cf. zmq_getsockopt ZMQ_EVENTS

fileDescriptor :: Socket a -> IO Fd

Cf. zmq_getsockopt ZMQ_FD

identity :: Socket a -> IO ByteString

Cf. zmq_getsockopt ZMQ_IDENTITY

ipv4Only :: Socket a -> IO Bool

Cf. zmq_getsockopt ZMQ_IPV4ONLY

lastEndpoint :: Socket a -> IO String

Cf. zmq_getsockopt ZMQ_LAST_ENDPOINT

linger :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_LINGER

maxMessageSize :: Socket a -> IO Int64

Cf. zmq_getsockopt ZMQ_MAXMSGSIZE

mcastHops :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_MULTICAST_HOPS

moreToReceive :: Socket a -> IO Bool

Cf. zmq_getsockopt ZMQ_RCVMORE

rate :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RATE

receiveBuffer :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RCVBUF

receiveHighWM :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RCVHWM

receiveTimeout :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RCVTIMEO

reconnectInterval :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RECONNECT_IVL

reconnectIntervalMax :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RECONNECT_IVL_MAX

recoveryInterval :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RECOVERY_IVL

sendBuffer :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_SNDBUF

sendHighWM :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_SNDHWM

sendTimeout :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_SNDTIMEO

tcpKeepAlive :: Socket a -> IO Switch

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE

tcpKeepAliveCount :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_CNT

tcpKeepAliveIdle :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_IDLE

tcpKeepAliveInterval :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_INTVL

Socket Options (Write)

setAffinity :: Word64 -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_AFFINITY

setBacklog :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_BACKLOG

setDelayAttachOnConnect :: Bool -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_DELAY_ATTACH_ON_CONNECT

setIdentity :: Restricted N1 N254 ByteString -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_IDENTITY

setIpv4Only :: Bool -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_IPV4ONLY

setLinger :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_LINGER

setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_MAXMSGSIZE

setMcastHops :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_MULTICAST_HOPS

setRate :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RATE

setReceiveBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RCVBUF

setReceiveHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RCVHWM

setReceiveTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RCVTIMEO

setReconnectInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RECONNECT_IVL

setReconnectIntervalMax :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RECONNECT_IVL_MAX

setRecoveryInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RECOVERY_IVL

setRouterMandatory :: Bool -> Socket Router -> IO ()

Cf. zmq_setsockopt ZMQ_ROUTER_MANDATORY

setSendBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_SNDBUF

setSendHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_SNDHWM

setSendTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_SNDTIMEO

setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_ACCEPT_FILTER

setTcpKeepAlive :: Switch -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE

setTcpKeepAliveCount :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_CNT

setTcpKeepAliveIdle :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_IDLE

setTcpKeepAliveInterval :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_INTVL

setXPubVerbose :: Bool -> Socket XPub -> IO ()

Cf. zmq_setsockopt ZMQ_XPUB_VERBOSE
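
Several setters combined might look like the sketch below. The option values, the identity "worker-1" and the helper names are illustrative only, and the sketch assumes that restrict accepts a ByteString for setIdentity as the signature suggests.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import System.ZMQ3

-- Apply a handful of options to a Dealer socket before it is connected.
configureDealer :: Socket Dealer -> IO ()
configureDealer sock = do
    setIdentity (restrict "worker-1") sock         -- 1 to 254 bytes
    setLinger (restrict (0 :: Int)) sock           -- drop pending messages on close
    setSendHighWM (restrict (1000 :: Int)) sock    -- queue at most 1000 messages
    setTcpKeepAlive On sock                        -- Switch-typed option

-- Configure a socket and read two of the options back.
configuredValues :: IO (Int, Int)
configuredValues =
    withContext $ \ctx ->
    withSocket ctx Dealer $ \sock -> do
        configureDealer sock
        l <- linger sock
        h <- sendHighWM sock
        return (l, h)

main :: IO ()
main = configuredValues >>= print
```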

Restrictions

restrict :: Restriction l u v => v -> Restricted l u v

Create a restricted value. If the given value does not satisfy the restrictions, a modified variant is used instead, e.g. if an integer is larger than the upper bound, the upper bound value is used.

toRestricted :: Restriction l u v => v -> Maybe (Restricted l u v)

Create a restricted value. Returns Nothing if the given value does not satisfy all restrictions.
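
The difference between the two functions can be modelled without the library. The following pure sketch mimics the range -1 to (maxBound :: Int64); checked and clamped are hypothetical stand-ins for toRestricted and restrict, and clamping at the lower bound is an assumption, since only the upper-bound case is described above.

```haskell
import Data.Int (Int64)

lowerBound, upperBound :: Integer
lowerBound = -1
upperBound = toInteger (maxBound :: Int64)

-- Model of toRestricted: reject values outside the range.
checked :: Integer -> Maybe Integer
checked v
    | v >= lowerBound && v <= upperBound = Just v
    | otherwise                          = Nothing

-- Model of restrict: move out-of-range values to the nearest bound.
clamped :: Integer -> Integer
clamped = max lowerBound . min upperBound

main :: IO ()
main = do
    print (checked 1024, clamped 1024)
    print (checked (upperBound + 1), clamped (upperBound + 1))
```

toRestricted suits values that come from user input and should be rejected when invalid, while restrict suits constants known to be in range.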

Error Handling

data ZMQError

ZMQError encapsulates information about errors, which occur when using the native 0MQ API, such as error number and message.

errno :: ZMQError -> Int

Error number value.

source :: ZMQError -> String

Source where this error originates from.

message :: ZMQError -> String

Actual error message.

Low-level Functions

init :: Size -> IO Context

Deprecated: Use context

term :: Context -> IO ()

Deprecated: Use destroy

context :: IO Context

Initialize a 0MQ context (cf. zmq_ctx_new for details). You should normally prefer to use withContext instead.

destroy :: Context -> IO ()

Terminate a 0MQ context (cf. zmq_ctx_destroy). You should normally prefer to use withContext instead.

socket :: SocketType a => Context -> a -> IO (Socket a)

Create a new 0MQ socket within the given context. withSocket provides automatic socket closing and may be safer to use.

close :: Socket a -> IO ()

Close a 0MQ socket. withSocket provides automatic socket closing and may be safer to use.
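
If the low-level functions are used directly, bracket gives the same cleanup guarantees that withContext and withSocket provide. The helper withManualPull and the endpoint name are hypothetical.

```haskell
import Control.Exception (bracket)
import System.ZMQ3

-- Create a context and a bound Pull socket, run the action, then release
-- both in reverse order even if the action throws.
withManualPull :: String -> (Socket Pull -> IO a) -> IO a
withManualPull endpoint act =
    bracket context destroy $ \ctx ->
    bracket (socket ctx Pull) close $ \sock -> do
        bind sock endpoint
        act sock

main :: IO ()
main = withManualPull "inproc://manual-demo" $ \_ ->
    putStrLn "socket ready"
```

Nesting the brackets this way closes the socket before the context is destroyed.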

waitRead :: Socket a -> IO ()

Wait until data is available for reading from the given Socket. After this function returns, a call to receive will essentially be non-blocking.

waitWrite :: Socket a -> IO ()

Wait until data can be written to the given Socket. After this function returns, a call to send will essentially be non-blocking.
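
One use of waitRead is a receive with a deadline, sketched here with System.Timeout from base. The helper names, the Pull socket type and the endpoint are assumptions, and the sketch relies on waitRead being interruptible by the asynchronous exception that timeout raises.

```haskell
import Data.ByteString (ByteString)
import System.Timeout (timeout)
import System.ZMQ3

-- Wait at most the given number of microseconds for a message.
receiveWithin :: Int -> Socket Pull -> IO (Maybe ByteString)
receiveWithin micros sock = do
    ready <- timeout micros (waitRead sock)
    case ready of
        Nothing -> return Nothing             -- deadline passed, no data
        Just () -> fmap Just (receive sock)   -- data is available

-- A Pull socket without peers never becomes readable.
idleReceive :: IO (Maybe ByteString)
idleReceive =
    withContext $ \ctx ->
    withSocket ctx Pull $ \sock -> do
        bind sock "inproc://timeout-demo"
        receiveWithin 100000 sock

main :: IO ()
main = idleReceive >>= print
```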

Utils

proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()

Starts built-in 0MQ proxy.

The proxy connects the frontend socket to the backend socket.

Before calling proxy, all sockets should be bound.

If the capture socket is not Nothing, the proxy shall send all messages, received on both frontend and backend, to the capture socket.
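
A forwarder built on proxy could be sketched like this, assuming an XSub frontend facing publishers and an XPub backend facing subscribers. The helper name forwarder and the two ports are invented; no capture socket is used.

```haskell
import System.ZMQ3

-- Bind both sides and hand them to the built-in proxy.
-- The call to proxy is expected to block for as long as the proxy runs.
forwarder :: String -> String -> IO ()
forwarder frontendAddr backendAddr =
    withContext $ \ctx ->
    withSocket ctx XSub $ \frontend ->
    withSocket ctx XPub $ \backend -> do
        bind frontend frontendAddr
        bind backend backendAddr
        proxy frontend backend Nothing

main :: IO ()
main = forwarder "tcp://*:5559" "tcp://*:5560"
```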