Portability | non-portable |
---|---|
Stability | experimental |
Maintainer | Toralf Wittner <tw@dtex.org> |
Safe Haskell | None |
0MQ Haskell binding. The API closely follows the C API of 0MQ, with the main difference that sockets are typed. The documentation of the individual socket types is copied from 0MQ's man pages, authored by Martin Sustrik. For details please refer to http://api.zeromq.org
Differences to zeromq-haskell 2.x
Socket Types
- Up and Down no longer exist.
- XReq is renamed to Dealer and XRep is renamed to Router (in accordance with libzmq). XReq and XRep are available as deprecated aliases.
- Renamed type-classes: SType -> SocketType, SubsType -> Subscriber.
- New type-classes: Sender, Receiver.
Socket Options
Instead of a single SocketOption data-type, getter and setter functions are provided, e.g. one would write:
affinity sock
instead of
getOption sock (Affinity 0)
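The getter/setter style can be sketched as follows. This is a minimal illustration, assuming the binding is imported as System.ZMQ3 and libzmq 3.x is installed; the helper name affinityRoundTrip is made up for this example.

```haskell
import Data.Word (Word64)
import System.ZMQ3

-- Hypothetical helper: set the I/O thread affinity mask on a fresh
-- socket and read it back through the corresponding getter.
affinityRoundTrip :: IO Word64
affinityRoundTrip =
    withContext $ \ctx ->
        withSocket ctx Pair $ \sock -> do
            setAffinity 1 sock   -- setter: new value first, socket last
            affinity sock        -- getter: takes only the socket

main :: IO ()
main = affinityRoundTrip >>= print
```

Each option has its own pair of functions, so the compiler can check the value type per option instead of relying on one catch-all option type.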
Restrictions
Many option setters use a Restriction to further constrain the range of possible values of their integral types. For example, the maximum message size can be given as -1, which means no limit, or as a greater value, which denotes the message size in bytes. The type of setMaxMessageSize is therefore:
setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()
which means any integral value in the range of -1 to (maxBound :: Int64) can be given. To create a restricted value from a plain value, use toRestricted or restrict.
Devices
Devices are no longer present in 0MQ 3.x and consequently have been removed from this binding as well.
Error Handling
The type ZMQError is introduced, together with the inspection functions errno, source and message. zmq_strerror is used underneath to retrieve the correct error message. A ZMQError is thrown when a native 0MQ procedure returns an error status, and it can be caught as an Exception.
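A minimal sketch of catching a ZMQError, assuming the binding is imported as System.ZMQ3. The endpoint "bogus://nowhere" is a deliberately invalid, made-up address, and bindToBogusEndpoint is a hypothetical helper name; the exact error text depends on the installed libzmq.

```haskell
import Control.Exception (try)
import System.ZMQ3

-- Hypothetical helper: try to bind to an endpoint with an unsupported
-- transport and capture the resulting exception instead of crashing.
bindToBogusEndpoint :: IO (Either ZMQError ())
bindToBogusEndpoint =
    withContext $ \ctx ->
        withSocket ctx Rep $ \sock ->
            try (bind sock "bogus://nowhere")

main :: IO ()
main = do
    result <- bindToBogusEndpoint
    case result of
        -- errno, source and message are the inspection functions
        Left e ->
            putStrLn (source e ++ ": " ++ message e
                      ++ " (errno " ++ show (errno e) ++ ")")
        Right () ->
            putStrLn "bind unexpectedly succeeded"
```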
- type Size = Word
- data Context
- data Socket a
- data Flag = SendMore
- data Switch
- type Timeout = Int64
- data Event
- data EventType
- data EventMsg
- = Connected !ByteString !Fd
- | ConnectDelayed !ByteString !Fd
- | ConnectRetried !ByteString !Int
- | Listening !ByteString !Fd
- | BindFailed !ByteString !Fd
- | Accepted !ByteString !Fd
- | AcceptFailed !ByteString !Int
- | Closed !ByteString !Fd
- | CloseFailed !ByteString !Int
- | Disconnected !ByteString !Int
- data Poll m where
- class SocketType a
- class Sender a
- class Receiver a
- class Subscriber a
- data Pair = Pair
- data Pub = Pub
- data Sub = Sub
- data XPub = XPub
- data XSub = XSub
- data Req = Req
- data Rep = Rep
- data Dealer = Dealer
- data Router = Router
- type XReq = Dealer
- type XRep = Router
- data Pull = Pull
- data Push = Push
- withContext :: (Context -> IO a) -> IO a
- withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b
- bind :: Socket a -> String -> IO ()
- unbind :: Socket a -> String -> IO ()
- connect :: Socket a -> String -> IO ()
- send :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
- send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
- sendMulti :: Sender a => Socket a -> [ByteString] -> IO ()
- receive :: Receiver a => Socket a -> IO ByteString
- receiveMulti :: Receiver a => Socket a -> IO [ByteString]
- version :: IO (Int, Int, Int)
- monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))
- poll :: MonadIO m => Timeout -> [Poll m] -> m [[Event]]
- subscribe :: Subscriber a => Socket a -> ByteString -> IO ()
- unsubscribe :: Subscriber a => Socket a -> ByteString -> IO ()
- ioThreads :: Context -> IO Word
- maxSockets :: Context -> IO Word
- setIoThreads :: Word -> Context -> IO ()
- setMaxSockets :: Word -> Context -> IO ()
- affinity :: Socket a -> IO Word64
- backlog :: Socket a -> IO Int
- delayAttachOnConnect :: Socket a -> IO Bool
- events :: Socket a -> IO [Event]
- fileDescriptor :: Socket a -> IO Fd
- identity :: Socket a -> IO ByteString
- ipv4Only :: Socket a -> IO Bool
- lastEndpoint :: Socket a -> IO String
- linger :: Socket a -> IO Int
- maxMessageSize :: Socket a -> IO Int64
- mcastHops :: Socket a -> IO Int
- moreToReceive :: Socket a -> IO Bool
- rate :: Socket a -> IO Int
- receiveBuffer :: Socket a -> IO Int
- receiveHighWM :: Socket a -> IO Int
- receiveTimeout :: Socket a -> IO Int
- reconnectInterval :: Socket a -> IO Int
- reconnectIntervalMax :: Socket a -> IO Int
- recoveryInterval :: Socket a -> IO Int
- sendBuffer :: Socket a -> IO Int
- sendHighWM :: Socket a -> IO Int
- sendTimeout :: Socket a -> IO Int
- tcpKeepAlive :: Socket a -> IO Switch
- tcpKeepAliveCount :: Socket a -> IO Int
- tcpKeepAliveIdle :: Socket a -> IO Int
- tcpKeepAliveInterval :: Socket a -> IO Int
- setAffinity :: Word64 -> Socket a -> IO ()
- setBacklog :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setDelayAttachOnConnect :: Bool -> Socket a -> IO ()
- setIdentity :: Restricted N1 N254 ByteString -> Socket a -> IO ()
- setIpv4Only :: Bool -> Socket a -> IO ()
- setLinger :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
- setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()
- setMcastHops :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()
- setRate :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()
- setReceiveBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setReceiveHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setReceiveTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
- setReconnectInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setReconnectIntervalMax :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setRecoveryInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setRouterMandatory :: Bool -> Socket Router -> IO ()
- setSendBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setSendHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
- setSendTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
- setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO ()
- setTcpKeepAlive :: Switch -> Socket a -> IO ()
- setTcpKeepAliveCount :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
- setTcpKeepAliveIdle :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
- setTcpKeepAliveInterval :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
- setXPubVerbose :: Bool -> Socket XPub -> IO ()
- restrict :: Restriction l u v => v -> Restricted l u v
- toRestricted :: Restriction l u v => v -> Maybe (Restricted l u v)
- data ZMQError
- errno :: ZMQError -> Int
- source :: ZMQError -> String
- message :: ZMQError -> String
- init :: Size -> IO Context
- term :: Context -> IO ()
- context :: IO Context
- destroy :: Context -> IO ()
- socket :: SocketType a => Context -> a -> IO (Socket a)
- close :: Socket a -> IO ()
- waitRead :: Socket a -> IO ()
- waitWrite :: Socket a -> IO ()
- proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()
Type Definitions
data Context
A 0MQ context representation.
data Socket a
A 0MQ Socket.
data Switch
Configuration switch
data Event
Socket events.
data EventType
Event types to monitor.
data EventMsg
Event Message to receive when monitoring socket events.
data Poll m where
Type representing a descriptor, poll is waiting for (either a 0MQ socket or a file descriptor) plus the type of event to wait for.
Type Classes
class SocketType a
Socket types.
Socket Types
data Pair
Socket to communicate with a single peer. Allows for only a
single connect or a single bind. There's no message routing
or message filtering involved. Compatible peer sockets: Pair.
data Pub
data Sub
data XPub
data XSub
data Req
data Rep
data Dealer
data Router
When receiving messages a Router socket shall prepend a message
part containing the identity of the originating peer to
the message before passing it to the application. Messages
received are fair-queued from among all connected peers. When
sending messages a Router socket shall remove the first part of
the message and use it to determine the identity of the peer the
message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded.
Compatible peer sockets: Dealer, Req, Rep.
data Pull
A socket of type Pull is used by a pipeline node to receive messages from upstream pipeline nodes. Messages are fair-queued from among all connected upstream nodes. The zmq_send() function is not implemented for this socket type.
data Push
A socket of type Push is used by a pipeline node to send messages to downstream pipeline nodes. Messages are load-balanced to all connected downstream nodes. The zmq_recv() function is not implemented for this socket type.
When a Push socket enters an exceptional state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then any zmq_send(3) operations on the socket shall block until the exceptional state ends or at least one downstream node becomes available for sending; messages are not discarded.
General Operations
withContext :: (Context -> IO a) -> IO a
Run an action with a 0MQ context. The Context supplied to your action will not be valid after the action either returns or throws an exception.
withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b
Run an action with a 0MQ socket. The socket will be closed after running the supplied action even if an error occurs. The socket supplied to your action will not be valid after the action terminates.
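The scoped style can be sketched with a request/reply round trip inside a single process. This is an illustration under assumptions: the inproc endpoint name and the helper pingPong are made up, and it relies on the inproc transport letting both ends run in one thread (bind happens before connect).

```haskell
import qualified Data.ByteString.Char8 as B
import System.ZMQ3

-- Hypothetical helper: one request and one reply over inproc.
-- Both sockets are closed, and the context terminated, when the
-- enclosing actions finish or throw.
pingPong :: IO (B.ByteString, B.ByteString)
pingPong =
    withContext $ \ctx ->
        withSocket ctx Rep $ \server ->
            withSocket ctx Req $ \client -> do
                bind server "inproc://ping-pong"
                connect client "inproc://ping-pong"
                send client [] (B.pack "ping")
                request <- receive server
                send server [] (B.pack "pong")
                reply <- receive client
                return (request, reply)

main :: IO ()
main = do
    (request, reply) <- pingPong
    B.putStrLn request
    B.putStrLn reply
```

Returning plain ByteString values rather than the sockets themselves keeps the sockets from escaping the scope in which they are valid.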
send :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
Send the given ByteString over the socket (cf. zmq_sendmsg).
Note: This function always calls zmq_sendmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as this is used by default. Still, send blocks the thread, using GHC's threadWaitWrite, for as long as the message cannot be queued on the socket.
send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
Send the given ByteString over the socket (cf. zmq_sendmsg).
This is operationally identical to send socket flags (Strict.concat (Lazy.toChunks lbs)) but may be more efficient.
Note: This function always calls zmq_sendmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as this is used by default. Still, send' blocks the thread, using GHC's threadWaitWrite, for as long as the message cannot be queued on the socket.
sendMulti :: Sender a => Socket a -> [ByteString] -> IO ()
receive :: Receiver a => Socket a -> IO ByteString
Receive a ByteString from socket (cf. zmq_recvmsg).
Note: This function always calls zmq_recvmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as this is used by default. Still, receive blocks the thread, using GHC's threadWaitRead, for as long as no data is available.
receiveMulti :: Receiver a => Socket a -> IO [ByteString]
Receive a multi-part message.
This function collects all message parts sent via sendMulti.
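A minimal sketch of a multi-part round trip, assuming the list-based sendMulti signature shown in the synopsis. The inproc endpoint name and the helper multipartRoundTrip are made up for illustration.

```haskell
import qualified Data.ByteString.Char8 as B
import System.ZMQ3

-- Hypothetical helper: push a two-part message through an inproc
-- pipeline and collect the parts again on the receiving side.
multipartRoundTrip :: IO [B.ByteString]
multipartRoundTrip =
    withContext $ \ctx ->
        withSocket ctx Pull $ \consumer ->
            withSocket ctx Push $ \producer -> do
                bind consumer "inproc://pipeline"
                connect producer "inproc://pipeline"
                -- All parts travel together as one logical message.
                sendMulti producer [B.pack "header", B.pack "body"]
                receiveMulti consumer

main :: IO ()
main = multipartRoundTrip >>= mapM_ B.putStrLn
```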
version :: IO (Int, Int, Int)
Return the runtime version of the underlying 0MQ library as a (major, minor, patch) triple.
monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))
Monitor socket events.
This function returns a function which can be invoked to retrieve the next socket event, potentially blocking until the next one becomes available. When applied to False, monitoring will terminate, i.e. internal monitoring resources will be disposed. Consequently, after monitor has been invoked, the returned function must be applied once to False.
subscribe :: Subscriber a => Socket a -> ByteString -> IO ()
Subscribe Socket to given subscription.
unsubscribe :: Subscriber a => Socket a -> ByteString -> IO ()
Unsubscribe Socket from given subscription.
Context Options (Read)
maxSockets :: Context -> IO Word
Cf. zmq_ctx_get ZMQ_MAX_SOCKETS
Context Options (Write)
setIoThreads :: Word -> Context -> IO ()
Cf. zmq_ctx_set ZMQ_IO_THREADS
setMaxSockets :: Word -> Context -> IO ()
Cf. zmq_ctx_set ZMQ_MAX_SOCKETS
Socket Options (Read)
delayAttachOnConnect :: Socket a -> IO Bool
Cf. zmq_getsockopt ZMQ_DELAY_ATTACH_ON_CONNECT
fileDescriptor :: Socket a -> IO Fd
Cf. zmq_getsockopt ZMQ_FD
identity :: Socket a -> IO ByteString
Cf. zmq_getsockopt ZMQ_IDENTITY
lastEndpoint :: Socket a -> IO String
Cf. zmq_getsockopt ZMQ_LAST_ENDPOINT
maxMessageSize :: Socket a -> IO Int64
Cf. zmq_getsockopt ZMQ_MAXMSGSIZE
moreToReceive :: Socket a -> IO Bool
Cf. zmq_getsockopt ZMQ_RCVMORE
receiveBuffer :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_RCVBUF
receiveHighWM :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_RCVHWM
receiveTimeout :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_RCVTIMEO
reconnectInterval :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_RECONNECT_IVL
reconnectIntervalMax :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_RECONNECT_IVL_MAX
recoveryInterval :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_RECOVERY_IVL
sendBuffer :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_SNDBUF
sendHighWM :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_SNDHWM
sendTimeout :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_SNDTIMEO
tcpKeepAlive :: Socket a -> IO Switch
Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE
tcpKeepAliveCount :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_CNT
tcpKeepAliveIdle :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_IDLE
tcpKeepAliveInterval :: Socket a -> IO Int
Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_INTVL
Socket Options (Write)
setAffinity :: Word64 -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_AFFINITY
setBacklog :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_BACKLOG
setDelayAttachOnConnect :: Bool -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_DELAY_ATTACH_ON_CONNECT
setIdentity :: Restricted N1 N254 ByteString -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_IDENTITY
setIpv4Only :: Bool -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_IPV4ONLY
setLinger :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_LINGER
setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_MAXMSGSIZE
setMcastHops :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_MULTICAST_HOPS
setReceiveBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_RCVBUF
setReceiveHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_RCVHWM
setReceiveTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_RCVTIMEO
setReconnectInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_RECONNECT_IVL
setReconnectIntervalMax :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_RECONNECT_IVL_MAX
setRecoveryInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_RECOVERY_IVL
setRouterMandatory :: Bool -> Socket Router -> IO ()
Cf. zmq_setsockopt ZMQ_ROUTER_MANDATORY
setSendBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_SNDBUF
setSendHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_SNDHWM
setSendTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_SNDTIMEO
setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_TCP_ACCEPT_FILTER
setTcpKeepAlive :: Switch -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE
setTcpKeepAliveCount :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_CNT
setTcpKeepAliveIdle :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_IDLE
setTcpKeepAliveInterval :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_INTVL
setXPubVerbose :: Bool -> Socket XPub -> IO ()
Cf. zmq_setsockopt ZMQ_XPUB_VERBOSE
Restrictions
restrict :: Restriction l u v => v -> Restricted l u v
Create a restricted value. If the given value does not satisfy the restrictions, a modified variant is used instead, e.g. if an integer is larger than the upper bound, the upper bound value is used.
toRestricted :: Restriction l u v => v -> Maybe (Restricted l u v)
Create a restricted value. Returns Nothing if the given value does not satisfy all restrictions.
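The difference between the two constructors can be sketched as below, assuming the binding is imported as System.ZMQ3. The helper name applyLimits and the chosen values are illustrative only.

```haskell
import Data.Int (Int64)
import System.ZMQ3

-- Hypothetical helper: configure one option with restrict and attempt
-- another with toRestricted. Returns the message size limit read back
-- from the socket and whether the second value was accepted.
applyLimits :: IO (Int64, Bool)
applyLimits =
    withContext $ \ctx ->
        withSocket ctx Rep $ \sock -> do
            -- 4096 lies within -1 .. maxBound :: Int64, so restrict
            -- passes it through unchanged.
            setMaxMessageSize (restrict (4096 :: Int)) sock
            -- -7 is below the lower bound of -1, so toRestricted is
            -- expected to yield Nothing and the setter is never called.
            accepted <- case toRestricted (-7 :: Int) of
                Just timeout -> setReceiveTimeout timeout sock >> return True
                Nothing      -> return False
            size <- maxMessageSize sock
            return (size, accepted)

main :: IO ()
main = applyLimits >>= print
```

toRestricted suits values that come from user input or configuration, where an out-of-range value should be reported; restrict suits cases where silently clamping is acceptable.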
Error Handling
data ZMQError
ZMQError encapsulates information about errors, which occur when using the native 0MQ API, such as error number and message.
Low-level Functions
context :: IO Context
Initialize a 0MQ context (cf. zmq_ctx_new for details). You should normally prefer to use withContext instead.
destroy :: Context -> IO ()
Terminate a 0MQ context (cf. zmq_ctx_destroy). You should normally prefer to use withContext instead.
socket :: SocketType a => Context -> a -> IO (Socket a)
Create a new 0MQ socket within the given context. withSocket provides automatic socket closing and may be safer to use.
close :: Socket a -> IO ()
Close a 0MQ socket. withSocket provides automatic socket closing and may be safer to use.
waitRead :: Socket a -> IO ()
Wait until data is available for reading from the given Socket.
After this function returns, a call to receive will essentially be non-blocking.
waitWrite :: Socket a -> IO ()
Wait until data can be written to the given Socket.
After this function returns, a call to send will essentially be non-blocking.
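When the scoped helpers do not fit, the low-level functions can be paired manually. The following sketch uses bracket from Control.Exception to approximate what withContext and withSocket provide; the helper name withPair is made up, and this is not necessarily how the binding implements its own wrappers.

```haskell
import Control.Exception (bracket)
import System.ZMQ3

-- Hypothetical helper: acquire a context and a Pair socket by hand and
-- release them in reverse order, even if the action throws.
withPair :: (Socket Pair -> IO a) -> IO a
withPair action =
    bracket context destroy $ \ctx ->
        bracket (socket ctx Pair) close action

main :: IO ()
main = withPair $ \sock ->
    -- Read an option to show the socket is usable inside the scope.
    sendHighWM sock >>= print
```

The socket is closed before the context is destroyed, which matters because terminating a context waits for its sockets to be closed.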
Utils
proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()
Start the built-in 0MQ proxy.
The proxy connects the frontend socket to the backend socket.
Before calling proxy, all sockets should be bound.
If the capture socket is not Nothing, the proxy shall send all messages received on both frontend and backend to the capture socket.
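A minimal broker sketch using proxy, assuming a Router frontend and a Dealer backend. The helper name runBroker and the TCP ports are placeholders chosen for illustration, and the call is expected to block for as long as the proxy runs.

```haskell
import System.ZMQ3

-- Hypothetical helper: bind both sockets, then hand them to the
-- built-in proxy. No capture socket is used.
runBroker :: String -> String -> IO ()
runBroker frontendAddr backendAddr =
    withContext $ \ctx ->
        withSocket ctx Router $ \frontend ->
            withSocket ctx Dealer $ \backend -> do
                bind frontend frontendAddr
                bind backend backendAddr
                proxy frontend backend Nothing

main :: IO ()
main = runBroker "tcp://*:5559" "tcp://*:5560"
```

Clients would connect Req sockets to the frontend address and workers would connect Rep sockets to the backend address.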