[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 06/12: digital: updated some QAs to use proper tsb functions
From: git
Subject: [Commit-gnuradio] [gnuradio] 06/12: digital: updated some QAs to use proper tsb functions
Date: Fri, 23 May 2014 17:35:56 +0000 (UTC)
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 95ae36c7ad1fd228e7ca943c7d2a2b49053ae818
Author: Martin Braun <address@hidden>
Date: Thu May 8 13:33:07 2014 +0200
digital: updated some QAs to use proper tsb functions
---
gr-digital/python/digital/qa_crc32_bb.py | 111 +++++++-------
.../digital/qa_ofdm_carrier_allocator_cvc.py | 111 ++++++--------
gr-digital/python/digital/qa_ofdm_chanest_vcvc.py | 7 +-
.../python/digital/qa_ofdm_cyclic_prefixer.py | 17 +--
.../python/digital/qa_ofdm_frame_equalizer_vcvc.py | 162 ++++++++++-----------
.../python/digital/qa_ofdm_serializer_vcc.py | 120 +++++----------
.../python/digital/qa_packet_headergenerator_bb.py | 105 ++++---------
7 files changed, 265 insertions(+), 368 deletions(-)
diff --git a/gr-digital/python/digital/qa_crc32_bb.py
b/gr-digital/python/digital/qa_crc32_bb.py
index 71ba81d..bca19cd 100755
--- a/gr-digital/python/digital/qa_crc32_bb.py
+++ b/gr-digital/python/digital/qa_crc32_bb.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-# Copyright 2012,2013 Free Software Foundation, Inc.
+# Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -26,6 +26,7 @@ class qa_crc32_bb (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "length"
def tearDown (self):
self.tb = None
@@ -33,36 +34,37 @@ class qa_crc32_bb (gr_unittest.TestCase):
def test_001_crc_len (self):
""" Make sure the output of a CRC set is 4 bytes longer than the
input. """
data = range(16)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
- src = blocks.vector_source_b(data, False, 1, (tag,))
- crc = digital.crc32_bb(False, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, sink)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data),
self.tsb_key),
+ crc,
+ sink
+ )
self.tb.run()
# Check that the packets before crc_check are 4 bytes longer that the
input.
- self.assertEqual(len(data)+4, len(sink.data()))
+ self.assertEqual(len(data)+4, len(sink.data()[0]))
def test_002_crc_equal (self):
""" Go through CRC set / CRC check and make sure the output
is the same as the input. """
data = (0, 1, 2, 3, 4, 5, 6, 7, 8)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
- src = blocks.vector_source_b(data, False, 1, (tag,))
- crc = digital.crc32_bb(False, tag_name)
- crc_check = digital.crc32_bb(True, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, crc_check, sink)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key)
+ crc_check = digital.crc32_bb(True, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data),
self.tsb_key),
+ crc,
+ crc_check,
+ sink
+ )
self.tb.run()
# Check that the packets after crc_check are the same as input.
- self.assertEqual(data, sink.data())
+ self.assertEqual(data, sink.data()[0])
def test_003_crc_correct_lentag (self):
tag_name = "length"
@@ -88,12 +90,19 @@ class qa_crc32_bb (gr_unittest.TestCase):
testtag3.offset = len(packets)-1
testtag3.key = pmt.string_to_symbol("tag3")
testtag3.value = pmt.from_long(0)
- src = blocks.vector_source_b(packets, False, 1, (tag1, tag2, testtag1,
testtag2, testtag3))
- crc = digital.crc32_bb(False, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, sink)
+ src = blocks.vector_source_b(packets, False, 1, (testtag1, testtag2,
testtag3))
+ crc = digital.crc32_bb(False, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, pack_len,
self.tsb_key),
+ crc,
+ sink
+ )
self.tb.run()
- self.assertEqual(len(sink.data()), 2*(pack_len+4))
+ self.assertEqual(len(sink.data()), 2)
+ self.assertEqual(len(sink.data()[0]), (pack_len+4))
+ self.assertEqual(len(sink.data()[1]), (pack_len+4))
correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19}
tags_found = {'tag1': False, 'tag2': False, 'tag3': False}
for tag in sink.tags():
@@ -101,45 +110,49 @@ class qa_crc32_bb (gr_unittest.TestCase):
if key in correct_offsets.keys():
tags_found[key] = True
self.assertEqual(correct_offsets[key], tag.offset)
- if key == tag_name:
- self.assertTrue(tag.offset == 0 or tag.offset == pack_len+4)
self.assertTrue(all(tags_found.values()))
def test_004_fail (self):
""" Corrupt the data and make sure it fails CRC test. """
data = (0, 1, 2, 3, 4, 5, 6, 7)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
- src = blocks.vector_source_b(data, False, 1, (tag,))
- crc = digital.crc32_bb(False, tag_name)
- crc_check = digital.crc32_bb(True, tag_name)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key)
+ crc_check = digital.crc32_bb(True, self.tsb_key)
corruptor = blocks.add_const_bb(1)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, corruptor, crc_check, sink)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data),
self.tsb_key),
+ crc,
+ corruptor,
+ crc_check,
+ sink
+ )
self.tb.run()
# crc_check will drop invalid packets
self.assertEqual(len(sink.data()), 0)
def test_005_tag_propagation (self):
""" Make sure tags on the CRC aren't lost. """
- data = (0, 1, 2, 3, 4, 5, 6, 7, 8, 2, 67, 225, 188)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
+ # Data with precalculated CRC
+ data = (
+ 0, 1, 2, 3, 4, 5, 6, 7, 8,
+ 2, 67, 225, 188
+ )
testtag = gr.tag_t()
testtag.offset = len(data)-1
testtag.key = pmt.string_to_symbol('tag1')
testtag.value = pmt.from_long(0)
- src = blocks.vector_source_b(data, False, 1, (tag, testtag))
- crc_check = digital.crc32_bb(True, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc_check, sink)
+ src = blocks.vector_source_b(data, False, 1, (testtag,))
+ crc_check = digital.crc32_bb(True, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data),
self.tsb_key),
+ crc_check,
+ sink
+ )
self.tb.run()
self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if
pmt.symbol_to_string(tag.key) == 'tag1'])
diff --git a/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py
b/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py
index b1732fa..befb15a 100755
--- a/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py
+++ b/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-# Copyright 2012,2013 Free Software Foundation, Inc.
+# Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -26,13 +26,14 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "ts_last"
def tearDown (self):
self.tb = None
def test_001_t (self):
"""
- pretty simple (the carrier allocation is not a practical OFDM
configuration!)
+ pretty simple (the carrier allocation here is not a practical OFDM
configuration!)
"""
fft_len = 6
tx_symbols = (1, 2, 3)
@@ -43,21 +44,21 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
sync_word = (range(fft_len),)
expected_result = tuple(sync_word[0] + [1j, 0, 0, 1, 2, 3])
# ^ DC carrier
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_symbols))
- src = blocks.vector_source_c(tx_symbols, False, 1, (tag,))
+ src = blocks.vector_source_c(tx_symbols, False, 1)
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
pilot_carriers,
pilot_symbols, sync_word,
- tag_name)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, alloc, sink)
- self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
+ self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1,
len(tx_symbols), self.tsb_key),
+ alloc,
+ sink
+ )
+ self.tb.run()
+ self.assertEqual(sink.data()[0], expected_result)
def test_001_t2 (self):
"""
@@ -71,21 +72,18 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
pilot_symbols = ((1j,),)
expected_result = (1j, 0, 1, 2, 3)
# ^ DC carrier
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_symbols))
- src = blocks.vector_source_c(tx_symbols, False, 1, (tag,))
- alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
- occupied_carriers,
- pilot_carriers,
- pilot_symbols, (),
- tag_name)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, alloc, sink)
+ src = blocks.vector_source_c(tx_symbols, False, 1)
+ alloc = digital.ofdm_carrier_allocator_cvc(
+ fft_len,
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols, (),
+ self.tsb_key
+ )
+ sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols),
self.tsb_key), alloc, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(sink.data()[0], expected_result)
def test_002_t (self):
"""
@@ -97,21 +95,21 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
occupied_carriers = ((-1, 1, 2),)
pilot_carriers = ((3,),)
expected_result = (1j, 0, 1, 0, 2, 3)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_symbols))
- src = blocks.vector_source_c(tx_symbols, False, 1, (tag,))
+ src = blocks.vector_source_c(tx_symbols, False, 1)
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
pilot_carriers,
pilot_symbols, (),
- tag_name)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, alloc, sink)
+ self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(fft_len)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1,
len(tx_symbols), self.tsb_key),
+ alloc,
+ sink
+ )
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(sink.data()[0], expected_result)
def test_002_t (self):
"""
@@ -124,11 +122,6 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
occupied_carriers = ((-1, 1, 2),)
pilot_carriers = ((3,),)
expected_result = sync_word + (1j, 0, 1, 0, 2, 3) + (1j, 0, 4, 0, 5, 6)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_symbols))
special_tag1 = gr.tag_t()
special_tag1.offset = 0
special_tag1.key = pmt.string_to_symbol("spam")
@@ -139,7 +132,7 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
special_tag2.value = pmt.to_pmt(42)
src = blocks.vector_source_c(
tx_symbols, False, 1,
- (tag, special_tag1, special_tag2)
+ (special_tag1, special_tag2)
)
alloc = digital.ofdm_carrier_allocator_cvc(
fft_len,
@@ -147,16 +140,15 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
pilot_carriers,
pilot_symbols,
sync_words=(sync_word,),
- len_tag_key=tag_name
+ len_tag_key=self.tsb_key
)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, alloc, sink)
+ sink = blocks.tsb_vector_sink_c(fft_len)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols),
self.tsb_key), alloc, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(sink.data()[0], expected_result)
tags = [gr.tag_to_python(x) for x in sink.tags()]
tags = sorted([(x.offset, x.key, x.value) for x in tags])
tags_expected = [
- (0, 'len', 3),
(0, 'spam', 23),
(2, 'eggs', 42),
]
@@ -180,15 +172,6 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11,
12, 0,
0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0,
0)
fft_len = 16
- tag_name = "len"
- tag1 = gr.tag_t()
- tag1.offset = 0
- tag1.key = pmt.string_to_symbol(tag_name)
- tag1.value = pmt.from_long(len(tx_symbols))
- tag2 = gr.tag_t()
- tag2.offset = len(tx_symbols)
- tag2.key = pmt.string_to_symbol(tag_name)
- tag2.value = pmt.from_long(len(tx_symbols))
testtag1 = gr.tag_t()
testtag1.offset = 0
testtag1.key = pmt.string_to_symbol('tag1')
@@ -205,18 +188,17 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
testtag4.offset = 2*len(tx_symbols)-1 # Last OFDM symbol of packet 2
testtag4.key = pmt.string_to_symbol('tag4')
testtag4.value = pmt.from_long(0)
- src = blocks.vector_source_c(tx_symbols * 2, False, 1,
- (tag1, tag2, testtag1, testtag2,
testtag3, testtag4))
+ src = blocks.vector_source_c(tx_symbols * 2, False, 1, (testtag1,
testtag2, testtag3, testtag4))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
pilot_carriers,
pilot_symbols, (),
- tag_name,
+ self.tsb_key,
False)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, alloc, sink)
+ sink = blocks.tsb_vector_sink_c(fft_len)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols),
self.tsb_key), alloc, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result * 2)
+ self.assertEqual(sink.data()[0], expected_result)
tags_found = {'tag1': False, 'tag2': False, 'tag3': False, 'tag4':
False}
correct_offsets = {'tag1': 0, 'tag2': 1, 'tag3': 3, 'tag4': 5}
for tag in sink.tags():
@@ -224,9 +206,6 @@ class qa_digital_carrier_allocator_cvc
(gr_unittest.TestCase):
if key in tags_found.keys():
tags_found[key] = True
self.assertEqual(correct_offsets[key], tag.offset)
- if key == tag_name:
- self.assertTrue(tag.offset == 0 or tag.offset == 3)
- self.assertTrue(pmt.to_long(tag.value) == 3)
self.assertTrue(all(tags_found.values()))
diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
index c365cf8..d63b65d 100755
--- a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
+++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-# Copyright 2012,2013 Free Software Foundation, Inc.
+# Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -22,7 +22,6 @@
import sys
import numpy
import random
-
import numpy
from gnuradio import gr, gr_unittest, blocks, analog, digital
@@ -41,7 +40,7 @@ def rand_range(min_val, max_val):
return random.random() * (max_val - min_val) + min_val
-class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
+class qa_ofdm_chanest_vcvc (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
@@ -284,5 +283,5 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
if __name__ == '__main__':
- gr_unittest.run(qa_ofdm_sync_eqinit_vcvc, "qa_ofdm_sync_eqinit_vcvc.xml")
+ gr_unittest.run(qa_ofdm_chanest_vcvc, "qa_ofdm_chanest_vcvc.xml")
diff --git a/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py
b/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py
index 5cb9fae..ecc1c42 100755
--- a/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py
+++ b/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2007,2010,2011,2013 Free Software Foundation, Inc.
+# Copyright 2007,2010,2011,2013,2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -62,27 +62,22 @@ class test_ofdm_cyclic_prefixer (gr_unittest.TestCase):
" With tags and a 2-sample rolloff "
fft_len = 8
cp_len = 2
- tag_name = "length"
+ tag_name = "ts_last"
expected_result = (7.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, # 1.0/2
7.0/2+1.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1.0/2)
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(2)
tag2 = gr.tag_t()
tag2.offset = 1
tag2.key = pmt.string_to_symbol("random_tag")
tag2.value = pmt.from_long(42)
- src = blocks.vector_source_c(range(1, fft_len+1) * 2, False, fft_len,
(tag, tag2))
+ src = blocks.vector_source_c(range(1, fft_len+1) * 2, False, fft_len,
(tag2,))
cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, 2,
tag_name)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, cp, sink)
+ sink = blocks.tsb_vector_sink_c(tsb_key=tag_name)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, 2, tag_name), cp,
sink)
self.tb.run()
- self.assertEqual(sink.data(), expected_result)
+ self.assertEqual(sink.data()[0], expected_result)
tags = [gr.tag_to_python(x) for x in sink.tags()]
tags = sorted([(x.offset, x.key, x.value) for x in tags])
expected_tags = [
- (0, tag_name, len(expected_result)),
(fft_len+cp_len, "random_tag", 42)
]
self.assertEqual(tags, expected_tags)
diff --git a/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py
b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py
index 1cdb8ed..c42fb2b 100755
--- a/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py
+++ b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py
@@ -28,6 +28,7 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "tsb_key"
def tearDown (self):
self.tb = None
@@ -44,12 +45,7 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
fft_len = 8
equalizer = digital.ofdm_equalizer_static(fft_len)
n_syms = 3
- len_tag_key = "frame_len"
tx_data = (1,) * fft_len * n_syms
- len_tag = gr.tag_t()
- len_tag.offset = 0
- len_tag.key = pmt.string_to_symbol(len_tag_key)
- len_tag.value = pmt.from_long(n_syms)
chan_tag = gr.tag_t()
chan_tag.offset = 0
chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
@@ -58,20 +54,24 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
random_tag.offset = 1
random_tag.key = pmt.string_to_symbol("foo")
random_tag.value = pmt.from_long(42)
- src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag,
chan_tag, random_tag))
- eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
len_tag_key)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, eq, sink)
+ src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag,
random_tag))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len,
n_syms, self.tsb_key),
+ eq,
+ sink
+ )
self.tb.run ()
# Check data
- self.assertEqual(tx_data, sink.data())
+ self.assertEqual(tx_data, sink.data()[0])
# Check tags
tag_dict = dict()
for tag in sink.tags():
ptag = gr.tag_to_python(tag)
tag_dict[ptag.key] = ptag.value
expected_dict = {
- 'frame_len': n_syms,
'foo': 42
}
self.assertEqual(tag_dict, expected_dict)
@@ -83,23 +83,23 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
fft_len = 8
equalizer = digital.ofdm_equalizer_static(fft_len, symbols_skipped=1)
n_syms = 3
- len_tag_key = "frame_len"
tx_data = (1,) * fft_len * n_syms
- len_tag = gr.tag_t()
- len_tag.offset = 0
- len_tag.key = pmt.string_to_symbol(len_tag_key)
- len_tag.value = pmt.from_long(n_syms)
chan_tag = gr.tag_t()
chan_tag.offset = 0
chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
chan_tag.value = pmt.init_c32vector(fft_len, (1,) * fft_len)
- src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag,
chan_tag))
- eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
len_tag_key)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, eq, sink)
+ src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag,))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len,
n_syms, self.tsb_key),
+ eq,
+ sink
+ )
self.tb.run ()
# Check data
- self.assertEqual(tx_data, sink.data())
+ self.assertEqual(tx_data, sink.data()[0])
def test_001c_carrier_offset_no_cp (self):
"""
@@ -116,11 +116,6 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
# The rx'd signal is shifted
rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms
equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers)
- len_tag_key = "frame_len"
- len_tag = gr.tag_t()
- len_tag.offset = 0
- len_tag.key = pmt.string_to_symbol(len_tag_key)
- len_tag.value = pmt.from_long(n_syms)
chan_tag = gr.tag_t()
chan_tag.offset = 0
chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
@@ -130,13 +125,18 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
offset_tag.offset = 0
offset_tag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offset_tag.value = pmt.from_long(carr_offset)
- src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag,
chan_tag, offset_tag))
- eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len,
len_tag_key)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, eq, sink)
+ src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag,
offset_tag))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len,
self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len,
n_syms, self.tsb_key),
+ eq,
+ sink
+ )
self.tb.run ()
# Check data
- self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4)
+ self.assertComplexTuplesAlmostEqual(rx_expected, sink.data()[0],
places=4)
def test_001c_carrier_offset_cp (self):
"""
@@ -157,11 +157,6 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
# Rx'd signal is corrected
rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms
equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers)
- len_tag_key = "frame_len"
- len_tag = gr.tag_t()
- len_tag.offset = 0
- len_tag.key = pmt.string_to_symbol(len_tag_key)
- len_tag.value = pmt.from_long(n_syms)
chan_tag = gr.tag_t()
chan_tag.offset = 0
chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
@@ -170,13 +165,18 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
offset_tag.offset = 0
offset_tag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offset_tag.value = pmt.from_long(carr_offset)
- src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag,
chan_tag, offset_tag))
- eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len,
len_tag_key)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, eq, sink)
+ src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag,
offset_tag))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len,
self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len,
n_syms, self.tsb_key),
+ eq,
+ sink
+ )
self.tb.run ()
# Check data
- self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4)
+ self.assertComplexTuplesAlmostEqual(rx_expected, sink.data()[0],
places=4)
def test_002_static (self):
"""
@@ -211,21 +211,21 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
]
for idx in range(fft_len, 2*fft_len):
channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi
* (numpy.random.rand()-.5))
- len_tag_key = "frame_len"
- len_tag = gr.tag_t()
- len_tag.offset = 0
- len_tag.key = pmt.string_to_symbol(len_tag_key)
- len_tag.value = pmt.from_long(4)
chan_tag = gr.tag_t()
chan_tag.offset = 0
chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len])
- src = blocks.vector_source_c(numpy.multiply(tx_signal, channel),
False, fft_len, (len_tag, chan_tag))
- sink = blocks.vector_sink_c(fft_len)
- eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
len_tag_key, True)
- self.tb.connect(src, eq, sink)
+ src = blocks.vector_source_c(numpy.multiply(tx_signal, channel),
False, fft_len, (chan_tag,))
+ sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key)
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
self.tsb_key, True)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len,
len(tx_data)/fft_len, self.tsb_key),
+ eq,
+ sink
+ )
self.tb.run ()
- rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in
sink.data()]
+ rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in
sink.data()[0]]
# Check data
self.assertEqual(tx_data, rx_data)
# Check tags
@@ -238,7 +238,6 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
else:
tag_dict[ptag.key] = pmt.to_python(tag.value)
expected_dict = {
- 'frame_len': 4,
'ofdm_sync_chan_taps': channel[-fft_len:]
}
self.assertEqual(tag_dict, expected_dict)
@@ -247,7 +246,7 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
""" Same as before, but the input stream has no tag.
We specify the frame size in the constructor.
We also specify a tag key, so the output stream *should* have
- a length tag.
+ a TSB tag.
"""
fft_len = 8
n_syms = 4
@@ -274,22 +273,22 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi
* (numpy.random.rand()-.5))
idx2 = idx+2*fft_len
channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi *
(numpy.random.rand()-.5))
- src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False,
fft_len)
- # We do specify a length tag, it should then appear at the output
- eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
"frame_len", False, n_syms)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, eq, sink)
+ src = blocks.vector_source_c(numpy.multiply(tx_signal, channel),
False, fft_len)
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
self.tsb_key, False, n_syms)
+ sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len,
len(tx_data)/fft_len, self.tsb_key),
+ eq,
+ sink
+ )
self.tb.run ()
- rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in
sink.data()]
+ rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in
sink.data()[0]]
self.assertEqual(tx_data, rx_data)
- # Check len tag
- tags = sink.tags()
- len_tag = dict()
- for tag in tags:
- ptag = gr.tag_to_python(tag)
- if ptag.key == 'frame_len':
- len_tag[ptag.key] = ptag.value
- self.assertEqual(len_tag, {'frame_len': 4})
+ # Check TSB Functionality
+ packets = sink.data()
+ self.assertEqual(len(packets), 1)
+ self.assertEqual(len(packets[0]), len(tx_data))
def test_002_static_wo_tags (self):
fft_len = 8
@@ -352,27 +351,26 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi
* (numpy.random.rand()-.5))
idx2 = idx+2*fft_len
channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi *
(numpy.random.rand()-.5))
- len_tag_key = "frame_len"
- len_tag = gr.tag_t()
- len_tag.offset = 0
- len_tag.key = pmt.string_to_symbol(len_tag_key)
- len_tag.value = pmt.from_long(4)
chan_tag = gr.tag_t()
chan_tag.offset = 0
chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps")
chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len])
- src = blocks.vector_source_c(numpy.multiply(tx_signal, channel),
False, fft_len, (len_tag, chan_tag))
- eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
len_tag_key, True)
- sink = blocks.vector_sink_c(fft_len)
- self.tb.connect(src, eq, sink)
+ src = blocks.vector_source_c(numpy.multiply(tx_signal, channel),
False, fft_len, (chan_tag,))
+ eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0,
self.tsb_key, True)
+ sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len,
len(tx_data)/fft_len, self.tsb_key),
+ eq,
+ sink
+ )
self.tb.run ()
- rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in
sink.data()]
+ rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in
sink.data()[0]]
self.assertEqual(tx_data, rx_data)
- for tag in sink.tags():
- if pmt.symbol_to_string(tag.key) == len_tag_key:
- self.assertEqual(pmt.to_long(tag.value), 4)
- if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
-
self.assertComplexTuplesAlmostEqual(list(pmt.c32vector_elements(tag.value)),
channel[-fft_len:], places=1)
+ self.assertEqual(len(sink.tags()), 1)
+ tag = sink.tags()[0]
+ self.assertEqual(pmt.symbol_to_string(tag.key), "ofdm_sync_chan_taps")
+
self.assertComplexTuplesAlmostEqual(list(pmt.c32vector_elements(tag.value)),
channel[-fft_len:], places=1)
if __name__ == '__main__':
diff --git a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
index 69997ce..8a60b97 100755
--- a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
+++ b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2012,2013 Free Software Foundation, Inc.
+# Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -29,6 +29,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "ts_last"
def tearDown (self):
self.tb = None
@@ -42,21 +43,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(1, 16)) + (0, 0, 0)
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers,
tag_name, "", 0, "", False)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, serializer, sink)
+ src = blocks.vector_source_c(tx_symbols, False, fft_len)
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers,
self.tsb_key, "", 0, "", False)
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms,
self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 1)
- result_tag = sink.tags()[0]
- self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name)
- self.assertEqual(pmt.to_long(result_tag.value), n_syms *
len(occupied_carriers[0]))
+ self.assertEqual(sink.data()[0], expected_result)
def test_001b_shifted (self):
""" Same as before, but shifted, because that's the normal mode in
OFDM Rx """
@@ -69,21 +61,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(18))
occupied_carriers = ((13, 14, 15, 1, 2, 3), (-4, -2, -1, 1, 2, 4),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers,
tag_name)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, serializer, sink)
+ src = blocks.vector_source_c(tx_symbols, False, fft_len)
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers,
self.tsb_key)
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms,
self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 1)
- result_tag = sink.tags()[0]
- self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name)
- self.assertEqual(pmt.to_long(result_tag.value), n_syms *
len(occupied_carriers[0]))
+ self.assertEqual(sink.data()[0], expected_result)
def test_002_with_offset (self):
""" Standard test, carrier offset """
@@ -96,32 +79,24 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(1, 16)) + (0, 0, 0)
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
offsettag = gr.tag_t()
offsettag.offset = 0
offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offsettag.value = pmt.from_long(carr_offset)
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,
offsettag))
- sink = blocks.vector_sink_c()
+ src = blocks.vector_source_c(tx_symbols, False, fft_len, (offsettag,))
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
serializer = digital.ofdm_serializer_vcc(
fft_len,
occupied_carriers,
- tag_name,
+ self.tsb_key,
"", 0,
"ofdm_sync_carr_offset",
False
)
- self.tb.connect(src, serializer, sink)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms,
self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 2)
- for tag in sink.tags():
- if pmt.symbol_to_string(tag.key) == tag_name:
- self.assertEqual(pmt.to_long(tag.value), n_syms *
len(occupied_carriers[0]))
+ self.assertEqual(sink.data()[0], expected_result)
+ self.assertEqual(len(sink.tags()), 1)
def test_003_connect (self):
""" Connect carrier_allocator to ofdm_serializer,
@@ -133,19 +108,14 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
pilot_symbols = ((1j,),(-1j,))
#tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 *
n_syms)])
tx_data = (1, 2, 3, 4)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_data))
- src = blocks.vector_source_c(tx_data, False, 1, (tag,))
+ src = blocks.vector_source_c(tx_data, False, 1)
alloc = digital.ofdm_carrier_allocator_cvc(
fft_len,
occupied_carriers,
pilot_carriers,
pilot_symbols,
(), # No sync word
- tag_name,
+ self.tsb_key,
True # Output is shifted (default)
)
serializer = digital.ofdm_serializer_vcc(
@@ -155,10 +125,10 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
"", # Carrier offset key
True # Input is shifted (default)
)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, alloc, serializer, sink)
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_data),
self.tsb_key), alloc, serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), tx_data)
+ self.assertEqual(sink.data()[0], tx_data)
def test_004_connect (self):
"""
@@ -176,33 +146,30 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
pilot_carriers = ((-3,),(3,))
pilot_symbols = ((1j,),(-1j,))
tx_data = (1, 2, 3, 4)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(tx_data))
offsettag = gr.tag_t()
offsettag.offset = 0
offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset")
offsettag.value = pmt.from_long(carr_offset)
- src = blocks.vector_source_c(tx_data, False, 1, (tag, offsettag))
+ src = blocks.vector_source_c(tx_data, False, 1, (offsettag,))
alloc = digital.ofdm_carrier_allocator_cvc(fft_len,
occupied_carriers,
pilot_carriers,
pilot_symbols, (),
- tag_name)
+ self.tsb_key)
tx_ifft = fft.fft_vcc(fft_len, False, (1.0/fft_len,)*fft_len, True)
oscillator = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset,
1.0)
mixer = blocks.multiply_cc()
rx_fft = fft.fft_vcc(fft_len, True, (), True)
- sink2 = blocks.vector_sink_c(fft_len)
+ sink2 = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key)
self.tb.connect(rx_fft, sink2)
serializer = digital.ofdm_serializer_vcc(
alloc, "", 0, "ofdm_sync_carr_offset", True
)
- sink = blocks.vector_sink_c()
+ sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key)
self.tb.connect(
- src, alloc, tx_ifft,
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1,
len(tx_data), self.tsb_key),
+ alloc, tx_ifft,
blocks.vector_to_stream(gr.sizeof_gr_complex, fft_len),
(mixer, 0),
blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len),
@@ -210,7 +177,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
)
self.tb.connect(oscillator, (mixer, 1))
self.tb.run ()
- self.assertComplexTuplesAlmostEqual(sink.data()[-len(occupied_carriers[0]):], tx_data, places=4)
+ self.assertComplexTuplesAlmostEqual(sink.data()[0][-len(occupied_carriers[0]):], tx_data, places=4)
def test_005_packet_len_tag (self):
""" Standard test """
@@ -222,32 +189,23 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase):
expected_result = tuple(range(1, 16))
occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),)
n_syms = len(tx_symbols)/fft_len
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(n_syms)
+ packet_len_tsb_key = "packet_len"
tag2 = gr.tag_t()
tag2.offset = 0
tag2.key = pmt.string_to_symbol("packet_len")
tag2.value = pmt.from_long(len(expected_result))
- src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, tag2))
- serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers,
tag_name, "packet_len", 0, "", False)
- sink = blocks.vector_sink_c()
- self.tb.connect(src, serializer, sink)
+ src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag2,))
+ serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers,
self.tsb_key, packet_len_tsb_key , 0, "", False)
+ sink = blocks.tsb_vector_sink_c(tsb_key=packet_len_tsb_key)
+ self.tb.connect(src,
blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms,
self.tsb_key), serializer, sink)
self.tb.run ()
- self.assertEqual(sink.data(), expected_result)
- self.assertEqual(len(sink.tags()), 1)
- result_tag = sink.tags()[0]
- self.assertEqual(pmt.symbol_to_string(result_tag.key), "packet_len")
- self.assertEqual(pmt.to_long(result_tag.value), len(expected_result))
+ self.assertEqual(sink.data()[0], expected_result)
def test_099 (self):
""" Make sure it fails if it should """
fft_len = 16
- occupied_carriers = ((1, 3, 4, 11, 12, 112),)
- tag_name = "len"
- self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len,
occupied_carriers, tag_name)
+ occupied_carriers = ((1, 3, 4, 11, 12, 112),) # Something invalid
+ self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len,
occupied_carriers, self.tsb_key)
if __name__ == '__main__':
diff --git a/gr-digital/python/digital/qa_packet_headergenerator_bb.py b/gr-digital/python/digital/qa_packet_headergenerator_bb.py
index bec0828..d2677ce 100755
--- a/gr-digital/python/digital/qa_packet_headergenerator_bb.py
+++ b/gr-digital/python/digital/qa_packet_headergenerator_bb.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
-# Copyright 2012 Free Software Foundation, Inc.
+#
+#Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -14,40 +15,35 @@
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
+# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
from gnuradio import gr, gr_unittest, digital, blocks
+from gnuradio.gr import packet_utils
import pmt
class qa_packet_headergenerator_bb (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "tsb_key"
def tearDown (self):
self.tb = None
+ def setup_data_tags(self, data):
+ return packet_utils.packets_to_vectors(
+ data,
+ self.tsb_key
+ )
+
def test_001_12bits (self):
- # 3 PDUs: | | |
- data = (1, 2, 3, 4, 1, 2) + tuple(range(25))
- tagname = "packet_len"
- tag1 = gr.tag_t()
- tag1.offset = 0
- tag1.key = pmt.string_to_symbol(tagname)
- tag1.value = pmt.from_long(4)
- tag2 = gr.tag_t()
- tag2.offset = 4
- tag2.key = pmt.string_to_symbol(tagname)
- tag2.value = pmt.from_long(2)
- tag3 = gr.tag_t()
- tag3.offset = 6
- tag3.key = pmt.string_to_symbol(tagname)
- tag3.value = pmt.from_long(25)
- src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3))
- header = digital.packet_headergenerator_bb(12, tagname)
+ # 3 packets: | | |
+ data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), tuple(range(25))))
+ src = blocks.vector_source_b(data, tags=tags)
+ header = digital.packet_headergenerator_bb(12, self.tsb_key)
sink = blocks.vector_sink_b()
self.tb.connect(src, header, sink)
self.tb.run()
@@ -58,25 +54,11 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase):
)
self.assertEqual(sink.data(), expected_data)
-
def test_002_32bits (self):
- # 3 PDUs: | | | |
- data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4)
- tagname = "packet_len"
- tag1 = gr.tag_t()
- tag1.offset = 0
- tag1.key = pmt.string_to_symbol(tagname)
- tag1.value = pmt.from_long(4)
- tag2 = gr.tag_t()
- tag2.offset = 4
- tag2.key = pmt.string_to_symbol(tagname)
- tag2.value = pmt.from_long(2)
- tag3 = gr.tag_t()
- tag3.offset = 6
- tag3.key = pmt.string_to_symbol(tagname)
- tag3.value = pmt.from_long(4)
- src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3))
- header = digital.packet_headergenerator_bb(32, tagname)
+ # 3 packets: | | | |
+ data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), (1, 2, 3, 4)))
+ src = blocks.vector_source_b(data, tags=tags)
+ header = digital.packet_headergenerator_bb(32, self.tsb_key)
sink = blocks.vector_sink_b()
self.tb.connect(src, header, sink)
self.tb.run()
@@ -88,26 +70,12 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase):
)
self.assertEqual(sink.data(), expected_data)
-
def test_003_12bits_formatter_object (self):
- # 3 PDUs: | | | |
- data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4)
- tagname = "packet_len"
- tag1 = gr.tag_t()
- tag1.offset = 0
- tag1.key = pmt.string_to_symbol(tagname)
- tag1.value = pmt.from_long(4)
- tag2 = gr.tag_t()
- tag2.offset = 4
- tag2.key = pmt.string_to_symbol(tagname)
- tag2.value = pmt.from_long(2)
- tag3 = gr.tag_t()
- tag3.offset = 6
- tag3.key = pmt.string_to_symbol(tagname)
- tag3.value = pmt.from_long(4)
- src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3))
- formatter_object = digital.packet_header_default(12, tagname)
- header = digital.packet_headergenerator_bb(formatter_object.formatter(), tagname)
+ # 3 packets: | | | |
+ data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), (1, 2, 3, 4)))
+ src = blocks.vector_source_b(data, tags=tags)
+ formatter_object = digital.packet_header_default(12, self.tsb_key)
+ header = digital.packet_headergenerator_bb(formatter_object.formatter(), self.tsb_key)
sink = blocks.vector_sink_b()
self.tb.connect(src, header, sink)
self.tb.run()
@@ -120,26 +88,13 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase):
def test_004_8bits_formatter_ofdm (self):
occupied_carriers = ((1, 2, 3, 5, 6, 7),)
- # 3 PDUs: | | | |
- data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4)
- tagname = "packet_len"
- tag1 = gr.tag_t()
- tag1.offset = 0
- tag1.key = pmt.string_to_symbol(tagname)
- tag1.value = pmt.from_long(4)
- tag2 = gr.tag_t()
- tag2.offset = 4
- tag2.key = pmt.string_to_symbol(tagname)
- tag2.value = pmt.from_long(2)
- tag3 = gr.tag_t()
- tag3.offset = 6
- tag3.key = pmt.string_to_symbol(tagname)
- tag3.value = pmt.from_long(4)
- src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3))
- formatter_object = digital.packet_header_ofdm(occupied_carriers, 1,
tagname)
+ # 3 packets: | | | |
+ data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), (1, 2, 3, 4)))
+ src = blocks.vector_source_b(data, tags=tags)
+ formatter_object = digital.packet_header_ofdm(occupied_carriers, 1,
self.tsb_key)
self.assertEqual(formatter_object.header_len(), 6)
- self.assertEqual(pmt.symbol_to_string(formatter_object.len_tag_key()), tagname)
- header = digital.packet_headergenerator_bb(formatter_object.formatter(), tagname)
+ self.assertEqual(pmt.symbol_to_string(formatter_object.len_tag_key()), self.tsb_key)
+ header = digital.packet_headergenerator_bb(formatter_object.formatter(), self.tsb_key)
sink = blocks.vector_sink_b()
self.tb.connect(src, header, sink)
self.tb.run()
- [Commit-gnuradio] [gnuradio] branch master updated (cba2d18 -> 5cb67ce), git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 10/12: Merge branch 'maint', git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 04/12: blocks: Added tsb_vector_sinks, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 07/12: runtime: Added packet_utils and marked tagged_streams.py for removal in future, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 05/12: blocks: updated some QAs to use proper tsb functions, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 06/12: digital: updated some QAs to use proper tsb functions, git <=
- [Commit-gnuradio] [gnuradio] 11/12: Merge remote-tracking branch 'martin/tsb/prep_for_38', git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 08/12: uhd: Nicer labels for device args and addres in GRC, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 09/12: Merge branch 'uint64_sugar' of git://github.com/osh/gnuradio, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 02/12: pmt: support conversion of basic pmt pairs to python, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 01/12: pmt: adding uint64 sugar, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 03/12: pmt: making uint64_t sugar more friendly, git, 2014/05/23
- [Commit-gnuradio] [gnuradio] 12/12: Merge remote-tracking branch 'martin/uhd/clearer_grc_args', git, 2014/05/23