commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 33/46: zeromq: Timeout needs to be in milli


From: git
Subject: [Commit-gnuradio] [gnuradio] 33/46: zeromq: Timeout needs to be in milliseconds for zmq 3.0, close sockets correctly, cleanup
Date: Fri, 16 May 2014 19:37:16 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 84444acb7a239626d11c4d06f7b6342c18180e0c
Author: Johannes Schmitz <address@hidden>
Date:   Fri May 9 16:24:21 2014 +0200

    zeromq: Timeout needs to be in milliseconds for zmq 3.0, close sockets
    correctly, cleanup
---
 gr-zeromq/examples/zeromq_pushpull.grc          | 12 ++++---
 gr-zeromq/examples/zeromq_reqrep.grc            | 16 ++++++---
 gr-zeromq/grc/zeromq_pull_source.xml            | 10 +++---
 gr-zeromq/grc/zeromq_push_sink.xml              | 10 +++---
 gr-zeromq/grc/zeromq_rep_sink.xml               | 10 +++---
 gr-zeromq/grc/zeromq_req_source.xml             | 10 +++---
 gr-zeromq/include/gnuradio/zeromq/pull_source.h |  2 +-
 gr-zeromq/include/gnuradio/zeromq/push_sink.h   |  2 +-
 gr-zeromq/include/gnuradio/zeromq/rep_sink.h    |  2 +-
 gr-zeromq/include/gnuradio/zeromq/req_source.h  |  2 +-
 gr-zeromq/lib/pub_sink_impl.cc                  |  4 ++-
 gr-zeromq/lib/pub_sink_impl.h                   |  4 +--
 gr-zeromq/lib/pull_source_impl.cc               | 11 +++---
 gr-zeromq/lib/pull_source_impl.h                |  2 +-
 gr-zeromq/lib/push_sink_impl.cc                 | 14 ++++----
 gr-zeromq/lib/push_sink_impl.h                  |  6 ++--
 gr-zeromq/lib/rep_sink_impl.cc                  | 47 +++++++++++++------------
 gr-zeromq/lib/rep_sink_impl.h                   |  2 +-
 gr-zeromq/lib/req_source_impl.cc                | 14 ++++----
 gr-zeromq/lib/req_source_impl.h                 |  2 +-
 gr-zeromq/python/zeromq/qa_zeromq_pushpull.py   |  7 ++--
 gr-zeromq/python/zeromq/qa_zeromq_reqrep.py     |  7 ++--
 22 files changed, 107 insertions(+), 89 deletions(-)

diff --git a/gr-zeromq/examples/zeromq_pushpull.grc 
b/gr-zeromq/examples/zeromq_pushpull.grc
index 0cff673..7b3146c 100644
--- a/gr-zeromq/examples/zeromq_pushpull.grc
+++ b/gr-zeromq/examples/zeromq_pushpull.grc
@@ -1,6 +1,6 @@
 <?xml version='1.0' encoding='ASCII'?>
 <flow_graph>
-  <timestamp>Wed May  7 12:06:31 2014</timestamp>
+  <timestamp>Fri May  9 15:00:28 2014</timestamp>
   <block>
     <key>options</key>
     <param>
@@ -382,7 +382,7 @@
     </param>
     <param>
       <key>timeout</key>
-      <value>0.1</value>
+      <value>100</value>
     </param>
     <param>
       <key>affinity</key>
@@ -425,7 +425,11 @@
     </param>
     <param>
       <key>address</key>
-      <value>tcp://*:5555</value>
+      <value>tcp://127.0.0.1:5555</value>
+    </param>
+    <param>
+      <key>timeout</key>
+      <value>100</value>
     </param>
     <param>
       <key>blocking</key>
@@ -437,7 +441,7 @@
     </param>
     <param>
       <key>_coordinate</key>
-      <value>(752, 96)</value>
+      <value>(751, 96)</value>
     </param>
     <param>
       <key>_rotation</key>
diff --git a/gr-zeromq/examples/zeromq_reqrep.grc 
b/gr-zeromq/examples/zeromq_reqrep.grc
index 2951acd..63bb053 100644
--- a/gr-zeromq/examples/zeromq_reqrep.grc
+++ b/gr-zeromq/examples/zeromq_reqrep.grc
@@ -1,6 +1,6 @@
 <?xml version='1.0' encoding='ASCII'?>
 <flow_graph>
-  <timestamp>Wed May  7 12:00:20 2014</timestamp>
+  <timestamp>Fri May  9 15:00:53 2014</timestamp>
   <block>
     <key>options</key>
     <param>
