Thomas Spura 4c7a024
/*
Thomas Spura 4c7a024
    Copyright (c) 2009-2011 250bpm s.r.o.
Thomas Spura 4c7a024
    Copyright (c) 2011 Botond Ballo
Thomas Spura 4c7a024
    Copyright (c) 2007-2009 iMatix Corporation
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    Permission is hereby granted, free of charge, to any person obtaining a copy
Thomas Spura 4c7a024
    of this software and associated documentation files (the "Software"), to
Thomas Spura 4c7a024
    deal in the Software without restriction, including without limitation the
Thomas Spura 4c7a024
    rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
Thomas Spura 4c7a024
    sell copies of the Software, and to permit persons to whom the Software is
Thomas Spura 4c7a024
    furnished to do so, subject to the following conditions:
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    The above copyright notice and this permission notice shall be included in
Thomas Spura 4c7a024
    all copies or substantial portions of the Software.
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
Thomas Spura 4c7a024
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
Thomas Spura 4c7a024
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Thomas Spura 4c7a024
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
Thomas Spura 4c7a024
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
Thomas Spura 4c7a024
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
Thomas Spura 4c7a024
    IN THE SOFTWARE.
Thomas Spura 4c7a024
*/
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifndef __ZMQ_HPP_INCLUDED__
Thomas Spura 4c7a024
#define __ZMQ_HPP_INCLUDED__
Thomas Spura 4c7a024
Thomas Spura 729573e
#if __cplusplus >= 201103L
Thomas Spura 729573e
#define ZMQ_CPP11
Thomas Spura 729573e
#define ZMQ_NOTHROW noexcept
Thomas Spura 729573e
#define ZMQ_EXPLICIT explicit
Thomas Spura 729573e
#else
Thomas Spura 729573e
    #define ZMQ_CPP03
Thomas Spura 729573e
    #define ZMQ_NOTHROW
Thomas Spura 729573e
    #define ZMQ_EXPLICIT
Thomas Spura 729573e
#endif
Thomas Spura 729573e
Thomas Spura 4c7a024
#include <zmq.h>
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#include <algorithm>
Thomas Spura 4c7a024
#include <cassert>
Thomas Spura 4c7a024
#include <cstring>
Thomas Spura 4c7a024
#include <string>
Thomas Spura 4c7a024
#include <exception>
Thomas Spura 729573e
#include <vector>
Thomas Spura 729573e
#include <iterator>
Thomas Spura 729573e
Thomas Spura 729573e
#ifdef ZMQ_CPP11
Thomas Spura 729573e
#include <chrono>
Thomas Spura 729573e
#include <tuple>
Thomas Spura 729573e
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
//  Detect whether the compiler supports C++11 rvalue references.
Thomas Spura 4c7a024
#if (defined(__GNUC__) && (__GNUC__ > 4 || \
Thomas Spura 4c7a024
      (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \
Thomas Spura 4c7a024
      defined(__GXX_EXPERIMENTAL_CXX0X__))
Thomas Spura 4c7a024
    #define ZMQ_HAS_RVALUE_REFS
Thomas Spura 4c7a024
    #define ZMQ_DELETED_FUNCTION = delete
Thomas Spura 4c7a024
#elif defined(__clang__)
Thomas Spura 4c7a024
    #if __has_feature(cxx_rvalue_references)
Thomas Spura 4c7a024
        #define ZMQ_HAS_RVALUE_REFS
Thomas Spura 4c7a024
    #endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    #if __has_feature(cxx_deleted_functions)
Thomas Spura 4c7a024
        #define ZMQ_DELETED_FUNCTION = delete
Thomas Spura 4c7a024
    #else
Thomas Spura 4c7a024
        #define ZMQ_DELETED_FUNCTION
Thomas Spura 4c7a024
    #endif
Thomas Spura 4c7a024
#elif defined(_MSC_VER) && (_MSC_VER >= 1600)
Thomas Spura 4c7a024
    #define ZMQ_HAS_RVALUE_REFS
Thomas Spura 4c7a024
    #define ZMQ_DELETED_FUNCTION
