/*
    Copyright (c) 2016-2017 ZeroMQ community
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2011 Botond Ballo
    Copyright (c) 2007-2009 iMatix Corporation

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to
    deal in the Software without restriction, including without limitation the
    rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
    sell copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
    IN THE SOFTWARE.
*/

//  Include guard; the matching #endif is at the end of the header.
#ifndef __ZMQ_HPP_INCLUDED__
#define __ZMQ_HPP_INCLUDED__

//  ZMQ_DEPRECATED(msg) marks a declaration as deprecated using whichever
//  mechanism the compiler provides. On compilers matching none of the checks
//  below it expands to nothing, so annotated declarations still compile.
#if (__cplusplus >= 201402L)
#define ZMQ_DEPRECATED(msg) [[deprecated(msg)]]
#elif defined(_MSC_VER)
#define ZMQ_DEPRECATED(msg) __declspec(deprecated(msg))
#elif defined(__GNUC__)
#define ZMQ_DEPRECATED(msg) __attribute__((deprecated(msg)))
#else
#define ZMQ_DEPRECATED(msg)
#endif

//  Language dialect detection. Exactly one of ZMQ_CPP11 / ZMQ_CPP03 is
//  defined. ZMQ_NOTHROW and ZMQ_EXPLICIT expand to `noexcept` / `explicit`
//  when C++11 is available and to nothing otherwise.
#if (__cplusplus >= 201103L)
#define ZMQ_CPP11
#define ZMQ_NOTHROW noexcept
#define ZMQ_EXPLICIT explicit
//  MSVC with _MSC_VER >= 1900 is treated as C++11 regardless of __cplusplus.
#elif (defined(_MSC_VER) && (_MSC_VER >= 1900))
#define ZMQ_CPP11
#define ZMQ_NOTHROW noexcept
#define ZMQ_EXPLICIT explicit
#else
#define ZMQ_CPP03
#define ZMQ_NOTHROW
#define ZMQ_EXPLICIT
#endif

#include <zmq.h>

#include <cassert>
#include <cstring>

#include <algorithm>
#include <exception>
#include <iomanip>
#include <iterator>
#include <sstream>
#include <string>
#include <vector>

/*  Version macros for compile-time API version detection                     */
#define CPPZMQ_VERSION_MAJOR 4
#define CPPZMQ_VERSION_MINOR 3
#define CPPZMQ_VERSION_PATCH 0

//  Combined version number, built with ZMQ_MAKE_VERSION (expected from zmq.h)
//  so it can be compared against other ZMQ_MAKE_VERSION(...) values.
#define CPPZMQ_VERSION                                                              \
    ZMQ_MAKE_VERSION(CPPZMQ_VERSION_MAJOR, CPPZMQ_VERSION_MINOR,                    \
                     CPPZMQ_VERSION_PATCH)

//  Headers that are only needed (and only available) when building as C++11.
#ifdef ZMQ_CPP11
#include <chrono>
#include <tuple>
#include <functional>
#include <unordered_map>
#include <memory>
#endif

//  Detect whether the compiler supports C++11 rvalue references.
//  ZMQ_HAS_RVALUE_REFS is defined when move operations may be declared.
//  ZMQ_DELETED_FUNCTION is always defined: it expands to `= delete` where
//  deleted functions are supported and to nothing otherwise (the affected
//  members are then merely declared private and left undefined).
#if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2))   \
     && defined(__GXX_EXPERIMENTAL_CXX0X__))
#define ZMQ_HAS_RVALUE_REFS
#define ZMQ_DELETED_FUNCTION = delete
#elif defined(__clang__)
#if __has_feature(cxx_rvalue_references)
#define ZMQ_HAS_RVALUE_REFS
#endif

#if __has_feature(cxx_deleted_functions)
#define ZMQ_DELETED_FUNCTION = delete
#else
#define ZMQ_DELETED_FUNCTION
#endif
#elif defined(_MSC_VER) && (_MSC_VER >= 1900)
#define ZMQ_HAS_RVALUE_REFS
#define ZMQ_DELETED_FUNCTION = delete
#elif defined(_MSC_VER) && (_MSC_VER >= 1600)
//  Rvalue references are available here, but `= delete` is not used.
#define ZMQ_HAS_RVALUE_REFS
#define ZMQ_DELETED_FUNCTION
#else
#define ZMQ_DELETED_FUNCTION
#endif

//  From libzmq 3.3.0 on, monitor_t reads the endpoint address of an event
//  from a separate message frame instead of from the event structure itself.
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0)
#define ZMQ_NEW_MONITOR_EVENT_LAYOUT
#endif

//  libzmq 4.1.0 and later: enable the proxy_steerable() wrapper and declare
//  the event structure used by monitor_t locally.
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0)
#define ZMQ_HAS_PROXY_STEERABLE
/*  Socket event data  */
typedef struct
{
    uint16_t event; // id of the event as bitfield
    int32_t value;  // value is either error code, fd or reconnect interval
} zmq_event_t;
#endif

// Avoid using deprecated message receive function when possible.
// Before libzmq 3.2.0 the call is mapped onto zmq_recvmsg, whose first two
// arguments are in the opposite order.
#if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0)
#define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags)
#endif


