commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 01/03: blocks: refactored stream_mux to be more flexible with buffer sizes


From: git
Subject: [Commit-gnuradio] [gnuradio] 01/03: blocks: refactored stream_mux to be more flexible with buffer sizes
Date: Mon, 31 Mar 2014 18:35:03 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 1da534610a1e1ed0e7d613d39ff47485bb8b7bb6
Author: Martin Braun <address@hidden>
Date:   Tue Mar 25 21:32:13 2014 +0100

    blocks: refactored stream_mux to be more flexible with buffer sizes
---
 gr-blocks/lib/stream_mux_impl.cc         | 115 ++++++++++++++-----------------
 gr-blocks/lib/stream_mux_impl.h          |   6 +-
 gr-blocks/python/blocks/qa_stream_mux.py |  16 ++++-
 3 files changed, 69 insertions(+), 68 deletions(-)

diff --git a/gr-blocks/lib/stream_mux_impl.cc b/gr-blocks/lib/stream_mux_impl.cc
index 1e42c25..698cf89 100644
--- a/gr-blocks/lib/stream_mux_impl.cc
+++ b/gr-blocks/lib/stream_mux_impl.cc
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2012 Free Software Foundation, Inc.
+ * Copyright 2012,2014 Free Software Foundation, Inc.
  *
  * This file is part of GNU Radio
  *
@@ -26,10 +26,8 @@
 
 #include "stream_mux_impl.h"
 #include <gnuradio/io_signature.h>