Thomas Spura 4c7a024
#else
Thomas Spura 4c7a024
    #define ZMQ_DELETED_FUNCTION
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0)
Thomas Spura 4c7a024
#define ZMQ_NEW_MONITOR_EVENT_LAYOUT
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0)
Thomas Spura 4c7a024
#define ZMQ_HAS_PROXY_STEERABLE
Thomas Spura 4c7a024
/*  Socket event data  */
Thomas Spura 4c7a024
typedef struct {
Thomas Spura 4c7a024
    uint16_t event;  // id of the event as bitfield
Thomas Spura 4c7a024
    int32_t  value ; // value is either error code, fd or reconnect interval
Thomas Spura 4c7a024
} zmq_event_t;
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura eb3bd66
// Avoid using deprecated message receive function when possible
Thomas Spura eb3bd66
#if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0)
Thomas Spura eb3bd66
#  define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags)
Thomas Spura eb3bd66
#endif
Thomas Spura eb3bd66
Thomas Spura eb3bd66
Thomas Spura 4c7a024
// In order to prevent unused variable warnings when building in non-debug
Thomas Spura 4c7a024
// mode use this macro to make assertions.
Thomas Spura 4c7a024
#ifndef NDEBUG
Thomas Spura 4c7a024
#   define ZMQ_ASSERT(expression) assert(expression)
Thomas Spura 4c7a024
#else
Thomas Spura 4c7a024
#   define ZMQ_ASSERT(expression) (void)(expression)
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
namespace zmq
Thomas Spura 4c7a024
{
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    typedef zmq_free_fn free_fn;
Thomas Spura 4c7a024
    typedef zmq_pollitem_t pollitem_t;
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    class error_t : public std::exception
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
    public:
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        error_t () : errnum (zmq_errno ()) {}
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        virtual const char *what () const throw ()
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return zmq_strerror (errnum);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        int num () const
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return errnum;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    private:
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        int errnum;
Thomas Spura 4c7a024
    };
Thomas Spura 4c7a024
Thomas Spura eb3bd66
    inline int poll (zmq_pollitem_t const* items_, size_t nitems_, long timeout_ = -1)
Thomas Spura 4c7a024
    {
Thomas Spura eb3bd66
        int rc = zmq_poll (const_cast<zmq_pollitem_t*>(items_), static_cast<int>(nitems_), timeout_);
Thomas Spura 4c7a024
        if (rc < 0)
Thomas Spura 4c7a024
            throw error_t ();
Thomas Spura 4c7a024
        return rc;
Thomas Spura 4c7a024
    }
Thomas Spura 4c7a024
Thomas Spura 729573e
    inline int poll(zmq_pollitem_t const* items, size_t nitems)
Thomas Spura 729573e
    {
Thomas Spura eb3bd66
        return poll(items, nitems, -1);
Thomas Spura 729573e
    }
Thomas Spura 729573e
Thomas Spura 729573e
    #ifdef ZMQ_CPP11
Thomas Spura 729573e
    inline int poll(zmq_pollitem_t const* items, size_t nitems, std::chrono::milliseconds timeout)
Thomas Spura 729573e
    {
Thomas Spura 729573e
        return poll(items, nitems, timeout.count() );
Thomas Spura 729573e
    }
Thomas Spura 729573e
Thomas Spura 729573e
    inline int poll(std::vector<zmq_pollitem_t> const& items, std::chrono::milliseconds timeout)
Thomas Spura 729573e
    {
Thomas Spura 729573e
        return poll(items.data(), items.size(), timeout.count() );
Thomas Spura 729573e
    }
Thomas Spura 729573e
Thomas Spura 729573e
    inline int poll(std::vector<zmq_pollitem_t> const& items, long timeout_ = -1)
Thomas Spura 729573e
    {
Thomas Spura 729573e
        return poll(items.data(), items.size(), timeout_);
Thomas Spura 729573e
    }
Thomas Spura eb3bd66
    #endif
Thomas Spura 729573e
Thomas Spura 729573e
Thomas Spura 729573e
Thomas Spura 4c7a024
    inline void proxy (void *frontend, void *backend, void *capture)
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
        int rc = zmq_proxy (frontend, backend, capture);
Thomas Spura 4c7a024
        if (rc != 0)
Thomas Spura 4c7a024
            throw error_t ();
Thomas Spura 4c7a024
    }
Thomas Spura 4c7a024
    
