[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libmicrohttpd] 01/02: Fixed thread-safety for externally added connecti
From: |
gnunet |
Subject: |
[libmicrohttpd] 01/02: Fixed thread-safety for externally added connections Fully re-implemented scheme of adding connections from external thread (application) |
Date: |
Sun, 25 Oct 2020 15:26:24 +0100 |
This is an automated email from the git hooks/post-receive script.
karlson2k pushed a commit to branch master
in repository libmicrohttpd.
commit 19b038f68272a423ae74c89427c8b5dcfc7ce1ae
Author: Evgeny Grin (Karlson2k) <k2k@narod.ru>
AuthorDate: Thu Oct 22 16:38:59 2020 +0300
Fixed thread-safety for externally added connections
Fully re-implemented scheme of adding connections
from external thread (application)
---
src/microhttpd/daemon.c | 237 ++++++++++++++++++++++++++++++++++++----------
src/microhttpd/internal.h | 21 ++++
2 files changed, 210 insertions(+), 48 deletions(-)
diff --git a/src/microhttpd/daemon.c b/src/microhttpd/daemon.c
index 7ffd93cd..c0be6bc0 100644
--- a/src/microhttpd/daemon.c
+++ b/src/microhttpd/daemon.c
@@ -2606,35 +2606,56 @@ new_connection_prepare_ (struct MHD_Daemon *daemon,
}
+/**
+ * Close prepared, but not yet processed connection.
+ * @param daemon the daemon
+ * @param connection the connection to close
+ */
+static void
+new_connection_close_ (struct MHD_Daemon *daemon,
+ struct MHD_Connection *connection)
+{
+ mhd_assert (connection->daemon == daemon);
+ mhd_assert (! connection->in_cleanup);
+ mhd_assert (NULL == connection->next);
+ mhd_assert (NULL == connection->nextX);
+#ifdef EPOLL_SUPPORT
+ mhd_assert (NULL == connection->nextE);
+#endif /* EPOLL_SUPPORT */
+
+#ifdef HTTPS_SUPPORT
+ if (NULL != connection->tls_session)
+ {
+ mhd_assert (0 != (daemon->options & MHD_USE_TLS));
+ gnutls_deinit (connection->tls_session);
+ }
+#endif /* HTTPS_SUPPORT */
+ MHD_socket_close_chk_ (connection->socket_fd);
+ MHD_ip_limit_del (daemon,
+ connection->addr,
+ connection->addr_len);
+ free (connection->addr);
+ free (connection);
+}
+
+
/**
* Finally insert the new connection to the list of connections
- * served by the daemon.
+ * served by the daemon and start processing.
* @remark To be called only from thread that process
* daemon's select()/poll()/etc.
*
* @param daemon daemon that manages the connection
- * @param client_socket socket to manage (MHD will expect
- * to receive an HTTP request from this socket next).
- * @param addr IP address of the client
- * @param addrlen number of bytes in @a addr
- * @param external_add perform additional operations needed due
- * to the application calling us directly
* @param connection the newly created connection
- * @return #MHD_YES on success, #MHD_NO if this daemon could
- * not handle the connection (i.e. malloc failed, etc).
- * The socket will be closed in any case; 'errno' is
- * set to indicate further details about the error.
+ * @return #MHD_YES on success, #MHD_NO on error
*/
static enum MHD_Result
-new_connection_insert_ (struct MHD_Daemon *daemon,
- MHD_socket client_socket,
- const struct sockaddr *addr,
- socklen_t addrlen,
- bool external_add,
- struct MHD_Connection *connection)
+new_connection_process_ (struct MHD_Daemon *daemon,
+ struct MHD_Connection *connection)
{
int eno = 0;
+ mhd_assert (connection->daemon == daemon);
/* Allocate memory pool in the processing thread so
* intensively used memory area is allocated in "good"
* (for the thread) memory region. It is important with
@@ -2647,10 +2668,10 @@ new_connection_insert_ (struct MHD_Daemon *daemon,
_ ("Error allocating memory: %s\n"),
MHD_strerror_ (errno));
#endif
- MHD_socket_close_chk_ (client_socket);
+ MHD_socket_close_chk_ (connection->socket_fd);
MHD_ip_limit_del (daemon,
- addr,
- addrlen);
+ connection->addr,
+ connection->addr_len);
free (connection);
#if ENOMEM
errno = ENOMEM;
@@ -2721,15 +2742,15 @@ new_connection_insert_ (struct MHD_Daemon *daemon,
#ifdef EPOLL_SUPPORT
if (0 != (daemon->options & MHD_USE_EPOLL))
{
- if ((0 == (daemon->options & MHD_USE_TURBO)) || (external_add))
- { /* Do not manipulate EReady DL-list in 'external_add' mode. */
+ if (0 == (daemon->options & MHD_USE_TURBO))
+ {
struct epoll_event event;
event.events = EPOLLIN | EPOLLOUT | EPOLLPRI | EPOLLET;
event.data.ptr = connection;
if (0 != epoll_ctl (daemon->epoll_fd,
EPOLL_CTL_ADD,
- client_socket,
+ connection->socket_fd,
&event))
{
eno = errno;
@@ -2752,20 +2773,10 @@ new_connection_insert_ (struct MHD_Daemon *daemon,
connection);
}
}
- else /* This 'else' is combined with next 'if'. */
-#endif
- if ( (0 == (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
- (external_add) &&
- (MHD_ITC_IS_VALID_ (daemon->itc)) &&
- (! MHD_itc_activate_ (daemon->itc, "n")) )
- {
-#ifdef HAVE_MESSAGES
- MHD_DLOG (daemon,
- _ (
- "Failed to signal new connection via inter-thread
communication channel.\n"));
#endif
- }
+
return MHD_YES;
+
cleanup:
if (NULL != daemon->notify_connection)
daemon->notify_connection (daemon->notify_connection_cls,
@@ -2776,10 +2787,10 @@ cleanup:
if (NULL != connection->tls_session)
gnutls_deinit (connection->tls_session);
#endif /* HTTPS_SUPPORT */
- MHD_socket_close_chk_ (client_socket);
+ MHD_socket_close_chk_ (connection->socket_fd);
MHD_ip_limit_del (daemon,
- addr,
- addrlen);
+ connection->addr,
+ connection->addr_len);
#if defined(MHD_USE_POSIX_THREADS) || defined(MHD_USE_W32_THREADS)
MHD_mutex_lock_chk_ (&daemon->cleanup_connection_mutex);
#endif
@@ -2880,8 +2891,82 @@ internal_add_connection (struct MHD_Daemon *daemon,
non_blck, &connection))
return MHD_NO;
- return new_connection_insert_ (daemon, client_socket, addr, addrlen,
- external_add, connection);
+ if ((external_add) &&
+ (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD)))
+ {
+ /* Connection is added externally and MHD is handling its own threads. */
+ MHD_mutex_lock_chk_ (&daemon->new_connections_mutex);
+ DLL_insert (daemon->new_connections_head,
+ daemon->new_connections_tail,
+ connection);
+ daemon->have_new = true;
+ MHD_mutex_unlock_chk_ (&daemon->new_connections_mutex);
+
+ /* The rest of connection processing must be handled in
+ * the daemon thread. */
+ if ((MHD_ITC_IS_VALID_ (daemon->itc)) &&
+ (! MHD_itc_activate_ (daemon->itc, "n")))
+ {
+ #ifdef HAVE_MESSAGES
+ MHD_DLOG (daemon,
+ _ ("Failed to signal new connection via inter-thread " \
+ "communication channel.\n"));
+ #endif
+ }
+ return MHD_YES;
+ }
+
+ return new_connection_process_ (daemon, connection);
+}
+
+
+static void
+new_connections_list_process_ (struct MHD_Daemon *daemon)
+{
+ struct MHD_Connection *local_head;
+ struct MHD_Connection *local_tail;
+ struct MHD_Connection *c; /**< Currently processed connection */
+ mhd_assert (daemon->have_new);
+ mhd_assert (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD));
+
+ local_head = NULL;
+ local_tail = NULL;
+
+ /* Move all new connections to the local DL-list to release the mutex
+ * as quick as possible. */
+ MHD_mutex_lock_chk_ (&daemon->new_connections_mutex);
+ mhd_assert (NULL != daemon->new_connections_head);
+ do
+ { /* Move connection in FIFO order. */
+ c = daemon->new_connections_tail;
+ DLL_remove (daemon->new_connections_head,
+ daemon->new_connections_tail,
+ c);
+ DLL_insert (local_head,
+ local_tail,
+ c);
+ } while (NULL != daemon->new_connections_tail);
+ daemon->have_new = false;
+ MHD_mutex_unlock_chk_ (&daemon->new_connections_mutex);
+
+ /* Process new connections in FIFO order. */
+ do
+ {
+ c = local_tail;
+ DLL_remove (local_head,
+ local_tail,
+ c);
+ mhd_assert (daemon == c->daemon);
+ if (MHD_NO == new_connection_process_ (daemon, c))
+ {
+#ifdef HAVE_MESSAGES
+ MHD_DLOG (daemon,
+ _ ("Failed to start serving new connection.\n"));
+#endif
+ (void) 0;
+ }
+ } while (NULL != local_tail);
+
}
@@ -3710,6 +3795,10 @@ internal_run_from_select (struct MHD_Daemon *daemon,
read_fd_set)) )
MHD_itc_clear_ (daemon->itc);
+ /* Process externally added connection if any */
+ if (daemon->have_new)
+ new_connections_list_process_ (daemon);
+
/* select connection thread handling type */
if ( (MHD_INVALID_SOCKET != (ds = daemon->listen_fd)) &&
(! daemon->was_quiesced) &&
@@ -4141,9 +4230,6 @@ MHD_poll_all (struct MHD_Daemon *daemon,
return MHD_NO;
}
- /* Reset. New value will be set when connections are processed. */
- daemon->data_already_pending = false;
-
/* handle ITC FD */
/* do it before any other processing so
new signals will be processed in next loop */
@@ -4157,6 +4243,19 @@ MHD_poll_all (struct MHD_Daemon *daemon,
free (p);
return MHD_NO;
}
+
+ /* Process externally added connection if any */
+ if (daemon->have_new)
+ new_connections_list_process_ (daemon);
+
+ /* handle 'listen' FD */
+ if ( (-1 != poll_listen) &&
+ (0 != (p[poll_listen].revents & POLLIN)) )
+ (void) MHD_accept_connection (daemon);
+
+ /* Reset. New value will be set when connections are processed. */
+ daemon->data_already_pending = false;
+
i = 0;
prev = daemon->connections_tail;
while (NULL != (pos = prev))
@@ -4209,10 +4308,6 @@ MHD_poll_all (struct MHD_Daemon *daemon,
}
}
#endif /* HTTPS_SUPPORT && UPGRADE_SUPPORT */
- /* handle 'listen' FD */
- if ( (-1 != poll_listen) &&
- (0 != (p[poll_listen].revents & POLLIN)) )
- (void) MHD_accept_connection (daemon);
free (p);
}
@@ -4294,6 +4389,11 @@ MHD_poll_listen_socket (struct MHD_Daemon *daemon,
/* handle shutdown */
if (daemon->shutdown)
return MHD_NO;
+
+ /* Process externally added connection if any */
+ if (daemon->have_new)
+ new_connections_list_process_ (daemon);
+
if ( (-1 != poll_listen) &&
(0 != (p[poll_listen].revents & POLLIN)) )
(void) MHD_accept_connection (daemon);
@@ -4739,6 +4839,10 @@ MHD_epoll (struct MHD_Daemon *daemon,
}
}
+ /* Process externally added connection if any */
+ if (daemon->have_new)
+ new_connections_list_process_ (daemon);
+
if (need_to_accept)
{
unsigned int series_length = 0;
@@ -6633,6 +6737,18 @@ MHD_start_daemon_va (unsigned int flags,
#endif /* ! HAVE_LISTEN_SHUTDOWN */
if (0 == daemon->worker_pool_size)
{
+ if (! MHD_mutex_init_ (&daemon->new_connections_mutex))
+ {
+#ifdef HAVE_MESSAGES
+ MHD_DLOG (daemon,
+ _ ("Failed to initialise mutex.\n"));
+#endif
+ MHD_mutex_destroy_chk_ (&daemon->cleanup_connection_mutex);
+ MHD_mutex_destroy_chk_ (&daemon->per_ip_connection_mutex);
+ if (MHD_INVALID_SOCKET != listen_fd)
+ MHD_socket_close_chk_ (listen_fd);
+ goto free_and_fail;
+ }
if (! MHD_create_named_thread_ (&daemon->pid,
(*pflags
& MHD_USE_THREAD_PER_CONNECTION) ?
@@ -6646,6 +6762,7 @@ MHD_start_daemon_va (unsigned int flags,
_ ("Failed to create listen thread: %s\n"),
MHD_strerror_ (errno));
#endif
+ MHD_mutex_destroy_chk_ (&daemon->new_connections_mutex);
MHD_mutex_destroy_chk_ (&daemon->cleanup_connection_mutex);
MHD_mutex_destroy_chk_ (&daemon->per_ip_connection_mutex);
if (MHD_INVALID_SOCKET != listen_fd)
@@ -6685,7 +6802,14 @@ MHD_start_daemon_va (unsigned int flags,
d->master = daemon;
d->worker_pool_size = 0;
d->worker_pool = NULL;
-
+ if (! MHD_mutex_init_ (&d->new_connections_mutex))
+ {
+ #ifdef HAVE_MESSAGES
+ MHD_DLOG (daemon,
+ _ ("Failed to initialise mutex.\n"));
+ #endif
+ goto thread_failed;
+ }
if (0 != (*pflags & MHD_USE_ITC))
{
if (! MHD_itc_init_ (d->itc))
@@ -6696,6 +6820,7 @@ MHD_start_daemon_va (unsigned int flags,
"Failed to create worker inter-thread communication
channel: %s\n"),
MHD_itc_last_strerror_ () );
#endif
+ MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
goto thread_failed;
}
if ( (0 == (*pflags & (MHD_USE_POLL | MHD_USE_EPOLL))) &&
@@ -6707,6 +6832,7 @@ MHD_start_daemon_va (unsigned int flags,
_ (
"File descriptor for worker inter-thread communication
channel exceeds maximum value.\n"));
#endif
+ MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
MHD_itc_destroy_chk_ (d->itc);
goto thread_failed;
}
@@ -6733,6 +6859,7 @@ MHD_start_daemon_va (unsigned int flags,
{
if (MHD_ITC_IS_VALID_ (d->itc))
MHD_itc_destroy_chk_ (d->itc);
+ MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
goto thread_failed;
}
#endif
@@ -6745,6 +6872,7 @@ MHD_start_daemon_va (unsigned int flags,
#endif
if (MHD_ITC_IS_VALID_ (d->itc))
MHD_itc_destroy_chk_ (d->itc);
+ MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
goto thread_failed;
}
@@ -6765,6 +6893,7 @@ MHD_start_daemon_va (unsigned int flags,
MHD_mutex_destroy_chk_ (&d->cleanup_connection_mutex);
if (MHD_ITC_IS_VALID_ (d->itc))
MHD_itc_destroy_chk_ (d->itc);
+ MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
goto thread_failed;
}
}
@@ -6879,6 +7008,17 @@ close_all_connections (struct MHD_Daemon *daemon)
mhd_assert (NULL == daemon->worker_pool);
#endif
mhd_assert (daemon->shutdown);
+
+ /* Remove externally added new connections that are
+ * not processed by the daemon thread. */
+ while (NULL != (pos = daemon->new_connections_tail))
+ {
+ mhd_assert (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD));
+ DLL_remove (daemon->new_connections_head,
+ daemon->new_connections_tail,
+ pos);
+ new_connection_close_ (daemon, pos);
+ }
/* give upgraded HTTPS connections a chance to finish */
/* 'daemon->urh_head' is not used in thread-per-connection mode. */
for (urh = daemon->urh_tail; NULL != urh; urh = urhn)
@@ -7126,6 +7266,7 @@ MHD_stop_daemon (struct MHD_Daemon *daemon)
}
if (MHD_ITC_IS_VALID_ (daemon->itc))
MHD_itc_destroy_chk_ (daemon->itc);
+ MHD_mutex_destroy_chk_ (&daemon->new_connections_mutex);
#ifdef EPOLL_SUPPORT
if ( (0 != (daemon->options & MHD_USE_EPOLL)) &&
diff --git a/src/microhttpd/internal.h b/src/microhttpd/internal.h
index 5fb3da3d..3b044984 100644
--- a/src/microhttpd/internal.h
+++ b/src/microhttpd/internal.h
@@ -1301,6 +1301,16 @@ struct MHD_Daemon
*/
void *default_handler_cls;
+ /**
+ * Head of doubly-linked list of new, externally added connections.
+ */
+ struct MHD_Connection *new_connections_head;
+
+ /**
+ * Tail of doubly-linked list of new, externally added connections.
+ */
+ struct MHD_Connection *new_connections_tail;
+
/**
* Head of doubly-linked list of our current, active connections.
*/
@@ -1516,6 +1526,11 @@ struct MHD_Daemon
* "manual_timeout" DLLs.
*/
MHD_mutex_ cleanup_connection_mutex;
+
+ /**
+ * Mutex for any access to the "new connections" DL-list.
+ */
+ MHD_mutex_ new_connections_mutex;
#endif
/**
@@ -1600,6 +1615,12 @@ struct MHD_Daemon
*/
volatile bool resuming;
+ /**
+ * Indicate that new connections in @e new_connections_head list
+ * need to be processed.
+ */
+ volatile bool have_new;
+
/**
* 'True' if some data is already waiting to be processed.
* If set to 'true' - zero timeout for select()/poll*()
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.