-#include <string.h>
-#include <cstdio>
-
-#define VERBOSE 0
+#include <boost/foreach.hpp>
+#include <cstring>
 
 namespace gr {
   namespace blocks {
@@ -48,8 +46,11 @@ namespace gr {
        d_residual(0),
        d_lengths(lengths)
     {
-      if(d_lengths[d_stream] == 0) {
-       increment_stream();
+      while (d_lengths[d_stream] == 0) {
+        d_stream++;
+        if (d_stream == d_lengths.size()) {
+          throw std::invalid_argument("At least one size must be non-zero.");
+        }
       }
       d_residual = d_lengths[d_stream];
     }
@@ -58,69 +59,57 @@ namespace gr {
     stream_mux_impl::forecast(int noutput_items, gr_vector_int &ninput_items_required)
     {
       unsigned ninputs = ninput_items_required.size ();
-      for (unsigned i = 0; i < ninputs; i++)
-       ninput_items_required[i] = (d_lengths[i] == 0 ? 0 : 1);
-    }
-
-    void 
-    stream_mux_impl::increment_stream()
-    {
-      do {
-       d_stream = (d_stream+1) % d_lengths.size();
-      } while(d_lengths[d_stream] == 0);
-      
-      d_residual = d_lengths[d_stream];
+      for (unsigned i = 0; i < ninputs; i++) {
+       // Only active inputs *need* items, for the rest, it would just be nice
+       ninput_items_required[i] = (d_stream == i ? 1 : 0);
+      }
     }
 
 
     int
     stream_mux_impl::general_work(int noutput_items,
-                                  gr_vector_int &ninput_items,
-                                  gr_vector_const_void_star &input_items,
-                                  gr_vector_void_star &output_items)
-    {
+          gr_vector_int &ninput_items,
+          gr_vector_const_void_star &input_items,
+          gr_vector_void_star &output_items
+    ){
       char *out = (char *) output_items[0];
       const char *in;
-      int out_index = 0;
-      std::vector<int> input_index(d_lengths.size(), 0);
-      
-      if(VERBOSE) {
-       printf("mux: nouput_items: %d   d_stream: %d\n", noutput_items, d_stream);
-       for(size_t i = 0; i < d_lengths.size(); i++)
-         printf("\tninput_items[%zu]: %d\n", i, ninput_items[i]);
-      }
-      
-      while (1) {
-       int r = std::min(noutput_items - out_index,
-                        std::min(d_residual,
-                                 ninput_items[d_stream] - input_index[d_stream]));
-       if(VERBOSE) {
-         printf("mux: r=%d\n", r);
-         printf("\tnoutput_items - out_index: %d\n",
-                noutput_items - out_index);
-         printf("\td_residual: %d\n",
-                d_residual);
-         printf("\tninput_items[d_stream] - input_index[d_stream]: %d\n",
-                ninput_items[d_stream] - input_index[d_stream]);
-       }
-       
-       if(r <= 0) {
-         return out_index;
-       }
-       
-       in = (const char *) input_items[d_stream] + input_index[d_stream]*d_itemsize;
-       
-       memcpy(&out[out_index*d_itemsize], in, r*d_itemsize);
-       out_index += r;
-       input_index[d_stream] += r;
-       d_residual -= r;
-       
-       consume(d_stream, r);
-       
-       if(d_residual == 0) {
-         increment_stream();
-       }
+      int out_index = 0; // Items written
+      gr_vector_int input_index(d_lengths.size(), 0); // Items read
+
+      while (out_index < noutput_items) {
+        if (ninput_items[d_stream] <= input_index[d_stream]) {
+          break;
+        }
+        int space_left_in_buffers = std::min(
+              noutput_items - out_index, // Space left in output buffer
+              ninput_items[d_stream] - input_index[d_stream] // Space left in input buffer
+        );
+        int items_to_copy = std::min(
+            space_left_in_buffers,
+            d_residual
+        );
+        in = (const char *) input_items[d_stream] + input_index[d_stream]*d_itemsize;
+        memcpy(&out[out_index*d_itemsize], in, items_to_copy*d_itemsize);
+        out_index += items_to_copy;
+        input_index[d_stream] += items_to_copy;
+        d_residual -= items_to_copy;
+        if (d_residual == 0) {
+         do { // Skip all those inputs with zero length
+           d_stream = (d_stream+1) % d_lengths.size();
+         } while (d_lengths[d_stream] == 0);
+          d_residual = d_lengths[d_stream];
+        } else {
+          break;
+        }
+      } // while
+
+      for (size_t i = 0; i < input_index.size(); i++) {
+       consume((int) i, input_index[i]);
       }
-    }
+
+      return out_index;
+    } /* work */
+
   } /* namespace blocks */
 } /* namespace gr */
diff --git a/gr-blocks/lib/stream_mux_impl.h b/gr-blocks/lib/stream_mux_impl.h
index 328eb07..67be938 100644
--- a/gr-blocks/lib/stream_mux_impl.h
+++ b/gr-blocks/lib/stream_mux_impl.h
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2012 Free Software Foundation, Inc.
+ * Copyright 2012,2014 Free Software Foundation, Inc.
  *
  * This file is part of GNU Radio
  *
@@ -36,10 +36,8 @@ namespace gr {
       int d_residual;           // number if items left to put into current stream
       gr_vector_int d_lengths;  // number if items to pack per stream
 
-      void increment_stream();
-
       void forecast(int noutput_items, gr_vector_int &ninput_items_required);
-      
+
     public:
       stream_mux_impl(size_t itemsize, const std::vector<int> &lengths);
 
diff --git a/gr-blocks/python/blocks/qa_stream_mux.py b/gr-blocks/python/blocks/qa_stream_mux.py
index 7abbced..00e32e9 100755
--- a/gr-blocks/python/blocks/qa_stream_mux.py
+++ b/gr-blocks/python/blocks/qa_stream_mux.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 #
-# Copyright 2004,2005,2007,2010,2012,2013 Free Software Foundation, Inc.
+# Copyright 2004,2005,2007,2010,2012,2013,2014 Free Software Foundation, Inc.
 #
 # This file is part of GNU Radio
 #
@@ -167,5 +167,19 @@ class test_stream_mux (gr_unittest.TestCase):
 
         self.assertEqual (exp_data, result_data)
 
+    def test_largeN_ff(self):
+        stream_sizes = [3, 8191]
+        r1 = (1,) * stream_sizes[0]
+        r2 = (2,) * stream_sizes[1]
+        v0 = blocks.vector_source_f(r1, repeat=False)
+        v1 = blocks.vector_source_f(r2, repeat=False)
+        mux = blocks.stream_mux(gr.sizeof_float, stream_sizes)
+        dst = blocks.vector_sink_f ()
+        self.tb.connect (v0, (mux,0))
+        self.tb.connect (v1, (mux,1))
+        self.tb.connect (mux, dst)
+        self.tb.run ()
+        self.assertEqual (r1 + r2, dst.data())
+
 if __name__ == '__main__':
     gr_unittest.run(test_stream_mux, "test_stream_mux.xml")



reply via email to

[Prev in Thread] Current Thread [Next in Thread]