@@ -378,11 +378,11 @@
     </param>
     <param>
       <key>address</key>
-      <value>tcp://*:5555</value>
+      <value>tcp://127.0.0.1:5555</value>
     </param>
     <param>
       <key>timeout</key>
-      <value>0.1</value>
+      <value>100</value>
     </param>
     <param>
       <key>blocking</key>
@@ -421,7 +421,15 @@
     </param>
     <param>
       <key>address</key>
-      <value>tcp://localhost:5555</value>
+      <value>tcp://127.0.0.1:5555</value>
+    </param>
+    <param>
+      <key>timeout</key>
+      <value>100</value>
+    </param>
+    <param>
+      <key>blocking</key>
+      <value>True</value>
     </param>
     <param>
       <key>affinity</key>
diff --git a/gr-zeromq/grc/zeromq_pull_source.xml 
b/gr-zeromq/grc/zeromq_pull_source.xml
index 72625ee..f00fb57 100644
--- a/gr-zeromq/grc/zeromq_pull_source.xml
+++ b/gr-zeromq/grc/zeromq_pull_source.xml
@@ -38,10 +38,10 @@
   </param>
 
   <param>
-               <name>Vec Length</name>
-               <key>vlen</key>
-               <value>1</value>
-               <type>int</type>
+    <name>Vec Length</name>
+    <key>vlen</key>
+    <value>1</value>
+    <type>int</type>
   </param>
 
   <param>
@@ -51,7 +51,7 @@
   </param>
 
   <param>
-    <name>Timeout (sec)</name>
+    <name>Timeout (msec)</name>
     <key>timeout</key>
     <value>0.1</value>
     <type>float</type>
diff --git a/gr-zeromq/grc/zeromq_push_sink.xml 
b/gr-zeromq/grc/zeromq_push_sink.xml
index cde36d1..df901c8 100644
--- a/gr-zeromq/grc/zeromq_push_sink.xml
+++ b/gr-zeromq/grc/zeromq_push_sink.xml
@@ -38,10 +38,10 @@
   </param>
 
   <param>
-               <name>Vec Length</name>
-               <key>vlen</key>
-               <value>1</value>
-               <type>int</type>
+    <name>Vec Length</name>
+    <key>vlen</key>
+    <value>1</value>
+    <type>int</type>
   </param>
  
   <param>
@@ -51,7 +51,7 @@
   </param>
 
   <param>
-    <name>Timeout (sec)</name>
+    <name>Timeout (msec)</name>
     <key>timeout</key>
     <value>0.1</value>
     <type>float</type>
diff --git a/gr-zeromq/grc/zeromq_rep_sink.xml 
b/gr-zeromq/grc/zeromq_rep_sink.xml
index 21fe8c1..293305c 100644
--- a/gr-zeromq/grc/zeromq_rep_sink.xml
+++ b/gr-zeromq/grc/zeromq_rep_sink.xml
@@ -38,10 +38,10 @@
   </param>
 
   <param>
-               <name>Vec Length</name>
-               <key>vlen</key>
-               <value>1</value>
-               <type>int</type>
+    <name>Vec Length</name>
+    <key>vlen</key>
+    <value>1</value>
+    <type>int</type>
   </param>
 
   <param>
@@ -51,7 +51,7 @@
   </param>
 
   <param>
-    <name>Timeout (sec)</name>
+    <name>Timeout (msec)</name>
     <key>timeout</key>
     <value>0.1</value>
     <type>float</type>
diff --git a/gr-zeromq/grc/zeromq_req_source.xml 
b/gr-zeromq/grc/zeromq_req_source.xml
index ea2084a..6da400b 100644
--- a/gr-zeromq/grc/zeromq_req_source.xml
+++ b/gr-zeromq/grc/zeromq_req_source.xml
@@ -38,10 +38,10 @@
   </param>
 
   <param>
-               <name>Vec Length</name>
-               <key>vlen</key>
-               <value>1</value>
-               <type>int</type>
+    <name>Vec Length</name>
+    <key>vlen</key>
+    <value>1</value>
+    <type>int</type>
   </param>
 
   <param>
@@ -51,7 +51,7 @@
   </param>
 
   <param>
-    <name>Timeout (sec)</name>
+    <name>Timeout (msec)</name>
     <key>timeout</key>
     <value>0.1</value>
     <type>float</type>
diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_source.h 
b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
index 4306931..3a0c943 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pull_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
@@ -51,7 +51,7 @@ namespace gr {
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, float 
timeout=0.1);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index 53db71b..8fe849c 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -55,7 +55,7 @@ namespace gr {
        * \param blocking Indicate whether blocking sends should be used, 
default true.
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, float 
timeout=0.1, bool blocking=true);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100, bool blocking=true);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h 
b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
index 374607e..667612a 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
@@ -54,7 +54,7 @@ namespace gr {
        * \param blocking Indicate whether blocking sends should be used, 
default true.
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, float 
timeout=0.1, bool blocking=true);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100, bool blocking=true);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h 
b/gr-zeromq/include/gnuradio/zeromq/req_source.h
index 0f7c44d..31bd693 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h
@@ -52,7 +52,7 @@ namespace gr {
        * \param timeout  Receive timeout in seconds, default is 100ms, 1us 
increments
        *
        */
-      static sptr make(size_t itemsize, size_t vlen, char *address, float 
timeout=0.1, bool blocking=true);
+      static sptr make(size_t itemsize, size_t vlen, char *address, int 
timeout=100, bool blocking=true);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 9cd115d..3a86819 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -34,7 +34,7 @@ namespace gr {
     pub_sink::make(size_t itemsize, size_t vlen, char *address, bool blocking)
     {
       return gnuradio::get_initial_sptr
-       (new pub_sink_impl(itemsize, vlen, address, blocking));
+        (new pub_sink_impl(itemsize, vlen, address, blocking));
     }
 
     pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, 
bool blocking)
@@ -52,6 +52,8 @@ namespace gr {
 
     pub_sink_impl::~pub_sink_impl()
     {
+      d_socket->close();
+      d_context->close();
       delete d_socket;
       delete d_context;
     }
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 8e6fac7..ad0419a 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -43,8 +43,8 @@ namespace gr {
       ~pub_sink_impl();
 
       int work(int noutput_items,
-              gr_vector_const_void_star &input_items,
-              gr_vector_void_star &output_items);
+               gr_vector_const_void_star &input_items,
+               gr_vector_void_star &output_items);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/pull_source_impl.cc 
b/gr-zeromq/lib/pull_source_impl.cc
index 2498eee..2ee6ad0 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -31,19 +31,18 @@ namespace gr {
   namespace zeromq {
 
     pull_source::sptr
-    pull_source::make(size_t itemsize, size_t vlen, char *address, float 
timeout)
+    pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout)
     {
       return gnuradio::get_initial_sptr
         (new pull_source_impl(itemsize, vlen, address, timeout));
     }
 
-    pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char 
*address, float timeout)
+    pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout)
       : gr::sync_block("pull_source",
                        gr::io_signature::make(0, 0, 0),
                        gr::io_signature::make(1, 1, itemsize * vlen)),
-        d_itemsize(itemsize), d_vlen(vlen)
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
     {
-      d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
       int time = 0;
@@ -56,6 +55,8 @@ namespace gr {
      */
     pull_source_impl::~pull_source_impl()
     {
+      d_socket->close();
+      d_context->close();
       delete d_socket;
       delete d_context;
     }
@@ -89,7 +90,7 @@ namespace gr {
         }
       }
       else {
-       return 0; // FIXME: someday when the scheduler does all the poll/selects
+        return 0; // FIXME: someday when the scheduler does all the 
poll/selects
       }
     }
 
diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h
index 5140dc5..e69de81 100644
--- a/gr-zeromq/lib/pull_source_impl.h
+++ b/gr-zeromq/lib/pull_source_impl.h
@@ -39,7 +39,7 @@ namespace gr {
       zmq::socket_t   *d_socket;
 
     public:
-      pull_source_impl(size_t itemsize, size_t vlen, char *address, float 
timeout);
+      pull_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout);
       ~pull_source_impl();
 
       int work(int noutput_items,
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 9d92c51..2df3a6f 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -31,20 +31,18 @@ namespace gr {
   namespace zeromq {
 
     push_sink::sptr
-    push_sink::make(size_t itemsize, size_t vlen, char *address, float 
timeout, bool blocking)
+    push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool blocking)
     {
       return gnuradio::get_initial_sptr
         (new push_sink_impl(itemsize, vlen, address, timeout, blocking));
     }
 
-    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char 
*address, float timeout, bool blocking)
+    push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool blocking)
       : gr::sync_block("push_sink",
                        gr::io_signature::make(1, 1, itemsize * vlen),
                        gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen)
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_blocking(blocking)
     {
-      d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
-      d_blocking = blocking;
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
       int time = 0;
@@ -54,8 +52,10 @@ namespace gr {
 
     push_sink_impl::~push_sink_impl()
     {
-      delete(d_socket);
-      delete(d_context);
+      d_socket->close();
+      d_context->close();
+      delete d_socket;
+      delete d_context;
     }
 
     int
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index e1a9051..2ff9bc5 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -40,12 +40,12 @@ namespace gr {
       zmq::socket_t   *d_socket;
 
     public:
-      push_sink_impl(size_t itemsize, size_t vlen, char *address, float 
timeout, bool blocking);
+      push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool blocking);
       ~push_sink_impl();
 
       int work(int noutput_items,
-              gr_vector_const_void_star &input_items,
-              gr_vector_void_star &output_items);
+               gr_vector_const_void_star &input_items,
+               gr_vector_void_star &output_items);
     };
 
   } // namespace zeromq
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index d3d8f81..35efffa 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -31,19 +31,18 @@ namespace gr {
   namespace zeromq {
 
     rep_sink::sptr
-    rep_sink::make(size_t itemsize, size_t vlen, char *address, float timeout, 
bool blocking)
+    rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool blocking)
     {
       return gnuradio::get_initial_sptr
         (new rep_sink_impl(itemsize, vlen, address, timeout, blocking));
     }
 
-    rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, 
float timeout, bool blocking)
+    rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, 
int timeout, bool blocking)
       : gr::sync_block("rep_sink",
-                      gr::io_signature::make(1, 1, itemsize * vlen),
-                      gr::io_signature::make(0, 0, 0)),
-        d_itemsize(itemsize), d_vlen(vlen), d_blocking(blocking)
+                       gr::io_signature::make(1, 1, itemsize * vlen),
+                       gr::io_signature::make(0, 0, 0)),
+        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_blocking(blocking)
     {
-      d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
       int time = 0;
@@ -53,6 +52,8 @@ namespace gr {
 
     rep_sink_impl::~rep_sink_impl()
     {
+      d_socket->close();
+      d_context->close();
       delete d_socket;
       delete d_context;
     }
@@ -69,26 +70,26 @@ namespace gr {
 
       //  If we got a reply, process
       if (items[0].revents & ZMQ_POLLIN) {
-       // receive data request
-       zmq::message_t request;
-       d_socket->recv(&request);
-       int req_output_items = *(static_cast<int*>(request.data()));
+        // receive data request
+        zmq::message_t request;
+        d_socket->recv(&request);
+        int req_output_items = *(static_cast<int*>(request.data()));
 
-       // create message copy and send
-       if (noutput_items < req_output_items) {
-         zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
-         memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
-         d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+        // create message copy and send
+        if (noutput_items < req_output_items) {
+          zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
+          memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
+          d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
 
-         return noutput_items;
-       }
-       else {
-         zmq::message_t msg(d_itemsize*d_vlen*req_output_items);
-         memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items);
-         d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+          return noutput_items;
+        }
+        else {
+          zmq::message_t msg(d_itemsize*d_vlen*req_output_items);
+          memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items);
+          d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
 
-         return req_output_items;
-       }
+          return req_output_items;
+        }
       }
 
       return 0;
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index 6e10a89..500af7a 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -40,7 +40,7 @@ namespace gr {
       zmq::socket_t   *d_socket;
 
     public:
-      rep_sink_impl(size_t itemsize, size_t vlen, char *address, float 
timeout, bool blocking);
+      rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, 
bool blocking);
       ~rep_sink_impl();
 
       int work(int noutput_items,
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 84cbcc7..121d2e3 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -31,19 +31,18 @@ namespace gr {
   namespace zeromq {
 
     req_source::sptr
-    req_source::make(size_t itemsize, size_t vlen, char *address, float 
timeout, bool blocking)
+    req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, 
bool blocking)
     {
       return gnuradio::get_initial_sptr
         (new req_source_impl(itemsize, vlen, address, timeout, blocking));
     }
 
-    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char 
*address, float timeout, bool blocking)
+    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char 
*address, int timeout, bool blocking)
       : gr::sync_block("req_source",
-                      gr::io_signature::make(0, 0, 0),
-                      gr::io_signature::make(1, 1, itemsize * vlen)),
+                       gr::io_signature::make(0, 0, 0),
+                       gr::io_signature::make(1, 1, itemsize * vlen)),
         d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), 