// In order to prevent unused variable warnings when building in non-debug
// mode use this macro to make assertions. With NDEBUG defined the expression
// is still evaluated, but its result is discarded.
#ifndef NDEBUG
#define ZMQ_ASSERT(expression) assert(expression)
#else
#define ZMQ_ASSERT(expression) (void) (expression)
#endif

namespace zmq
Thomas Spura 4c7a024
{
5652a73
typedef zmq_free_fn free_fn;
5652a73
typedef zmq_pollitem_t pollitem_t;
5652a73
5652a73
class error_t : public std::exception
5652a73
{
5652a73
  public:
5652a73
    error_t() : errnum(zmq_errno()) {}
5652a73
#ifdef ZMQ_CPP11
5652a73
    virtual const char *what() const noexcept { return zmq_strerror(errnum); }
5652a73
#else
5652a73
    virtual const char *what() const throw() { return zmq_strerror(errnum); }
5652a73
#endif
5652a73
    int num() const { return errnum; }
Thomas Spura 4c7a024
5652a73
  private:
5652a73
    int errnum;
5652a73
};
Thomas Spura 4c7a024
5652a73
//  Polls nitems_ entries starting at items_ via zmq_poll, passing timeout_
//  through unchanged (default -1). Returns zmq_poll's non-negative result and
//  throws error_t when it returns a negative value.
//  zmq_poll takes a non-const array, hence the const_cast.
inline int poll(zmq_pollitem_t const *items_, size_t nitems_, long timeout_ = -1)
{
    int rc = zmq_poll(const_cast<zmq_pollitem_t *>(items_),
                      static_cast<int>(nitems_), timeout_);
    if (rc < 0)
        throw error_t();
    return rc;
}

#ifdef ZMQ_CPP11
//  Convenience overloads of poll(); all forward to the pointer/size/long
//  overload, so they return and throw exactly as it does.

//  Timeout given as std::chrono::milliseconds.
inline int
poll(zmq_pollitem_t const *items, size_t nitems, std::chrono::milliseconds timeout)
{
    return poll(items, nitems, static_cast<long>(timeout.count()));
}

//  Items given as a vector, timeout as std::chrono::milliseconds.
inline int poll(std::vector<zmq_pollitem_t> const &items,
                std::chrono::milliseconds timeout)
{
    return poll(items.data(), items.size(), static_cast<long>(timeout.count()));
}

//  Items given as a vector, timeout as a plain long (default -1).
inline int poll(std::vector<zmq_pollitem_t> const &items, long timeout_ = -1)
{
    return poll(items.data(), items.size(), timeout_);
}
#endif


inline void proxy(void *frontend, void *backend, void *capture)
5652a73
{
5652a73
    int rc = zmq_proxy(frontend, backend, capture);
5652a73
    if (rc != 0)
5652a73
        throw error_t();
5652a73
}
5652a73
5652a73
#ifdef ZMQ_HAS_PROXY_STEERABLE
//  Runs zmq_proxy_steerable on the given socket handles.
//  Throws error_t when zmq_proxy_steerable returns a non-zero value.
inline void
proxy_steerable(void *frontend, void *backend, void *capture, void *control)
{
    int rc = zmq_proxy_steerable(frontend, backend, capture, control);
    if (rc != 0)
        throw error_t();
}
#endif

inline void version(int *major_, int *minor_, int *patch_)
5652a73
{
5652a73
    zmq_version(major_, minor_, patch_);
5652a73
}
5652a73
5652a73
#ifdef ZMQ_CPP11
//  Returns the version reported by zmq_version() as a
//  (major, minor, patch) tuple.
inline std::tuple<int, int, int> version()
{
    std::tuple<int, int, int> v;
    zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v));
    return v;
}
#endif