Thomas Spura 4c7a024
#ifdef ZMQ_HAS_PROXY_STEERABLE
Thomas Spura 4c7a024
    inline void proxy_steerable (void *frontend, void *backend, void *capture, void *control)
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
        int rc = zmq_proxy_steerable (frontend, backend, capture, control);
Thomas Spura 4c7a024
        if (rc != 0)
Thomas Spura 4c7a024
            throw error_t ();
Thomas Spura 4c7a024
    }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
    
Thomas Spura 4c7a024
    inline void version (int *major_, int *minor_, int *patch_)
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
        zmq_version (major_, minor_, patch_);
Thomas Spura 4c7a024
    }
Thomas Spura 4c7a024
Thomas Spura 729573e
    #ifdef ZMQ_CPP11
Thomas Spura 729573e
    inline std::tuple<int, int, int> version()
Thomas Spura 729573e
    {
Thomas Spura 729573e
        std::tuple<int, int, int> v;
Thomas Spura 729573e
        zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v) );
Thomas Spura 729573e
        return v;
Thomas Spura 729573e
    }
Thomas Spura 729573e
    #endif
Thomas Spura 729573e
Thomas Spura 4c7a024
    class message_t
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
        friend class socket_t;
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    public:
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline message_t ()
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_init (&msg;;
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline explicit message_t (size_t size_)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_init_size (&msg, size_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        template<typename I> message_t(I first, I last):
Thomas Spura 729573e
            msg()
Thomas Spura 729573e
        {
Thomas Spura 729573e
            typedef typename std::iterator_traits::difference_type size_type;
Thomas Spura eb3bd66
            typedef typename std::iterator_traits::value_type value_t;
Thomas Spura 729573e
Thomas Spura eb3bd66
            size_type const size_ = std::distance(first, last)*sizeof(value_t);
Thomas Spura 729573e
            int const rc = zmq_msg_init_size (&msg, size_);
Thomas Spura 729573e
            if (rc != 0)
Thomas Spura 729573e
                throw error_t ();
Thomas Spura eb3bd66
            value_t* dest = data<value_t>();
Thomas Spura eb3bd66
            while (first != last)
Thomas Spura eb3bd66
            {
Thomas Spura eb3bd66
                *dest = *first;
Thomas Spura eb3bd66
                ++dest; ++first;
Thomas Spura eb3bd66
            }
Thomas Spura eb3bd66
        }
Thomas Spura eb3bd66
Thomas Spura eb3bd66
        inline message_t (const void *data_, size_t size_)
Thomas Spura eb3bd66
        {
Thomas Spura eb3bd66
            int rc = zmq_msg_init_size (&msg, size_);
Thomas Spura eb3bd66
            if (rc != 0)
Thomas Spura eb3bd66
                throw error_t ();
Thomas Spura eb3bd66
            memcpy(data(), data_, size_);
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        inline message_t (void *data_, size_t size_, free_fn *ffn_,
Thomas Spura 4c7a024
            void *hint_ = NULL)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_HAS_RVALUE_REFS
Thomas Spura 729573e
        inline message_t (message_t &&rhs): msg (rhs.msg)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_init (&rhs.msg);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline message_t &operator = (message_t &&rhs) ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            std::swap (msg, rhs.msg);
Thomas Spura 4c7a024
            return *this;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline ~message_t () ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_close (&msg;;
Thomas Spura 4c7a024
            ZMQ_ASSERT (rc == 0);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline void rebuild ()
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_close (&msg;;
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
            rc = zmq_msg_init (&msg;;
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline void rebuild (size_t size_)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_close (&msg;;
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
            rc = zmq_msg_init_size (&msg, size_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura eb3bd66
        inline void rebuild (const void *data_, size_t size_)
Thomas Spura eb3bd66
        {
Thomas Spura eb3bd66
            int rc = zmq_msg_close (&msg;;
Thomas Spura eb3bd66
            if (rc != 0)
Thomas Spura eb3bd66
                throw error_t ();
Thomas Spura eb3bd66
            rc = zmq_msg_init_size (&msg, size_);
Thomas Spura eb3bd66
            if (rc != 0)
Thomas Spura eb3bd66
                throw error_t ();
Thomas Spura eb3bd66
            memcpy(data(), data_, size_);
Thomas Spura eb3bd66
        }
Thomas Spura eb3bd66
Thomas Spura 4c7a024
        inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
Thomas Spura 4c7a024
            void *hint_ = NULL)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_msg_close (&msg;;
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
            rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void move (message_t const *msg_)
Thomas Spura 4c7a024
        {
Thomas Spura 729573e
            int rc = zmq_msg_move (&msg, const_cast<zmq_msg_t*>(&(msg_->msg)));
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void copy (message_t const *msg_)
Thomas Spura 4c7a024
        {
Thomas Spura 729573e
            int rc = zmq_msg_copy (&msg, const_cast<zmq_msg_t*>(&(msg_->msg)));
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline bool more () const ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 729573e
            int rc = zmq_msg_more (const_cast<zmq_msg_t*>(&msg) );
Thomas Spura 4c7a024
            return rc != 0;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void *data () ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return zmq_msg_data (&msg;;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline const void* data () const ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return zmq_msg_data (const_cast<zmq_msg_t*>(&msg));
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline size_t size () const ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return zmq_msg_size (const_cast<zmq_msg_t*>(&msg));
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        template<typename T> T* data() ZMQ_NOTHROW
Thomas Spura 729573e
        {
Thomas Spura 729573e
            return static_cast<T*>( data() );
Thomas Spura 729573e
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        template<typename T> T const* data() const ZMQ_NOTHROW
Thomas Spura 729573e
        {
Thomas Spura 729573e
            return static_cast<T const*>( data() );
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 729573e
Thomas Spura 729573e
    private:
Thomas Spura 4c7a024
        //  The underlying message
Thomas Spura 4c7a024
        zmq_msg_t msg;
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        //  Disable implicit message copying, so that users won't use shared
Thomas Spura 4c7a024
        //  messages (less efficient) without being aware of the fact.
Thomas Spura 729573e
        message_t (const message_t&) ZMQ_DELETED_FUNCTION;
Thomas Spura 729573e
        void operator = (const message_t&) ZMQ_DELETED_FUNCTION;
Thomas Spura 4c7a024
    };
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    class context_t
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
        friend class socket_t;
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    public:
Thomas Spura 4c7a024
        inline context_t ()
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            ptr = zmq_ctx_new ();
Thomas Spura 4c7a024
            if (ptr == NULL)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline explicit context_t (int io_threads_, int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            ptr = zmq_ctx_new ();
Thomas Spura 4c7a024
            if (ptr == NULL)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
Thomas Spura 4c7a024
            int rc = zmq_ctx_set (ptr, ZMQ_IO_THREADS, io_threads_);
Thomas Spura 4c7a024
            ZMQ_ASSERT (rc == 0);
Thomas Spura 4c7a024
Thomas Spura 4c7a024
            rc = zmq_ctx_set (ptr, ZMQ_MAX_SOCKETS, max_sockets_);
Thomas Spura 4c7a024
            ZMQ_ASSERT (rc == 0);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_HAS_RVALUE_REFS
Thomas Spura 729573e
        inline context_t (context_t &&rhs) ZMQ_NOTHROW : ptr (rhs.ptr)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            rhs.ptr = NULL;
Thomas Spura 4c7a024
        }
Thomas Spura 729573e
        inline context_t &operator = (context_t &&rhs) ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            std::swap (ptr, rhs.ptr);
Thomas Spura 4c7a024
            return *this;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline ~context_t () ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura eb3bd66
            int rc = zmq_ctx_destroy (ptr);
Thomas Spura eb3bd66
            ZMQ_ASSERT (rc == 0);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void close() ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura eb3bd66
            int rc = zmq_ctx_shutdown (ptr);
Thomas Spura 4c7a024
            ZMQ_ASSERT (rc == 0);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        //  Be careful with this, it's probably only useful for
Thomas Spura 4c7a024
        //  using the C api together with an existing C++ api.
Thomas Spura 4c7a024
        //  Normally you should never need to use this.
Thomas Spura 729573e
        inline ZMQ_EXPLICIT operator void* () ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return ptr;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline ZMQ_EXPLICIT operator void const* () const ZMQ_NOTHROW
Thomas Spura 729573e
        {
Thomas Spura 729573e
            return ptr;
Thomas Spura 729573e
        }
Thomas Spura 4c7a024
    private:
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        void *ptr;
Thomas Spura 4c7a024
Thomas Spura 729573e
        context_t (const context_t&) ZMQ_DELETED_FUNCTION;
Thomas Spura 729573e
        void operator = (const context_t&) ZMQ_DELETED_FUNCTION;
Thomas Spura 729573e
    };
Thomas Spura 729573e
Thomas Spura 729573e
    #ifdef ZMQ_CPP11
Thomas Spura 729573e
    enum class socket_type: int
Thomas Spura 729573e
    {
Thomas Spura 729573e
        req = ZMQ_REQ,
Thomas Spura 729573e
        rep = ZMQ_REP,
Thomas Spura 729573e
        dealer = ZMQ_DEALER,
Thomas Spura 729573e
        router = ZMQ_ROUTER,
Thomas Spura 729573e
        pub = ZMQ_PUB,
Thomas Spura 729573e
        sub = ZMQ_SUB,
Thomas Spura 729573e
        xpub = ZMQ_XPUB,
Thomas Spura 729573e
        xsub = ZMQ_XSUB,
Thomas Spura 729573e
        push = ZMQ_PUSH,
Thomas Spura 729573e
        pull = ZMQ_PULL,
Thomas Spura eb3bd66
#if ZMQ_VERSION_MAJOR < 4
Thomas Spura eb3bd66
        pair = ZMQ_PAIR
Thomas Spura eb3bd66
#else
Thomas Spura 729573e
        pair = ZMQ_PAIR,
Thomas Spura 729573e
        stream = ZMQ_STREAM
Thomas Spura eb3bd66
#endif
Thomas Spura 4c7a024
    };
Thomas Spura 729573e
    #endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    class socket_t
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
        friend class monitor_t;
Thomas Spura 4c7a024
    public:
Thomas Spura 729573e
        inline socket_t(context_t& context_, int type_)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            init(context_, type_);
Thomas Spura 729573e
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        #ifdef ZMQ_CPP11
Thomas Spura 729573e
        inline socket_t(context_t& context_, socket_type type_)
Thomas Spura 4c7a024
        {
Thomas Spura 729573e
            init(context_, static_cast<int>(type_));
Thomas Spura 4c7a024
        }
Thomas Spura 729573e
        #endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_HAS_RVALUE_REFS
Thomas Spura 729573e
        inline socket_t(socket_t&& rhs) ZMQ_NOTHROW : ptr(rhs.ptr)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            rhs.ptr = NULL;
Thomas Spura 4c7a024
        }
Thomas Spura 729573e
        inline socket_t& operator=(socket_t&& rhs) ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            std::swap(ptr, rhs.ptr);
Thomas Spura 4c7a024
            return *this;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline ~socket_t () ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            close();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline ZMQ_EXPLICIT operator void* () ZMQ_NOTHROW
Thomas Spura 729573e
        {
Thomas Spura 729573e
            return ptr;
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 729573e
        inline ZMQ_EXPLICIT operator void const* () const ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return ptr;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void close() ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            if(ptr == NULL)
Thomas Spura 4c7a024
                // already closed
Thomas Spura 4c7a024
                return ;
Thomas Spura 4c7a024
            int rc = zmq_close (ptr);
Thomas Spura 4c7a024
            ZMQ_ASSERT (rc == 0);
Thomas Spura 4c7a024
            ptr = 0 ;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        template<typename T> void setsockopt(int option_, T const& optval)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            setsockopt(option_, &optval, sizeof(T) );
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        inline void setsockopt (int option_, const void *optval_,
Thomas Spura 4c7a024
            size_t optvallen_)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline void getsockopt (int option_, void *optval_,
Thomas Spura eb3bd66
            size_t *optvallen_) const
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 729573e
Thomas Spura eb3bd66
        template<typename T> T getsockopt(int option_) const
Thomas Spura 729573e
        {
Thomas Spura 729573e
            T optval;
Thomas Spura 729573e
            size_t optlen = sizeof(T);
Thomas Spura 729573e
            getsockopt(option_, &optval, &optlen );
Thomas Spura 729573e
            return optval;
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 729573e
        inline void bind(std::string const& addr)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            bind(addr.c_str());
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        inline void bind (const char *addr_)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_bind (ptr, addr_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void unbind(std::string const& addr)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            unbind(addr.c_str());
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        inline void unbind (const char *addr_)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_unbind (ptr, addr_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void connect(std::string const& addr)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            connect(addr.c_str());
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        inline void connect (const char *addr_)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_connect (ptr, addr_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline void disconnect(std::string const& addr)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            disconnect(addr.c_str());
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        inline void disconnect (const char *addr_)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_disconnect (ptr, addr_);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        inline bool connected() const ZMQ_NOTHROW
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return(ptr != NULL);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
        
Thomas Spura 4c7a024
        inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int nbytes = zmq_send (ptr, buf_, len_, flags_);
Thomas Spura 4c7a024
            if (nbytes >= 0)
Thomas Spura 4c7a024
                return (size_t) nbytes;
Thomas Spura 4c7a024
            if (zmq_errno () == EAGAIN)
Thomas Spura 4c7a024
                return 0;
Thomas Spura 4c7a024
            throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline bool send (message_t &msg_, int flags_ = 0)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int nbytes = zmq_msg_send (&(msg_.msg), ptr, flags_);
Thomas Spura 4c7a024
            if (nbytes >= 0)
Thomas Spura 4c7a024
                return true;
Thomas Spura 4c7a024
            if (zmq_errno () == EAGAIN)
Thomas Spura 4c7a024
                return false;
Thomas Spura 4c7a024
            throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 729573e
        template<typename I> bool send(I first, I last, int flags_=0)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            zmq::message_t msg(first, last);
Thomas Spura 729573e
            return send(msg, flags_);
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
#ifdef ZMQ_HAS_RVALUE_REFS
Thomas Spura 4c7a024
        inline bool send (message_t &&msg_, int flags_ = 0)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            return send(msg_, flags_);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline size_t recv (void *buf_, size_t len_, int flags_ = 0)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int nbytes = zmq_recv (ptr, buf_, len_, flags_);
Thomas Spura 4c7a024
            if (nbytes >= 0)
Thomas Spura 4c7a024
                return (size_t) nbytes;
Thomas Spura 4c7a024
            if (zmq_errno () == EAGAIN)
Thomas Spura 4c7a024
                return 0;
Thomas Spura 4c7a024
            throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        inline bool recv (message_t *msg_, int flags_ = 0)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_);
Thomas Spura 4c7a024
            if (nbytes >= 0)
Thomas Spura 4c7a024
                return true;
Thomas Spura 4c7a024
            if (zmq_errno () == EAGAIN)
Thomas Spura 4c7a024
                return false;
Thomas Spura 4c7a024
            throw error_t ();
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
        
Thomas Spura 4c7a024
    private:
Thomas Spura 729573e
        inline void init(context_t& context_, int type_)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            ctxptr = context_.ptr;
Thomas Spura 729573e
            ptr = zmq_socket (context_.ptr, type_ );
Thomas Spura 729573e
            if (ptr == NULL)
Thomas Spura 729573e
                throw error_t ();
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        void *ptr;
Thomas Spura 4c7a024
        void *ctxptr;
Thomas Spura 4c7a024
Thomas Spura 4c7a024
        socket_t (const socket_t&) ZMQ_DELETED_FUNCTION;
Thomas Spura 4c7a024
        void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;
Thomas Spura 4c7a024
    };
Thomas Spura 4c7a024
Thomas Spura 4c7a024
    class monitor_t
Thomas Spura 4c7a024
    {
Thomas Spura 4c7a024
    public:
Thomas Spura 4c7a024
        monitor_t() : socketPtr(NULL) {}
Thomas Spura 4c7a024
        virtual ~monitor_t() {}
Thomas Spura 4c7a024
Thomas Spura 729573e
        void monitor(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL)
Thomas Spura 729573e
        {
Thomas Spura 729573e
            monitor(socket, addr.c_str(), events);
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
        void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            int rc = zmq_socket_monitor(socket.ptr, addr_, events);
Thomas Spura 4c7a024
            if (rc != 0)
Thomas Spura 4c7a024
                throw error_t ();
Thomas Spura 4c7a024
Thomas Spura 4c7a024
            socketPtr = socket.ptr;
Thomas Spura 4c7a024
            void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR);
Thomas Spura 4c7a024
            assert (s);
Thomas Spura 4c7a024
Thomas Spura 4c7a024
            rc = zmq_connect (s, addr_);
Thomas Spura 4c7a024
            assert (rc == 0);
Thomas Spura 4c7a024
            
Thomas Spura 4c7a024
            on_monitor_started();
Thomas Spura 4c7a024
            
Thomas Spura 4c7a024
            while (true) {
Thomas Spura 4c7a024
                zmq_msg_t eventMsg;
Thomas Spura 4c7a024
                zmq_msg_init (&eventMsg);
Thomas Spura eb3bd66
                rc = zmq_msg_recv (&eventMsg, s, 0);
Thomas Spura 4c7a024
                if (rc == -1 && zmq_errno() == ETERM)
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                assert (rc != -1);
Thomas Spura 4c7a024
#if ZMQ_VERSION_MAJOR >= 4
Thomas Spura 4c7a024
                const char* data = static_cast<const char*>(zmq_msg_data(&eventMsg));
Thomas Spura 4c7a024
                zmq_event_t msgEvent;
Thomas Spura 4c7a024
                memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t);
Thomas Spura 4c7a024
                memcpy(&msgEvent.value, data, sizeof(int32_t));
Thomas Spura 4c7a024
                zmq_event_t* event = &msgEvent;
Thomas Spura 4c7a024
#else
Thomas Spura 4c7a024
                zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data(&eventMsg));
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
                
Thomas Spura 4c7a024
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
Thomas Spura 4c7a024
                zmq_msg_t addrMsg;
Thomas Spura 4c7a024
                zmq_msg_init (&addrMsg);
Thomas Spura eb3bd66
                rc = zmq_msg_recv (&addrMsg, s, 0);
Thomas Spura 4c7a024
                if (rc == -1 && zmq_errno() == ETERM)
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                assert (rc != -1);
Thomas Spura 4c7a024
                const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
Thomas Spura 4c7a024
                std::string address(str, str + zmq_msg_size(&addrMsg));
Thomas Spura 4c7a024
                zmq_msg_close (&addrMsg);
Thomas Spura 4c7a024
#else
Thomas Spura 4c7a024
                // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
Thomas Spura 4c7a024
                std::string address = event->data.connected.addr;
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_EVENT_MONITOR_STOPPED
Thomas Spura 4c7a024
                if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
                switch (event->event) {
Thomas Spura 4c7a024
                case ZMQ_EVENT_CONNECTED:
Thomas Spura 4c7a024
                    on_event_connected(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_CONNECT_DELAYED:
Thomas Spura 4c7a024
                    on_event_connect_delayed(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_CONNECT_RETRIED:
Thomas Spura 4c7a024
                    on_event_connect_retried(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_LISTENING:
Thomas Spura 4c7a024
                    on_event_listening(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_BIND_FAILED:
Thomas Spura 4c7a024
                    on_event_bind_failed(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_ACCEPTED:
Thomas Spura 4c7a024
                    on_event_accepted(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_ACCEPT_FAILED:
Thomas Spura 4c7a024
                    on_event_accept_failed(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_CLOSED:
Thomas Spura 4c7a024
                    on_event_closed(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_CLOSE_FAILED:
Thomas Spura 4c7a024
                    on_event_close_failed(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                case ZMQ_EVENT_DISCONNECTED:
Thomas Spura 4c7a024
                    on_event_disconnected(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                default:
Thomas Spura 4c7a024
                    on_event_unknown(*event, address.c_str());
Thomas Spura 4c7a024
                    break;
Thomas Spura 4c7a024
                }
Thomas Spura 4c7a024
                zmq_msg_close (&eventMsg);
Thomas Spura 4c7a024
            }
Thomas Spura 4c7a024
            zmq_close (s);
Thomas Spura 4c7a024
            socketPtr = NULL;
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_EVENT_MONITOR_STOPPED
Thomas Spura 4c7a024
        void abort()
Thomas Spura 4c7a024
        {
Thomas Spura 4c7a024
            if (socketPtr)
Thomas Spura 4c7a024
                zmq_socket_monitor(socketPtr, NULL, 0);
Thomas Spura 4c7a024
        }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
        virtual void on_monitor_started() {}
Thomas Spura 4c7a024
        virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
        virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
Thomas Spura 4c7a024
    private:
Thomas Spura 4c7a024
        void* socketPtr;
Thomas Spura 4c7a024
    };
Thomas Spura 4c7a024
}
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#endif