commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 31/46: zeromq: Add missing timeout and blocking parameters and polling


From: git
Subject: [Commit-gnuradio] [gnuradio] 31/46: zeromq: Add missing timeout and blocking parameters and polling
Date: Fri, 16 May 2014 19:37:16 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 1545615b3a2fdf233d56d88ea235f300f7267813
Author: Johannes Schmitz <address@hidden>
Date:   Thu May 8 16:50:22 2014 +0200

    zeromq: Add missing timeout and blocking parameters and polling
---
 gr-zeromq/include/gnuradio/zeromq/push_sink.h  |  2 +-
 gr-zeromq/include/gnuradio/zeromq/req_source.h |  2 +-
 gr-zeromq/lib/push_sink_impl.cc                | 27 ++++++++++++++++++--------
 gr-zeromq/lib/push_sink_impl.h                 |  3 ++-
 gr-zeromq/lib/rep_sink_impl.h                  |  2 +-
 gr-zeromq/lib/req_source_impl.cc               | 16 ++++++++-------
 gr-zeromq/lib/req_source_impl.h                |  4 +++-
 7 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index 46ad6a4..53db71b 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -55,7 +55,7 @@ namespace gr {
       * \param blocking Indicate whether blocking sends should be used, default true.
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, bool blocking=true);
+      static sptr make(size_t itemsize, size_t vlen, char *address, float timeout=0.1, bool blocking=true);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h b/gr-zeromq/include/gnuradio/zeromq/req_source.h
index 5fc3682..0f7c44d 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h
@@ -52,7 +52,7 @@ namespace gr {
       * \param timeout  Receive timeout in seconds, default is 100ms, 1us increments
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address);
+      static sptr make(size_t itemsize, size_t vlen, char *address, float timeout=0.1, bool blocking=true);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 5acc3c5..9d92c51 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -31,18 +31,19 @@ namespace gr {
   namespace zeromq {
 
     push_sink::sptr
-    push_sink::make(size_t itemsize, size_t vlen, char *address, bool blocking)
+    push_sink::make(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
     {
       return gnuradio::get_initial_sptr
-        (new push_sink_impl(itemsize, vlen, address, blocking));
+        (new push_sink_impl(itemsize, vlen, address, timeout, blocking));
     }
 
-    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking)
+    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
       : gr::sync_block("push_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
         d_itemsize(itemsize), d_vlen(vlen)
     {
+      d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
       d_blocking = blocking;
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
@@ -64,11 +65,21 @@ namespace gr {
     {
       const char *in = (const char *) input_items[0];
 
-      // create message copy and send
-      zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
-      memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
-      d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
-      return noutput_items;
+      zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
+      zmq::poll (&itemsout[0], 1, d_timeout);
+
+      //  If we got a reply, process
+      if (itemsout[0].revents & ZMQ_POLLOUT) {
+        // create message copy and send
+        zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
+        memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
+        d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+
+        return noutput_items;
+      }
+      else {
+        return 0;
+      }
     }
 
   } /* namespace zeromq */
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index a34bb28..e1a9051 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -34,12 +34,13 @@ namespace gr {
     private:
       size_t          d_itemsize;
       size_t          d_vlen;
+      float           d_timeout;
       bool            d_blocking;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
 
     public:
-      push_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking);
+      push_sink_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking);
       ~push_sink_impl();
 
       int work(int noutput_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index a933d94..6e10a89 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -35,9 +35,9 @@ namespace gr {
       size_t          d_itemsize;
       size_t          d_vlen;
       int             d_timeout;
+      bool            d_blocking;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
-      bool            d_blocking;
 
     public:
       rep_sink_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking);
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 937b594..84cbcc7 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -31,18 +31,19 @@ namespace gr {
   namespace zeromq {
 
     req_source::sptr
-    req_source::make(size_t itemsize, size_t vlen, char *address)
+    req_source::make(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
     {
       return gnuradio::get_initial_sptr
-        (new req_source_impl(itemsize, vlen, address));
+        (new req_source_impl(itemsize, vlen, address, timeout, blocking));
     }
 
-    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address)
+    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
       : gr::sync_block("req_source",
                       gr::io_signature::make(0, 0, 0),
                       gr::io_signature::make(1, 1, itemsize * vlen)),
-        d_itemsize(itemsize), d_vlen(vlen)
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_blocking(blocking)
     {
+      d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
       int time = 0;
@@ -68,14 +69,15 @@ namespace gr {
 
       //  If we got a reply, process
       if (itemsout[0].revents & ZMQ_POLLOUT) {
-        // Request data, FIXME non portable
+        // Request data, FIXME non portable?
         zmq::message_t request(sizeof(int));
         memcpy ((void *) request.data (), &noutput_items, sizeof(int));
-        d_socket->send(request);
+        d_socket->send(request, d_blocking ? 0 : ZMQ_NOBLOCK);
+
       }
 
       zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll (&itemsin[0], 1, 0);
+      zmq::poll (&itemsin[0], 1, d_timeout);
 
       //  If we got a reply, process
       if (itemsin[0].revents & ZMQ_POLLIN) {
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index c906e91..754f208 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -34,11 +34,13 @@ namespace gr {
     private:
       size_t          d_itemsize;
       size_t          d_vlen;
+      int             d_timeout;
+      bool            d_blocking;
       zmq::context_t  *d_context;
       zmq::socket_t   *d_socket;
 
     public:
-      req_source_impl(size_t itemsize, size_t vlen, char *address);
+      req_source_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking);
       ~req_source_impl();
 
       int work(int noutput_items,



reply via email to

[Prev in Thread] Current Thread [Next in Thread]