[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 07/17: blocks: added 'MTU' and 'tcp_no_dela
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 07/17: blocks: added 'MTU' and 'tcp_no_delay' params for 'socket_pdu' (and GRC option), applied MTU (buffer size) to TCP/UDP send, separate TCP/UDP server endpoint resolvers for empty/0.0.0.0 Host param (listen on all interfaces) Whitespace clean-up. |
Date: |
Mon, 31 Mar 2014 20:15:53 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 8b8d7ee57ce4029411cde48810e2ec6eac7ae5f2
Author: Balint Seeber <address@hidden>
Date: Thu Mar 27 01:27:33 2014 -0700
blocks: added 'MTU' and 'tcp_no_delay' params for 'socket_pdu' (and GRC
option), applied MTU (buffer size) to TCP/UDP send, separate TCP/UDP server
endpoint resolvers for empty/0.0.0.0 Host param (listen on all interfaces)
Whitespace clean-up.
---
gr-blocks/grc/blocks_socket_pdu.xml | 28 ++++-
gr-blocks/include/gnuradio/blocks/socket_pdu.h | 3 +-
gr-blocks/lib/socket_pdu_impl.cc | 136 +++++++++++++++----------
gr-blocks/lib/socket_pdu_impl.h | 5 +-
4 files changed, 113 insertions(+), 59 deletions(-)
diff --git a/gr-blocks/grc/blocks_socket_pdu.xml
b/gr-blocks/grc/blocks_socket_pdu.xml
index 1e897cf..72dc381 100644
--- a/gr-blocks/grc/blocks_socket_pdu.xml
+++ b/gr-blocks/grc/blocks_socket_pdu.xml
@@ -8,7 +8,7 @@
<name>Socket PDU</name>
<key>blocks_socket_pdu</key>
<import>from gnuradio import blocks</import>
- <make>blocks.socket_pdu($type, $host, $port, $mtu)</make>
+ <make>blocks.socket_pdu($type, $host, $port, $mtu, $tcp_no_delay)</make>
<param>
<name>Type</name>
<key>type</key>
@@ -49,6 +49,31 @@
<value>10000</value>
<type>int</type>
</param>
+ <param>
+ <name>TCP No Delay</name>
+ <key>tcp_no_delay</key>
+ <value>False</value>
+ <type>enum</type>
+ <hide>
+#if (($type() == '"TCP_CLIENT"') or ($type() == '"TCP_SERVER"'))
+#if (str($tcp_no_delay()) == 'False')
+part
+#else
+none
+#end if
+#else
+all
+#end if
+</hide>
+ <option>
+ <name>Enabled</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>Disabled</name>
+ <key>False</key>
+ </option>
+ </param>
<sink>
<name>pdus</name>
<type>message</type>
@@ -59,4 +84,5 @@
<type>message</type>
<optional>1</optional>
</source>
+ <doc>For server modes, leave Host blank to bind to all interfaces
(equivalent to 0.0.0.0).</doc>
</block>
diff --git a/gr-blocks/include/gnuradio/blocks/socket_pdu.h
b/gr-blocks/include/gnuradio/blocks/socket_pdu.h
index 82a7632..31468a3 100644
--- a/gr-blocks/include/gnuradio/blocks/socket_pdu.h
+++ b/gr-blocks/include/gnuradio/blocks/socket_pdu.h
@@ -45,8 +45,9 @@ namespace gr {
* \param addr network address to use
* \param port network port to use
* \param MTU maximum transmission unit
+ * \param tcp_no_delay TCP No Delay option (set to True to disable Nagle
algorithm)
*/
- static sptr make(std::string type, std::string addr, std::string port,
int MTU=10000);
+ static sptr make(std::string type, std::string addr, std::string port,
int MTU=10000, bool tcp_no_delay=false);
};
} /* namespace blocks */
diff --git a/gr-blocks/lib/socket_pdu_impl.cc b/gr-blocks/lib/socket_pdu_impl.cc
index 9daf8c3..3e483fb 100644
--- a/gr-blocks/lib/socket_pdu_impl.cc
+++ b/gr-blocks/lib/socket_pdu_impl.cc
@@ -33,41 +33,56 @@ namespace gr {
namespace blocks {
socket_pdu::sptr
- socket_pdu::make(std::string type, std::string addr, std::string port, int
MTU)
+ socket_pdu::make(std::string type, std::string addr, std::string port, int
MTU/*= 10000*/, bool tcp_no_delay/*= false*/)
{
- return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port,
MTU));
+ return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port,
MTU, tcp_no_delay));
}
- socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr,
std::string port, int MTU)
- : block("socket_pdu",
- io_signature::make (0, 0, 0),
- io_signature::make (0, 0, 0))
+ socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr,
std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/)
+ : block("socket_pdu",
+ io_signature::make (0, 0, 0),
+ io_signature::make (0, 0, 0)),
+ d_tcp_no_delay(tcp_no_delay)
{
+ d_rxbuf.resize(MTU);
+
message_port_register_in(PDU_PORT_ID);
message_port_register_out(PDU_PORT_ID);
- if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) {
+ if ((type == "TCP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) {
// Bind on all interfaces
+ int port_num = atoi(port.c_str());
+ if (port_num == 0)
+ throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for
TCP_SERVER");
+ d_tcp_endpoint =
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num);
+ }
+ else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) {
boost::asio::ip::tcp::resolver resolver(d_io_service);
boost::asio::ip::tcp::resolver::query
query(boost::asio::ip::tcp::v4(), addr, port);
- d_tcp_endpoint = *resolver.resolve(query);
+ d_tcp_endpoint = *resolver.resolve(query);
}
-
- if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) {
- boost::asio::ip::udp::resolver resolver(d_io_service);
+ else if ((type == "UDP_SERVER") && ((addr.empty()) || (addr ==
"0.0.0.0"))) { // Bind on all interfaces
+ int port_num = atoi(port.c_str());
+ if (port_num == 0)
+ throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for
UDP_SERVER");
+ d_udp_endpoint =
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num);
+ }
+ else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) {
+ boost::asio::ip::udp::resolver resolver(d_io_service);
boost::asio::ip::udp::resolver::query
query(boost::asio::ip::udp::v4(), addr, port);
if (type == "UDP_SERVER")
- d_udp_endpoint = *resolver.resolve(query);
+ d_udp_endpoint = *resolver.resolve(query);
else
- d_udp_endpoint_other = *resolver.resolve(query);
+ d_udp_endpoint_other = *resolver.resolve(query);
}
if (type == "TCP_SERVER") {
d_acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(d_io_service,
d_tcp_endpoint));
d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+
start_tcp_accept();
+
set_msg_handler(PDU_PORT_ID,
boost::bind(&socket_pdu_impl::tcp_server_send, this, _1));
-
}
else if (type =="TCP_CLIENT") {
boost::system::error_code error = boost::asio::error::host_not_found;
@@ -75,34 +90,35 @@ namespace gr {
d_tcp_socket->connect(d_tcp_endpoint, error);
if (error)
throw boost::system::system_error(error);
+
d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay));
set_msg_handler(PDU_PORT_ID,
boost::bind(&socket_pdu_impl::tcp_client_send, this, _1));
- d_tcp_socket->async_read_some(
- boost::asio::buffer(d_rxbuf),
- boost::bind(&socket_pdu_impl::handle_tcp_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred)
- );
+ d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
+ boost::bind(&socket_pdu_impl::handle_tcp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
else if (type =="UDP_SERVER") {
d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service,
d_udp_endpoint));
d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf),
d_udp_endpoint_other,
-
boost::bind(&socket_pdu_impl::handle_udp_read, this,
-
boost::asio::placeholders::error,
-
boost::asio::placeholders::bytes_transferred));
+ boost::bind(&socket_pdu_impl::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+
set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send,
this, _1));
}
else if (type =="UDP_CLIENT") {
d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service,
d_udp_endpoint));
d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf),
d_udp_endpoint_other,
-
boost::bind(&socket_pdu_impl::handle_udp_read, this,
-
boost::asio::placeholders::error,
-
boost::asio::placeholders::bytes_transferred));
+ boost::bind(&socket_pdu_impl::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+
set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send,
this, _1));
}
else
- throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type");
+ throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type");
d_thread =
gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this));
d_started = true;
@@ -112,14 +128,14 @@ namespace gr {
socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error,
size_t bytes_transferred)
{
if (!error) {
- pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const
uint8_t *)&d_rxbuf[0]);
- pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
- message_port_pub(PDU_PORT_ID, pdu);
-
- d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
-
boost::bind(&socket_pdu_impl::handle_tcp_read, this,
-
boost::asio::placeholders::error,
-
boost::asio::placeholders::bytes_transferred));
+ pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const
uint8_t *)&d_rxbuf[0]);
+ pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
+ message_port_pub(PDU_PORT_ID, pdu);
+
+ d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
+ boost::bind(&socket_pdu_impl::handle_tcp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
else
throw boost::system::system_error(error);
@@ -128,11 +144,11 @@ namespace gr {
void
socket_pdu_impl::start_tcp_accept()
{
- tcp_connection::sptr new_connection =
tcp_connection::make(d_acceptor_tcp->get_io_service());
+ tcp_connection::sptr new_connection =
tcp_connection::make(d_acceptor_tcp->get_io_service(), d_rxbuf.size(),
d_tcp_no_delay);
d_acceptor_tcp->async_accept(new_connection->socket(),
-
boost::bind(&socket_pdu_impl::handle_tcp_accept, this,
- new_connection,
boost::asio::placeholders::error));
+ boost::bind(&socket_pdu_impl::handle_tcp_accept, this,
+ new_connection, boost::asio::placeholders::error));
}
void
@@ -147,12 +163,12 @@ namespace gr {
socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection,
const boost::system::error_code& error)
{
if (!error) {
- new_connection->start(this);
- d_tcp_connections.push_back(new_connection);
- start_tcp_accept();
+ new_connection->start(this);
+ d_tcp_connections.push_back(new_connection);
+ start_tcp_accept();
}
else
- std::cout << error << std::endl;
+ std::cout << error << std::endl;
}
void
@@ -160,22 +176,32 @@ namespace gr {
{
pmt::pmt_t vector = pmt::cdr(msg);
size_t len = pmt::length(vector);
- size_t offset(0);
- boost::array<char, 10000> txbuf;
- memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len);
- d_tcp_socket->send(boost::asio::buffer(txbuf,len));
+ size_t offset = 0;
+ std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
+ while (offset < len) {
+ size_t send_len = std::min((len - offset), txbuf.size());
+ memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset),
send_len);
+ offset += send_len;
+ d_tcp_socket->send(boost::asio::buffer(txbuf, send_len));
+ }
}
void
socket_pdu_impl::udp_send(pmt::pmt_t msg)
{
+ if (d_udp_endpoint_other.address().to_string() == "0.0.0.0")
+ return;
+
pmt::pmt_t vector = pmt::cdr(msg);
size_t len = pmt::length(vector);
- size_t offset(0);
- boost::array<char, 10000> txbuf;
- memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len);
- if (d_udp_endpoint_other.address().to_string() != "0.0.0.0")
- d_udp_socket->send_to(boost::asio::buffer(txbuf,len),
d_udp_endpoint_other);
+ size_t offset = 0;
+ std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
+ while (offset < len) {
+ size_t send_len = std::min((len - offset), txbuf.size());
+ memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset),
send_len);
+ offset += send_len;
+ d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len),
d_udp_endpoint_other);
+ }
}
void
@@ -183,14 +209,14 @@ namespace gr {
{
if (!error) {
pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const
uint8_t*)&d_rxbuf[0]);
- pmt::pmt_t pdu = pmt::cons( pmt::PMT_NIL, vector);
+ pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
message_port_pub(PDU_PORT_ID, pdu);
d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf),
d_udp_endpoint_other,
-
boost::bind(&socket_pdu_impl::handle_udp_read, this,
-
boost::asio::placeholders::error,
-
boost::asio::placeholders::bytes_transferred));
+ boost::bind(&socket_pdu_impl::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
}
diff --git a/gr-blocks/lib/socket_pdu_impl.h b/gr-blocks/lib/socket_pdu_impl.h
index 3099d90..2d5bc33 100644
--- a/gr-blocks/lib/socket_pdu_impl.h
+++ b/gr-blocks/lib/socket_pdu_impl.h
@@ -34,13 +34,14 @@ namespace gr {
{
private:
boost::asio::io_service d_io_service;
- boost::array<char, 10000> d_rxbuf;
+ std::vector<char> d_rxbuf;
void run_io_service() { d_io_service.run(); }
// TCP specific
boost::asio::ip::tcp::endpoint d_tcp_endpoint;
std::vector<tcp_connection::sptr> d_tcp_connections;
void handle_tcp_read(const boost::system::error_code& error, size_t
bytes_transferred);
+ bool d_tcp_no_delay;
// TCP server specific
boost::shared_ptr<boost::asio::ip::tcp::acceptor> d_acceptor_tcp;
@@ -60,7 +61,7 @@ namespace gr {
void udp_send(pmt::pmt_t msg);
public:
- socket_pdu_impl(std::string type, std::string addr, std::string port,
int MTU);
+ socket_pdu_impl(std::string type, std::string addr, std::string port,
int MTU = 10000, bool tcp_no_delay = false);
};
} /* namespace blocks */
- [Commit-gnuradio] [gnuradio] branch master updated (a8f73d8 -> 7390c25), git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 01/17: runtime: restoring sanity to pmt.is_blob(), git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 09/17: Add matching HDLC framer. Fix deframer max/min., git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 04/17: blocks: hide 'ignore tag' param for 'throttle' block in GRC if it's the default value (i.e. old look before param was added), git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 11/17: Merge remote-tracking branch 'osh/pmt_fix', git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 02/17: Add HDLC deframer to gr-digital. Input unpacked bits, output PDU binary blobs., git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 07/17: blocks: added 'MTU' and 'tcp_no_delay' params for 'socket_pdu' (and GRC option), applied MTU (buffer size) to TCP/UDP send, separate TCP/UDP server endpoint resolvers for empty/0.0.0.0 Host param (listen on all interfaces) Whitespace clean-up.,
git <=
- [Commit-gnuradio] [gnuradio] 06/17: blocks: added 'MTU' (buffer size) & 'no_delay' params to 'tcp_connection' (no longer uses fixed buffer), git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 16/17: Merge remote-tracking branch 'balint/3.7-1/socket_pdu_improvements', git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 05/17: digital: added 'byte' IO format, git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 08/17: cmake: updated max ver for FindQwt to 6.2.0 (Qwt compiled direct from their source tree worked with trondeau's qt_number_sink test FG), git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 10/17: Add QA code to HDLC framer/deframer., git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 17/17: Merge remote-tracking branch 'balint/3.7-1/findqwt_max_ver_6.2.0', git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 12/17: Merge remote-tracking branch 'bistromath/hdlc', git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 15/17: Merge remote-tracking branch 'balint/3.7-1/throttle_grc_hide_ignoretag', git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 13/17: Merge remote-tracking branch 'balint/3.7-1/tcp_connection_mtu_no_delay', git, 2014/03/31
- [Commit-gnuradio] [gnuradio] 14/17: Merge remote-tracking branch 'balint/3.7-1/header_payload_demux_byte_type', git, 2014/03/31