d_blocking(blocking)
     {
-      d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
       d_context = new zmq::context_t(1);
       d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
       int time = 0;
@@ -53,6 +52,8 @@ namespace gr {
 
     req_source_impl::~req_source_impl()
     {
+      d_socket->close();
+      d_context->close();
       delete d_socket;
       delete d_context;
     }
@@ -65,7 +66,7 @@ namespace gr {
       char *out = (char*)output_items[0];
 
       zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
-      zmq::poll (&itemsout[0], 1, 0);
+      zmq::poll (&itemsout[0], 1, d_timeout);
 
       //  If we got a reply, process
       if (itemsout[0].revents & ZMQ_POLLOUT) {
@@ -73,7 +74,6 @@ namespace gr {
         zmq::message_t request(sizeof(int));
         memcpy ((void *) request.data (), &noutput_items, sizeof(int));
         d_socket->send(request, d_blocking ? 0 : ZMQ_NOBLOCK);
-
       }
 
       zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index 754f208..ab844ef 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -40,7 +40,7 @@ namespace gr {
       zmq::socket_t   *d_socket;
 
     public:
-      req_source_impl(size_t itemsize, size_t vlen, char *address, float 
timeout, bool blocking);
+      req_source_impl(size_t itemsize, size_t vlen, char *address, int 
timeout, bool blocking);
       ~req_source_impl();
 
       int work(int noutput_items,
diff --git a/gr-zeromq/python/zeromq/qa_zeromq_pushpull.py 
b/gr-zeromq/python/zeromq/qa_zeromq_pushpull.py
index 86b85d9..637edf8 100755
--- a/gr-zeromq/python/zeromq/qa_zeromq_pushpull.py
+++ b/gr-zeromq/python/zeromq/qa_zeromq_pushpull.py
@@ -31,10 +31,11 @@ class qa_zeromq_pushpull (gr_unittest.TestCase):
         self.tb = None
 
     def test_001_t (self):
-        src_data = [1,2,3,4,5,6,7,8,9]*100
+        print "test_001_t"
+        src_data = [1,2,3,4,5,6,7,8,9,0]*100
         src = blocks.vector_source_c(src_data, False, 1)
-        zeromq_push_sink = zeromq.push_sink(gr.sizeof_gr_complex, 1, 
"tcp://127.0.0.1:5555", True)
-        zeromq_pull_source = zeromq.pull_source(gr.sizeof_gr_complex, 1, 
"tcp://127.0.0.1:5555", 0.1)
+        zeromq_push_sink = zeromq.push_sink(gr.sizeof_gr_complex, 10, 
"tcp://127.0.0.1:5555", True)
+        zeromq_pull_source = zeromq.pull_source(gr.sizeof_gr_complex, 10, 
"tcp://127.0.0.1:5555", 0, True)
         sink = blocks.vector_sink_c()
         self.tb.connect(src, zeromq_push_sink)
         self.tb.connect(zeromq_pull_source, sink)
diff --git a/gr-zeromq/python/zeromq/qa_zeromq_reqrep.py 
b/gr-zeromq/python/zeromq/qa_zeromq_reqrep.py
index 85e3121..a577d90 100755
--- a/gr-zeromq/python/zeromq/qa_zeromq_reqrep.py
+++ b/gr-zeromq/python/zeromq/qa_zeromq_reqrep.py
@@ -34,10 +34,11 @@ class qa_zeromq_reqrep (gr_unittest.TestCase):
         self.tb = None
 
     def test_001_t (self):
-        src_data = [1,2,3,4,5,6,7,8,9]*100
+        print "test_001_t"
+        src_data = [1,2,3,4,5,6,7,8,9,0]*100
         src = blocks.vector_source_c(src_data, False, 1)
-        zeromq_rep_sink = zeromq.rep_sink(gr.sizeof_gr_complex, 1, 
"tcp://127.0.0.1:5555", 0.1, True)
-        zeromq_req_source = zeromq.req_source(gr.sizeof_gr_complex, 1, 
"tcp://127.0.0.1:5555")
+        zeromq_rep_sink = zeromq.rep_sink(gr.sizeof_gr_complex, 10, 
"tcp://127.0.0.1:5555", 0, False)
+        zeromq_req_source = zeromq.req_source(gr.sizeof_gr_complex, 10, 
"tcp://127.0.0.1:5555", 0, False)
         sink = blocks.vector_sink_c()
         self.tb.connect(src, zeromq_rep_sink)
         self.tb.connect(zeromq_req_source, sink)



reply via email to

[Prev in Thread] Current Thread [Next in Thread]