class message_t
5652a73
{
5652a73
    friend class socket_t;
5652a73
5652a73
  public:
5652a73
    inline message_t()
Thomas Spura 4c7a024
    {
5652a73
        int rc = zmq_msg_init(&msg;;
5652a73
        if (rc != 0)
5652a73
            throw error_t();
Thomas Spura 4c7a024
    }
Thomas Spura 4c7a024
5652a73
    inline explicit message_t(size_t size_)
Thomas Spura 729573e
    {
5652a73
        int rc = zmq_msg_init_size(&msg, size_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
Thomas Spura 729573e
    }
Thomas Spura 729573e
5652a73
    template<typename T> message_t(T first, T last) : msg()
Thomas Spura 729573e
    {
5652a73
        typedef typename std::iterator_traits<T>::difference_type size_type;
5652a73
        typedef typename std::iterator_traits<T>::value_type value_t;
5652a73
5652a73
        size_type const size_ = std::distance(first, last) * sizeof(value_t);
5652a73
        int const rc = zmq_msg_init_size(&msg, size_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
        value_t *dest = data<value_t>();
5652a73
        while (first != last) {
5652a73
            *dest = *first;
5652a73
            ++dest;
5652a73
            ++first;
5652a73
        }
Thomas Spura 729573e
    }
Thomas Spura 729573e
5652a73
    inline message_t(const void *data_, size_t size_)
Thomas Spura 729573e
    {
5652a73
        int rc = zmq_msg_init_size(&msg, size_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
        memcpy(data(), data_, size_);
5652a73
    }
5652a73
5652a73
    inline message_t(void *data_, size_t size_, free_fn *ffn_, void *hint_ = NULL)
5652a73
    {
5652a73
        int rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
Thomas Spura 729573e
    }
Thomas Spura 729573e
5652a73
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11)
5652a73
    template<typename T>
5652a73
    message_t(const T &msg_) : message_t(std::begin(msg_), std::end(msg_))
Thomas Spura 729573e
    {
Thomas Spura 729573e
    }
5652a73
#endif
5652a73
5652a73
#ifdef ZMQ_HAS_RVALUE_REFS
5652a73
    inline message_t(message_t &&rhs) : msg(rhs.msg)
5652a73
    {
5652a73
        int rc = zmq_msg_init(&rhs.msg);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 729573e
5652a73
    inline message_t &operator=(message_t &&rhs) ZMQ_NOTHROW
5652a73
    {
5652a73
        std::swap(msg, rhs.msg);
5652a73
        return *this;
5652a73
    }
5652a73
#endif
Thomas Spura 729573e
5652a73
    inline ~message_t() ZMQ_NOTHROW
5652a73
    {
5652a73
        int rc = zmq_msg_close(&msg;;
5652a73
        ZMQ_ASSERT(rc == 0);
5652a73
    }
Thomas Spura 729573e
5652a73
    inline void rebuild()
Thomas Spura 4c7a024
    {
5652a73
        int rc = zmq_msg_close(&msg;;
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
        rc = zmq_msg_init(&msg;;
Thomas Spura 4c7a024
        if (rc != 0)
5652a73
            throw error_t();
Thomas Spura 4c7a024
    }
5652a73
5652a73
    inline void rebuild(size_t size_)
Thomas Spura 4c7a024
    {
5652a73
        int rc = zmq_msg_close(&msg;;
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
        rc = zmq_msg_init_size(&msg, size_);
Thomas Spura 4c7a024
        if (rc != 0)
5652a73
            throw error_t();
Thomas Spura 4c7a024
    }
5652a73
5652a73
    inline void rebuild(const void *data_, size_t size_)
Thomas Spura 4c7a024
    {
5652a73
        int rc = zmq_msg_close(&msg;;
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
        rc = zmq_msg_init_size(&msg, size_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
        memcpy(data(), data_, size_);
Thomas Spura 4c7a024
    }
Thomas Spura 4c7a024
5652a73
    inline void rebuild(void *data_, size_t size_, free_fn *ffn_, void *hint_ = NULL)
Thomas Spura 729573e
    {
5652a73
        int rc = zmq_msg_close(&msg;;
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
        rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
Thomas Spura 729573e
    }
Thomas Spura 729573e
5652a73
    inline void move(message_t const *msg_)
Thomas Spura 4c7a024
    {
5652a73
        int rc = zmq_msg_move(&msg, const_cast<zmq_msg_t *>(&(msg_->msg)));
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void copy(message_t const *msg_)
5652a73
    {
5652a73
        int rc = zmq_msg_copy(&msg, const_cast<zmq_msg_t *>(&(msg_->msg)));
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline bool more() const ZMQ_NOTHROW
5652a73
    {
5652a73
        int rc = zmq_msg_more(const_cast<zmq_msg_t *>(&msg));
5652a73
        return rc != 0;
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void *data() ZMQ_NOTHROW { return zmq_msg_data(&msg;; }
Thomas Spura 4c7a024
5652a73
    inline const void *data() const ZMQ_NOTHROW
5652a73
    {
5652a73
        return zmq_msg_data(const_cast<zmq_msg_t *>(&msg));
5652a73
    }
Thomas Spura eb3bd66
5652a73
    inline size_t size() const ZMQ_NOTHROW
5652a73
    {
5652a73
        return zmq_msg_size(const_cast<zmq_msg_t *>(&msg));
5652a73
    }
Thomas Spura 729573e
5652a73
    template<typename T> T *data() ZMQ_NOTHROW { return static_cast<T *>(data()); }
Thomas Spura 4c7a024
5652a73
    template<typename T> T const *data() const ZMQ_NOTHROW
5652a73
    {
5652a73
        return static_cast<T const *>(data());
5652a73
    }
Thomas Spura 4c7a024
5652a73
    ZMQ_DEPRECATED("from 4.3.0, use operator== instead")
5652a73
    inline bool equal(const message_t *other) const ZMQ_NOTHROW
5652a73
    {
5652a73
        return *this == *other;
5652a73
    }
5652a73
5652a73
    inline bool operator==(const message_t &other) const ZMQ_NOTHROW
5652a73
    {
5652a73
        const size_t my_size = size();
5652a73
        return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size);
5652a73
    }
5652a73
5652a73
    inline bool operator!=(const message_t &other) const ZMQ_NOTHROW
5652a73
    {
5652a73
        return !(*this == other);
5652a73
    }
5652a73
5652a73
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0)
5652a73
    inline const char *gets(const char *property_)
5652a73
    {
5652a73
        const char *value = zmq_msg_gets(&msg, property_);
5652a73
        if (value == NULL)
5652a73
            throw error_t();
5652a73
        return value;
5652a73
    }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
5652a73
#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
5652a73
    inline uint32_t routing_id() const
5652a73
    {
5652a73
        return zmq_msg_routing_id(const_cast<zmq_msg_t*>(&msg));
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void set_routing_id(uint32_t routing_id)
5652a73
    {
5652a73
        int rc = zmq_msg_set_routing_id(&msg, routing_id);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline const char* group() const
5652a73
    {
5652a73
        return zmq_msg_group(const_cast<zmq_msg_t*>(&msg));
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void set_group(const char* group)
5652a73
    {
5652a73
        int rc = zmq_msg_set_group(&msg, group);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
5652a73
#endif
Thomas Spura eb3bd66
5652a73
    /** Dump content to string. Ascii chars are readable, the rest is printed as hex.
5652a73
         *  Probably ridiculously slow.
5652a73
         */
5652a73
    inline std::string str() const
5652a73
    {
5652a73
        // Partly mutuated from the same method in zmq::multipart_t
5652a73
        std::stringstream os;
5652a73
5652a73
        const unsigned char *msg_data = this->data<unsigned char>();
5652a73
        unsigned char byte;
5652a73
        size_t size = this->size();
5652a73
        int is_ascii[2] = {0, 0};
5652a73
5652a73
        os << "zmq::message_t [size " << std::dec << std::setw(3)
5652a73
           << std::setfill('0') << size << "] (";
5652a73
        // Totally arbitrary
5652a73
        if (size >= 1000) {
5652a73
            os << "... too big to print)";
5652a73
        } else {
5652a73
            while (size--) {
5652a73
                byte = *msg_data++;
5652a73
5652a73
                is_ascii[1] = (byte >= 33 && byte < 127);
5652a73
                if (is_ascii[1] != is_ascii[0])
5652a73
                    os << " "; // Separate text/non text
5652a73
5652a73
                if (is_ascii[1]) {
5652a73
                    os << byte;
5652a73
                } else {
5652a73
                    os << std::hex << std::uppercase << std::setw(2)
5652a73
                       << std::setfill('0') << static_cast<short>(byte);
5652a73
                }
5652a73
                is_ascii[0] = is_ascii[1];
5652a73
            }
5652a73
            os << ")";
Thomas Spura 4c7a024
        }
5652a73
        return os.str();
5652a73
    }
Thomas Spura 4c7a024
5652a73
  private:
5652a73
    //  The underlying message
5652a73
    zmq_msg_t msg;
Thomas Spura 4c7a024
5652a73
    //  Disable implicit message copying, so that users won't use shared
5652a73
    //  messages (less efficient) without being aware of the fact.
5652a73
    message_t(const message_t &) ZMQ_DELETED_FUNCTION;
5652a73
    void operator=(const message_t &) ZMQ_DELETED_FUNCTION;
5652a73
};
Thomas Spura 4c7a024
5652a73
class context_t
5652a73
{
5652a73
    friend class socket_t;
Thomas Spura 4c7a024
5652a73
  public:
5652a73
    inline context_t()
5652a73
    {
5652a73
        ptr = zmq_ctx_new();
5652a73
        if (ptr == NULL)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
5652a73
    inline explicit context_t(int io_threads_,
5652a73
                              int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT)
5652a73
    {
5652a73
        ptr = zmq_ctx_new();
5652a73
        if (ptr == NULL)
5652a73
            throw error_t();
Thomas Spura 4c7a024
5652a73
        int rc = zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_);
5652a73
        ZMQ_ASSERT(rc == 0);
Thomas Spura 4c7a024
5652a73
        rc = zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_);
5652a73
        ZMQ_ASSERT(rc == 0);
5652a73
    }
Thomas Spura 729573e
5652a73
#ifdef ZMQ_HAS_RVALUE_REFS
5652a73
    inline context_t(context_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr) { rhs.ptr = NULL; }
5652a73
    inline context_t &operator=(context_t &&rhs) ZMQ_NOTHROW
5652a73
    {
5652a73
        std::swap(ptr, rhs.ptr);
5652a73
        return *this;
5652a73
    }
5652a73
#endif
Thomas Spura 729573e
5652a73
    inline int setctxopt(int option_, int optval_)
5652a73
    {
5652a73
        int rc = zmq_ctx_set(ptr, option_, optval_);
5652a73
        ZMQ_ASSERT(rc == 0);
5652a73
        return rc;
5652a73
    }
5652a73
5652a73
    inline int getctxopt(int option_) { return zmq_ctx_get(ptr, option_); }
Thomas Spura 4c7a024
5652a73
    inline ~context_t() ZMQ_NOTHROW { close(); }
Thomas Spura 4c7a024
5652a73
    inline void close() ZMQ_NOTHROW
Thomas Spura 4c7a024
    {
5652a73
        if (ptr == NULL)
5652a73
            return;
Thomas Spura 4c7a024
5652a73
        int rc = zmq_ctx_destroy(ptr);
5652a73
        ZMQ_ASSERT(rc == 0);
5652a73
        ptr = NULL;
5652a73
    }
Thomas Spura 4c7a024
5652a73
    //  Be careful with this, it's probably only useful for
5652a73
    //  using the C api together with an existing C++ api.
5652a73
    //  Normally you should never need to use this.
5652a73
    inline ZMQ_EXPLICIT operator void *() ZMQ_NOTHROW { return ptr; }
Thomas Spura 4c7a024
5652a73
    inline ZMQ_EXPLICIT operator void const *() const ZMQ_NOTHROW { return ptr; }
Thomas Spura 4c7a024
5652a73
    inline operator bool() const ZMQ_NOTHROW { return ptr != NULL; }
Thomas Spura 4c7a024
5652a73
  private:
5652a73
    void *ptr;
5652a73
5652a73
    context_t(const context_t &) ZMQ_DELETED_FUNCTION;
5652a73
    void operator=(const context_t &) ZMQ_DELETED_FUNCTION;
5652a73
};
5652a73
5652a73
#ifdef ZMQ_CPP11
//  Strongly typed names for the ZMQ_* socket type constants. Each enumerator
//  has the value of the corresponding constant, so a static_cast<int> yields
//  the value expected by the C API.
enum class socket_type : int
{
    req = ZMQ_REQ,
    rep = ZMQ_REP,
    dealer = ZMQ_DEALER,
    router = ZMQ_ROUTER,
    pub = ZMQ_PUB,
    sub = ZMQ_SUB,
    xpub = ZMQ_XPUB,
    xsub = ZMQ_XSUB,
    push = ZMQ_PUSH,
    pull = ZMQ_PULL,
#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
    server = ZMQ_SERVER,
    client = ZMQ_CLIENT,
    radio = ZMQ_RADIO,
    dish = ZMQ_DISH,
#endif
#if ZMQ_VERSION_MAJOR >= 4
    stream = ZMQ_STREAM,
#endif
    pair = ZMQ_PAIR
};
#endif

class socket_t
5652a73
{
5652a73
    friend class monitor_t;
5652a73
5652a73
  public:
5652a73
    inline socket_t(context_t &context_, int type_) { init(context_, type_); }
5652a73
5652a73
#ifdef ZMQ_CPP11
5652a73
    inline socket_t(context_t &context_, socket_type type_)
5652a73
    {
5652a73
        init(context_, static_cast<int>(type_));
5652a73
    }
5652a73
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_HAS_RVALUE_REFS
5652a73
    inline socket_t(socket_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr), ctxptr(rhs.ctxptr)
5652a73
    {
5652a73
        rhs.ptr = NULL;
5652a73
        rhs.ctxptr = NULL;
5652a73
    }
5652a73
    inline socket_t &operator=(socket_t &&rhs) ZMQ_NOTHROW
5652a73
    {
5652a73
        std::swap(ptr, rhs.ptr);
5652a73
        return *this;
5652a73
    }
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
5652a73
    inline ~socket_t() ZMQ_NOTHROW { close(); }
Thomas Spura 4c7a024
5652a73
    inline operator void *() ZMQ_NOTHROW { return ptr; }
Thomas Spura be57c1a
5652a73
    inline operator void const *() const ZMQ_NOTHROW { return ptr; }
Thomas Spura 4c7a024
5652a73
    inline void close() ZMQ_NOTHROW
5652a73
    {
5652a73
        if (ptr == NULL)
5652a73
            // already closed
5652a73
            return;
5652a73
        int rc = zmq_close(ptr);
5652a73
        ZMQ_ASSERT(rc == 0);
5652a73
        ptr = 0;
5652a73
    }
Thomas Spura 4c7a024
5652a73
    template<typename T> void setsockopt(int option_, T const &optval)
5652a73
    {
5652a73
        setsockopt(option_, &optval, sizeof(T));
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void setsockopt(int option_, const void *optval_, size_t optvallen_)
Thomas Spura 4c7a024
    {
5652a73
        int rc = zmq_setsockopt(ptr, option_, optval_, optvallen_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void getsockopt(int option_, void *optval_, size_t *optvallen_) const
5652a73
    {
5652a73
        int rc = zmq_getsockopt(ptr, option_, optval_, optvallen_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    template<typename T> T getsockopt(int option_) const
5652a73
    {
5652a73
        T optval;
5652a73
        size_t optlen = sizeof(T);
5652a73
        getsockopt(option_, &optval, &optlen);
5652a73
        return optval;
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void bind(std::string const &addr) { bind(addr.c_str()); }
Thomas Spura 4c7a024
5652a73
    inline void bind(const char *addr_)
5652a73
    {
5652a73
        int rc = zmq_bind(ptr, addr_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 729573e
5652a73
    inline void unbind(std::string const &addr) { unbind(addr.c_str()); }
Thomas Spura 4c7a024
5652a73
    inline void unbind(const char *addr_)
5652a73
    {
5652a73
        int rc = zmq_unbind(ptr, addr_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void connect(std::string const &addr) { connect(addr.c_str()); }
Thomas Spura 729573e
5652a73
    inline void connect(const char *addr_)
5652a73
    {
5652a73
        int rc = zmq_connect(ptr, addr_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void disconnect(std::string const &addr) { disconnect(addr.c_str()); }
Thomas Spura 729573e
5652a73
    inline void disconnect(const char *addr_)
5652a73
    {
5652a73
        int rc = zmq_disconnect(ptr, addr_);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 729573e
5652a73
    inline bool connected() const ZMQ_NOTHROW { return (ptr != NULL); }
Thomas Spura 729573e
5652a73
    inline size_t send(const void *buf_, size_t len_, int flags_ = 0)
5652a73
    {
5652a73
        int nbytes = zmq_send(ptr, buf_, len_, flags_);
5652a73
        if (nbytes >= 0)
5652a73
            return (size_t) nbytes;
5652a73
        if (zmq_errno() == EAGAIN)
5652a73
            return 0;
5652a73
        throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline bool send(message_t &msg_, int flags_ = 0)
5652a73
    {
5652a73
        int nbytes = zmq_msg_send(&(msg_.msg), ptr, flags_);
5652a73
        if (nbytes >= 0)
5652a73
            return true;
5652a73
        if (zmq_errno() == EAGAIN)
5652a73
            return false;
5652a73
        throw error_t();
5652a73
    }
Thomas Spura 729573e
5652a73
    template<typename T> bool send(T first, T last, int flags_ = 0)
5652a73
    {
5652a73
        zmq::message_t msg(first, last);
5652a73
        return send(msg, flags_);
5652a73
    }
Thomas Spura 4c7a024
5652a73
#ifdef ZMQ_HAS_RVALUE_REFS
5652a73
    inline bool send(message_t &&msg_, int flags_ = 0) { return send(msg_, flags_); }
5652a73
#endif
Thomas Spura 729573e
5652a73
    inline size_t recv(void *buf_, size_t len_, int flags_ = 0)
5652a73
    {
5652a73
        int nbytes = zmq_recv(ptr, buf_, len_, flags_);
5652a73
        if (nbytes >= 0)
5652a73
            return (size_t) nbytes;
5652a73
        if (zmq_errno() == EAGAIN)
5652a73
            return 0;
5652a73
        throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline bool recv(message_t *msg_, int flags_ = 0)
5652a73
    {
5652a73
        int nbytes = zmq_msg_recv(&(msg_->msg), ptr, flags_);
5652a73
        if (nbytes >= 0)
5652a73
            return true;
5652a73
        if (zmq_errno() == EAGAIN)
5652a73
            return false;
5652a73
        throw error_t();
5652a73
    }
Thomas Spura 729573e
5652a73
#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
5652a73
    inline void join(const char* group)
5652a73
    {
5652a73
        int rc = zmq_join(ptr, group);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    inline void leave(const char* group)
5652a73
    {
5652a73
        int rc = zmq_leave(ptr, group);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
    }
5652a73
#endif
Thomas Spura 4c7a024
5652a73
  private:
5652a73
    inline void init(context_t &context_, int type_)
5652a73
    {
5652a73
        ctxptr = context_.ptr;
5652a73
        ptr = zmq_socket(context_.ptr, type_);
5652a73
        if (ptr == NULL)
5652a73
            throw error_t();
5652a73
    }
5652a73
5652a73
    void *ptr;
5652a73
    void *ctxptr;
5652a73
5652a73
    socket_t(const socket_t &) ZMQ_DELETED_FUNCTION;
5652a73
    void operator=(const socket_t &) ZMQ_DELETED_FUNCTION;
5652a73
};
5652a73
5652a73
class monitor_t
5652a73
{
5652a73
  public:
5652a73
    monitor_t() : socketPtr(NULL), monitor_socket(NULL) {}
5652a73
5652a73
    virtual ~monitor_t()
5652a73
    {
5652a73
        if (socketPtr)
5652a73
            zmq_socket_monitor(socketPtr, NULL, 0);
5652a73
5652a73
        if (monitor_socket)
5652a73
            zmq_close(monitor_socket);
5652a73
    }
Thomas Spura 4c7a024
Thomas Spura 729573e
Thomas Spura 4c7a024
#ifdef ZMQ_HAS_RVALUE_REFS
5652a73
    monitor_t(monitor_t &&rhs) ZMQ_NOTHROW : socketPtr(rhs.socketPtr),
5652a73
                                             monitor_socket(rhs.monitor_socket)
5652a73
    {
5652a73
        rhs.socketPtr = NULL;
5652a73
        rhs.monitor_socket = NULL;
5652a73
    }
5652a73
5652a73
    socket_t &operator=(socket_t &&rhs) ZMQ_DELETED_FUNCTION;
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
5652a73
    void
5652a73
    monitor(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL)
5652a73
    {
5652a73
        monitor(socket, addr.c_str(), events);
5652a73
    }
5652a73
5652a73
    void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
5652a73
    {
5652a73
        init(socket, addr_, events);
5652a73
        while (true) {
5652a73
            check_event(-1);
Thomas Spura 729573e
        }
5652a73
    }
5652a73
5652a73
    void init(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL)
5652a73
    {
5652a73
        init(socket, addr.c_str(), events);
5652a73
    }
5652a73
5652a73
    void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
5652a73
    {
5652a73
        int rc = zmq_socket_monitor(socket.ptr, addr_, events);
5652a73
        if (rc != 0)
5652a73
            throw error_t();
5652a73
5652a73
        socketPtr = socket.ptr;
5652a73
        monitor_socket = zmq_socket(socket.ctxptr, ZMQ_PAIR);
5652a73
        assert(monitor_socket);
Thomas Spura 729573e
5652a73
        rc = zmq_connect(monitor_socket, addr_);
5652a73
        assert(rc == 0);
Thomas Spura 4c7a024
5652a73
        on_monitor_started();
5652a73
    }
Thomas Spura 4c7a024
5652a73
    bool check_event(int timeout = 0)
Thomas Spura 4c7a024
    {
5652a73
        assert(monitor_socket);
Thomas Spura 4c7a024
5652a73
        zmq_msg_t eventMsg;
5652a73
        zmq_msg_init(&eventMsg);
5652a73
5652a73
        zmq::pollitem_t items[] = {
5652a73
          {monitor_socket, 0, ZMQ_POLLIN, 0},
5652a73
        };
5652a73
5652a73
        zmq::poll(&items[0], 1, timeout);
5652a73
5652a73
        if (items[0].revents & ZMQ_POLLIN) {
5652a73
            int rc = zmq_msg_recv(&eventMsg, monitor_socket, 0);
5652a73
            if (rc == -1 && zmq_errno() == ETERM)
5652a73
                return false;
5652a73
            assert(rc != -1);
5652a73
5652a73
        } else {
5652a73
            zmq_msg_close(&eventMsg);
5652a73
            return false;
Thomas Spura 729573e
        }
Thomas Spura 729573e
Thomas Spura 4c7a024
#if ZMQ_VERSION_MAJOR >= 4
5652a73
        const char *data = static_cast<const char *>(zmq_msg_data(&eventMsg));
5652a73
        zmq_event_t msgEvent;
5652a73
        memcpy(&msgEvent.event, data, sizeof(uint16_t));
5652a73
        data += sizeof(uint16_t);
5652a73
        memcpy(&msgEvent.value, data, sizeof(int32_t));
5652a73
        zmq_event_t *event = &msgEvent;
Thomas Spura 4c7a024
#else
5652a73
        zmq_event_t *event = static_cast<zmq_event_t *>(zmq_msg_data(&eventMsg));
Thomas Spura 4c7a024
#endif
5652a73
Thomas Spura 4c7a024
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
5652a73
        zmq_msg_t addrMsg;
5652a73
        zmq_msg_init(&addrMsg);
5652a73
        int rc = zmq_msg_recv(&addrMsg, monitor_socket, 0);
5652a73
        if (rc == -1 && zmq_errno() == ETERM) {
5652a73
            zmq_msg_close(&eventMsg);
5652a73
            return false;
5652a73
        }
5652a73
5652a73
        assert(rc != -1);
5652a73
        const char *str = static_cast<const char *>(zmq_msg_data(&addrMsg));
5652a73
        std::string address(str, str + zmq_msg_size(&addrMsg));
5652a73
        zmq_msg_close(&addrMsg);
Thomas Spura 4c7a024
#else
5652a73
        // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
5652a73
        std::string address = event->data.connected.addr;
Thomas Spura 4c7a024
#endif
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_EVENT_MONITOR_STOPPED
5652a73
        if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
5652a73
            zmq_msg_close(&eventMsg);
5652a73
            return false;
5652a73
        }
5652a73
5652a73
#endif
5652a73
5652a73
        switch (event->event) {
5652a73
            case ZMQ_EVENT_CONNECTED:
5652a73
                on_event_connected(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_CONNECT_DELAYED:
5652a73
                on_event_connect_delayed(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_CONNECT_RETRIED:
5652a73
                on_event_connect_retried(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_LISTENING:
5652a73
                on_event_listening(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_BIND_FAILED:
5652a73
                on_event_bind_failed(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_ACCEPTED:
5652a73
                on_event_accepted(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_ACCEPT_FAILED:
5652a73
                on_event_accept_failed(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_CLOSED:
5652a73
                on_event_closed(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_CLOSE_FAILED:
5652a73
                on_event_close_failed(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_DISCONNECTED:
5652a73
                on_event_disconnected(*event, address.c_str());
5652a73
                break;
5652a73
#ifdef ZMQ_BUILD_DRAFT_API
5652a73
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
5652a73
            case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
5652a73
                on_event_handshake_failed_no_detail(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
5652a73
                on_event_handshake_failed_protocol(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
5652a73
                on_event_handshake_failed_auth(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
5652a73
                on_event_handshake_succeeded(*event, address.c_str());
5652a73
                break;
5652a73
#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
5652a73
            case ZMQ_EVENT_HANDSHAKE_FAILED:
5652a73
                on_event_handshake_failed(*event, address.c_str());
5652a73
                break;
5652a73
            case ZMQ_EVENT_HANDSHAKE_SUCCEED:
5652a73
                on_event_handshake_succeed(*event, address.c_str());
5652a73
                break;
5652a73
#endif
5652a73
#endif
5652a73
            default:
5652a73
                on_event_unknown(*event, address.c_str());
5652a73
                break;
Thomas Spura 4c7a024
        }
5652a73
        zmq_msg_close(&eventMsg);
5652a73
5652a73
        return true;
5652a73
    }
Thomas Spura 4c7a024
Thomas Spura 4c7a024
#ifdef ZMQ_EVENT_MONITOR_STOPPED
5652a73
    void abort()
5652a73
    {
5652a73
        if (socketPtr)
5652a73
            zmq_socket_monitor(socketPtr, NULL, 0);
5652a73
5652a73
        socketPtr = NULL;
5652a73
    }
5652a73
#endif
5652a73
    //  Hook called at the end of init(), after the monitor socket has been
    //  connected. The default implementation does nothing.
    virtual void on_monitor_started()
    {
    }
5652a73
    virtual void on_event_connected(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_connect_delayed(const zmq_event_t &event_,
5652a73
                                          const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_connect_retried(const zmq_event_t &event_,
5652a73
                                          const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_listening(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_bind_failed(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_accepted(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_accept_failed(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_closed(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_close_failed(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_disconnected(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
5652a73
    virtual void on_event_handshake_failed_no_detail(const zmq_event_t &event_,
5652a73
                                                     const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_handshake_failed_protocol(const zmq_event_t &event_,
5652a73
                                                    const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_handshake_failed_auth(const zmq_event_t &event_,
5652a73
                                                const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_handshake_succeeded(const zmq_event_t &event_,
5652a73
                                              const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
5652a73
    virtual void on_event_handshake_failed(const zmq_event_t &event_,
5652a73
                                           const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
    virtual void on_event_handshake_succeed(const zmq_event_t &event_,
5652a73
                                            const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
#endif
5652a73
    virtual void on_event_unknown(const zmq_event_t &event_, const char *addr_)
5652a73
    {
5652a73
        (void) event_;
5652a73
        (void) addr_;
5652a73
    }
5652a73
5652a73
  private:
5652a73
    monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION;
5652a73
    void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION;
5652a73
5652a73
    void *socketPtr;
5652a73
    void *monitor_socket;
5652a73
};
5652a73
5652a73
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
5652a73
template<typename T = void> class poller_t
5652a73
{
5652a73
  public:
5652a73
    void add(zmq::socket_t &socket, short events, T *user_data)
5652a73
    {
5652a73
        if (0
5652a73
            != zmq_poller_add(poller_ptr.get(), static_cast<void *>(socket),
5652a73
                              user_data, events)) {
5652a73
            throw error_t();
5652a73
        }
5652a73
    }
5652a73
5652a73
    void remove(zmq::socket_t &socket)
5652a73
    {
5652a73
        if (0 != zmq_poller_remove(poller_ptr.get(), static_cast<void *>(socket))) {
5652a73
            throw error_t();
Thomas Spura 4c7a024
        }
5652a73
    }
5652a73
5652a73
    void modify(zmq::socket_t &socket, short events)
5652a73
    {
5652a73
        if (0
5652a73
            != zmq_poller_modify(poller_ptr.get(), static_cast<void *>(socket),
5652a73
                                 events)) {
5652a73
            throw error_t();
5652a73
        }
5652a73
    }
5652a73
5652a73
    size_t wait_all(std::vector<zmq_poller_event_t> &poller_events,
5652a73
                    const std::chrono::microseconds timeout)
5652a73
    {
5652a73
        int rc = zmq_poller_wait_all(poller_ptr.get(), poller_events.data(),
5652a73
                                     static_cast<int>(poller_events.size()),
5652a73
                                     static_cast<long>(timeout.count()));
5652a73
        if (rc > 0)
5652a73
            return static_cast<size_t>(rc);
5652a73
5652a73
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
5652a73
        if (zmq_errno() == EAGAIN)
5652a73
#else
5652a73
        if (zmq_errno() == ETIMEDOUT)
Thomas Spura 4c7a024
#endif
5652a73
            return 0;
5652a73
5652a73
        throw error_t();
5652a73
    }
5652a73
5652a73
  private:
5652a73
    std::unique_ptr<void, std::function<void(void *)>> poller_ptr{
5652a73
      []() {
5652a73
          auto poller_new = zmq_poller_new();
5652a73
          if (poller_new)
5652a73
              return poller_new;
5652a73
          throw error_t();
5652a73
      }(),
5652a73
      [](void *ptr) {
5652a73
          int rc = zmq_poller_destroy(&ptr);
5652a73
          ZMQ_ASSERT(rc == 0);
5652a73
      }};
5652a73
};
5652a73
#endif //  defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
5652a73
5652a73
inline std::ostream &operator<<(std::ostream &os, const message_t &msg)
5652a73
{
5652a73
    return os << msg.str();
Thomas Spura 4c7a024
}
Thomas Spura 4c7a024
5652a73
} // namespace zmq
5652a73
5652a73
#endif // __ZMQ_HPP_INCLUDED__