[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 01/03: blocks: refactored stream_mux to be more flexible with buffer sizes
From: git
Subject: [Commit-gnuradio] [gnuradio] 01/03: blocks: refactored stream_mux to be more flexible with buffer sizes
Date: Mon, 31 Mar 2014 18:35:03 +0000 (UTC)
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 1da534610a1e1ed0e7d613d39ff47485bb8b7bb6
Author: Martin Braun <address@hidden>
Date: Tue Mar 25 21:32:13 2014 +0100
blocks: refactored stream_mux to be more flexible with buffer sizes
---
gr-blocks/lib/stream_mux_impl.cc | 115 ++++++++++++++-----------------
gr-blocks/lib/stream_mux_impl.h | 6 +-
gr-blocks/python/blocks/qa_stream_mux.py | 16 ++++-
3 files changed, 69 insertions(+), 68 deletions(-)
diff --git a/gr-blocks/lib/stream_mux_impl.cc b/gr-blocks/lib/stream_mux_impl.cc
index 1e42c25..698cf89 100644
--- a/gr-blocks/lib/stream_mux_impl.cc
+++ b/gr-blocks/lib/stream_mux_impl.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2012 Free Software Foundation, Inc.
+ * Copyright 2012,2014 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -26,10 +26,8 @@
#include "stream_mux_impl.h"
#include <gnuradio/io_signature.h>
-#include <string.h>
-#include <cstdio>
-
-#define VERBOSE 0
+#include <boost/foreach.hpp>
+#include <cstring>
namespace gr {
namespace blocks {
@@ -48,8 +46,11 @@ namespace gr {
d_residual(0),
d_lengths(lengths)
{
- if(d_lengths[d_stream] == 0) {
- increment_stream();
+ while (d_lengths[d_stream] == 0) {
+ d_stream++;
+ if (d_stream == d_lengths.size()) {
+ throw std::invalid_argument("At least one size must be non-zero.");
+ }
}
d_residual = d_lengths[d_stream];
}
@@ -58,69 +59,57 @@ namespace gr {
stream_mux_impl::forecast(int noutput_items, gr_vector_int &ninput_items_required)
{
unsigned ninputs = ninput_items_required.size ();
- for (unsigned i = 0; i < ninputs; i++)
- ninput_items_required[i] = (d_lengths[i] == 0 ? 0 : 1);
- }
-
- void
- stream_mux_impl::increment_stream()
- {
- do {
- d_stream = (d_stream+1) % d_lengths.size();
- } while(d_lengths[d_stream] == 0);
-
- d_residual = d_lengths[d_stream];
+ for (unsigned i = 0; i < ninputs; i++) {
+ // Only active inputs *need* items, for the rest, it would just be nice
+ ninput_items_required[i] = (d_stream == i ? 1 : 0);
+ }
}
int
stream_mux_impl::general_work(int noutput_items,
- gr_vector_int &ninput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items
+ ){
char *out = (char *) output_items[0];
const char *in;
- int out_index = 0;
- std::vector<int> input_index(d_lengths.size(), 0);
-
- if(VERBOSE) {
- printf("mux: nouput_items: %d d_stream: %d\n", noutput_items, d_stream);
- for(size_t i = 0; i < d_lengths.size(); i++)
- printf("\tninput_items[%zu]: %d\n", i, ninput_items[i]);
- }
-
- while (1) {
- int r = std::min(noutput_items - out_index,
- std::min(d_residual,
- ninput_items[d_stream] - input_index[d_stream]));
- if(VERBOSE) {
- printf("mux: r=%d\n", r);
- printf("\tnoutput_items - out_index: %d\n",
- noutput_items - out_index);
- printf("\td_residual: %d\n",
- d_residual);
- printf("\tninput_items[d_stream] - input_index[d_stream]: %d\n",
- ninput_items[d_stream] - input_index[d_stream]);
- }
-
- if(r <= 0) {
- return out_index;
- }
-
- in = (const char *) input_items[d_stream] + input_index[d_stream]*d_itemsize;
-
- memcpy(&out[out_index*d_itemsize], in, r*d_itemsize);
- out_index += r;
- input_index[d_stream] += r;
- d_residual -= r;
-
- consume(d_stream, r);
-
- if(d_residual == 0) {
- increment_stream();
- }
+ int out_index = 0; // Items written
+ gr_vector_int input_index(d_lengths.size(), 0); // Items read
+
+ while (out_index < noutput_items) {
+ if (ninput_items[d_stream] <= input_index[d_stream]) {
+ break;
+ }
+ int space_left_in_buffers = std::min(
+ noutput_items - out_index, // Space left in output buffer
+ ninput_items[d_stream] - input_index[d_stream] // Space left in input buffer
+ );
+ int items_to_copy = std::min(
+ space_left_in_buffers,
+ d_residual
+ );
+ in = (const char *) input_items[d_stream] + input_index[d_stream]*d_itemsize;
+ memcpy(&out[out_index*d_itemsize], in, items_to_copy*d_itemsize);
+ out_index += items_to_copy;
+ input_index[d_stream] += items_to_copy;
+ d_residual -= items_to_copy;
+ if (d_residual == 0) {
+ do { // Skip all those inputs with zero length
+ d_stream = (d_stream+1) % d_lengths.size();
+ } while (d_lengths[d_stream] == 0);
+ d_residual = d_lengths[d_stream];
+ } else {
+ break;
+ }
+ } // while
+
+ for (size_t i = 0; i < input_index.size(); i++) {
+ consume((int) i, input_index[i]);
}
- }
+
+ return out_index;
+ } /* work */
+
} /* namespace blocks */
} /* namespace gr */
diff --git a/gr-blocks/lib/stream_mux_impl.h b/gr-blocks/lib/stream_mux_impl.h
index 328eb07..67be938 100644
--- a/gr-blocks/lib/stream_mux_impl.h
+++ b/gr-blocks/lib/stream_mux_impl.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2012 Free Software Foundation, Inc.
+ * Copyright 2012,2014 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -36,10 +36,8 @@ namespace gr {
int d_residual; // number if items left to put into current stream
gr_vector_int d_lengths; // number if items to pack per stream
- void increment_stream();
-
void forecast(int noutput_items, gr_vector_int &ninput_items_required);
-
+
public:
stream_mux_impl(size_t itemsize, const std::vector<int> &lengths);
diff --git a/gr-blocks/python/blocks/qa_stream_mux.py b/gr-blocks/python/blocks/qa_stream_mux.py
index 7abbced..00e32e9 100755
--- a/gr-blocks/python/blocks/qa_stream_mux.py
+++ b/gr-blocks/python/blocks/qa_stream_mux.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2004,2005,2007,2010,2012,2013 Free Software Foundation, Inc.
+# Copyright 2004,2005,2007,2010,2012,2013,2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -167,5 +167,19 @@ class test_stream_mux (gr_unittest.TestCase):
self.assertEqual (exp_data, result_data)
+ def test_largeN_ff(self):
+ stream_sizes = [3, 8191]
+ r1 = (1,) * stream_sizes[0]
+ r2 = (2,) * stream_sizes[1]
+ v0 = blocks.vector_source_f(r1, repeat=False)
+ v1 = blocks.vector_source_f(r2, repeat=False)
+ mux = blocks.stream_mux(gr.sizeof_float, stream_sizes)
+ dst = blocks.vector_sink_f ()
+ self.tb.connect (v0, (mux,0))
+ self.tb.connect (v1, (mux,1))
+ self.tb.connect (mux, dst)
+ self.tb.run ()
+ self.assertEqual (r1 + r2, dst.data())
+
if __name__ == '__main__':
gr_unittest.run(test_stream_mux, "test_stream_mux.xml")