From 3abc8ae1612878ea35b3c0a50ced55d867528dfc Mon Sep 17 00:00:00 2001 From: goldsimon Date: Wed, 18 Apr 2018 21:51:34 +0200 Subject: [PATCH] LWIP_NETCONN_FULLDUPLEX: unblock rx threads on close Threads blocked on the rx mbox are counted and on close, one "netconn closed" message per thread is posted to the mbox to ensure all threads are woken. The netconn can then be safely deleted. In socket API, "fd_used" and "fd_free_pending" help with auto-deleting the netconn. Signed-off-by: goldsimon --- src/api/api_lib.c | 38 +++++++++++++++-- src/api/api_msg.c | 75 ++++++++++++++++++++++++--------- src/include/lwip/api.h | 5 +++ src/include/lwip/priv/api_msg.h | 3 ++ 4 files changed, 98 insertions(+), 23 deletions(-) diff --git a/src/api/api_lib.c b/src/api/api_lib.c index bee73f79..42edc57c 100644 --- a/src/api/api_lib.c +++ b/src/api/api_lib.c @@ -94,10 +94,14 @@ #if LWIP_NETCONN_FULLDUPLEX #define NETCONN_RECVMBOX_WAITABLE(conn) (sys_mbox_valid(&(conn)->recvmbox) && (((conn)->flags & NETCONN_FLAG_MBOXINVALID) == 0)) #define NETCONN_ACCEPTMBOX_WAITABLE(conn) (sys_mbox_valid(&(conn)->acceptmbox) && (((conn)->flags & (NETCONN_FLAG_MBOXCLOSED|NETCONN_FLAG_MBOXINVALID)) == 0)) -#else +#define NETCONN_MBOX_WAITING_INC(conn) SYS_ARCH_INC(conn->mbox_threads_waiting, 1) +#define NETCONN_MBOX_WAITING_DEC(conn) SYS_ARCH_INC(conn->mbox_threads_waiting, 1) +#else /* LWIP_NETCONN_FULLDUPLEX */ #define NETCONN_RECVMBOX_WAITABLE(conn) sys_mbox_valid(&(conn)->recvmbox) #define NETCONN_ACCEPTMBOX_WAITABLE(conn) (sys_mbox_valid(&(conn)->acceptmbox) && (((conn)->flags & NETCONN_FLAG_MBOXCLOSED) == 0)) -#endif +#define NETCONN_MBOX_WAITING_INC(conn) +#define NETCONN_MBOX_WAITING_DEC(conn) +#endif /* LWIP_NETCONN_FULLDUPLEX */ static err_t netconn_close_shutdown(struct netconn *conn, u8_t how); @@ -494,21 +498,35 @@ netconn_accept(struct netconn *conn, struct netconn **new_conn) API_MSG_VAR_ALLOC_ACCEPT(msg); + NETCONN_MBOX_WAITING_INC(conn); if (netconn_is_nonblocking(conn)) { if (sys_arch_mbox_tryfetch(&conn->acceptmbox, &accept_ptr) == SYS_ARCH_TIMEOUT) { API_MSG_VAR_FREE_ACCEPT(msg); + NETCONN_MBOX_WAITING_DEC(conn); return ERR_WOULDBLOCK; } } else { #if LWIP_SO_RCVTIMEO if (sys_arch_mbox_fetch(&conn->acceptmbox, &accept_ptr, conn->recv_timeout) == SYS_ARCH_TIMEOUT) { API_MSG_VAR_FREE_ACCEPT(msg); + NETCONN_MBOX_WAITING_DEC(conn); return ERR_TIMEOUT; } #else sys_arch_mbox_fetch(&conn->acceptmbox, &accept_ptr, 0); #endif /* LWIP_SO_RCVTIMEO*/ } + NETCONN_MBOX_WAITING_DEC(conn); +#if LWIP_NETCONN_FULLDUPLEX + if (conn->flags & NETCONN_FLAG_MBOXINVALID) { + if (lwip_netconn_is_deallocated_msg(accept_ptr)) { + /* the netconn has been closed from another thread */ + API_MSG_VAR_FREE_ACCEPT(msg); + return ERR_CONN; + } + } +#endif + /* Register event with callback */ API_EVENT(conn, NETCONN_EVT_RCVMINUS, 0); @@ -576,10 +594,13 @@ netconn_recv_data(struct netconn *conn, void **new_buf, u8_t apiflags) return ERR_CONN; } + NETCONN_MBOX_WAITING_INC(conn); if (netconn_is_nonblocking(conn) || (apiflags & NETCONN_DONTBLOCK) || (conn->flags & NETCONN_FLAG_MBOXCLOSED) || (conn->pending_err != ERR_OK)) { if (sys_arch_mbox_tryfetch(&conn->recvmbox, &buf) == SYS_ARCH_TIMEOUT) { - err_t err = netconn_err(conn); + err_t err; + NETCONN_MBOX_WAITING_DEC(conn); + err = netconn_err(conn); if (err != ERR_OK) { /* return pending error */ return err; @@ -592,12 +613,23 @@ netconn_recv_data(struct netconn *conn, void **new_buf, u8_t apiflags) } else { #if LWIP_SO_RCVTIMEO if (sys_arch_mbox_fetch(&conn->recvmbox, &buf, conn->recv_timeout) == SYS_ARCH_TIMEOUT) { + NETCONN_MBOX_WAITING_DEC(conn); return ERR_TIMEOUT; } #else sys_arch_mbox_fetch(&conn->recvmbox, &buf, 0); #endif /* LWIP_SO_RCVTIMEO*/ } + NETCONN_MBOX_WAITING_DEC(conn); +#if LWIP_NETCONN_FULLDUPLEX + if (conn->flags & NETCONN_FLAG_MBOXINVALID) { + if (lwip_netconn_is_deallocated_msg(buf)) { + /* the netconn has been closed from another thread */ + API_MSG_VAR_FREE_ACCEPT(msg); + return ERR_CONN; + } + } +#endif #if LWIP_TCP #if (LWIP_UDP || LWIP_RAW) diff --git a/src/api/api_msg.c b/src/api/api_msg.c index 254b7559..614ca641 100644 --- a/src/api/api_msg.c +++ b/src/api/api_msg.c @@ -92,6 +92,19 @@ static void netconn_drain(struct netconn *conn); #define TCPIP_APIMSG_ACK(m) do { sys_sem_signal(LWIP_API_MSG_SEM(m)); } while(0) #endif /* LWIP_TCPIP_CORE_LOCKING */ +#if LWIP_NETCONN_FULLDUPLEX +const u8_t netconn_deleted = 0; + +int +lwip_netconn_is_deallocated_msg(void *msg) +{ + if (msg == &netconn_deleted) { + return 1; + } + return 0; +} +#endif /* LWIP_NETCONN_FULLDUPLEX */ + #if LWIP_TCP const u8_t netconn_aborted = 0; const u8_t netconn_reset = 0; @@ -823,16 +836,21 @@ netconn_drain(struct netconn *conn) /* Delete and drain the recvmbox. */ if (sys_mbox_valid(&conn->recvmbox)) { while (sys_mbox_tryfetch(&conn->recvmbox, &mem) != SYS_MBOX_EMPTY) { -#if LWIP_TCP - if (NETCONNTYPE_GROUP(conn->type) == NETCONN_TCP) { - err_t err; - if (!lwip_netconn_is_err_msg(mem, &err)) { - pbuf_free((struct pbuf *)mem); - } - } else -#endif /* LWIP_TCP */ +#if LWIP_NETCONN_FULLDUPLEX + if (!lwip_netconn_is_deallocated_msg(mem)) +#endif /* LWIP_NETCONN_FULLDUPLEX */ { - netbuf_delete((struct netbuf *)mem); +#if LWIP_TCP + if (NETCONNTYPE_GROUP(conn->type) == NETCONN_TCP) { + err_t err; + if (!lwip_netconn_is_err_msg(mem, &err)) { + pbuf_free((struct pbuf *)mem); + } + } else +#endif /* LWIP_TCP */ + { + netbuf_delete((struct netbuf *)mem); + } } } sys_mbox_free(&conn->recvmbox); @@ -843,18 +861,23 @@ netconn_drain(struct netconn *conn) #if LWIP_TCP if (sys_mbox_valid(&conn->acceptmbox)) { while (sys_mbox_tryfetch(&conn->acceptmbox, &mem) != SYS_MBOX_EMPTY) { - err_t err; - if (!lwip_netconn_is_err_msg(mem, &err)) { - struct netconn *newconn = (struct netconn *)mem; - /* Only tcp pcbs have an acceptmbox, so no need to check conn->type */ - /* pcb might be set to NULL already by err_tcp() */ - /* drain recvmbox */ - netconn_drain(newconn); - if (newconn->pcb.tcp != NULL) { - tcp_abort(newconn->pcb.tcp); - newconn->pcb.tcp = NULL; +#if LWIP_NETCONN_FULLDUPLEX + if (!lwip_netconn_is_deallocated_msg(mem)) +#endif /* LWIP_NETCONN_FULLDUPLEX */ + { + err_t err; + if (!lwip_netconn_is_err_msg(mem, &err)) { + struct netconn *newconn = (struct netconn *)mem; + /* Only tcp pcbs have an acceptmbox, so no need to check conn->type */ + /* pcb might be set to NULL already by err_tcp() */ + /* drain recvmbox */ + netconn_drain(newconn); + if (newconn->pcb.tcp != NULL) { + tcp_abort(newconn->pcb.tcp); + newconn->pcb.tcp = NULL; + } + netconn_free(newconn); } - netconn_free(newconn); } } sys_mbox_free(&conn->acceptmbox); @@ -867,8 +890,20 @@ netconn_drain(struct netconn *conn) static void netconn_mark_mbox_invalid(struct netconn *conn) { + int i, num_waiting; + void *msg = LWIP_CONST_CAST(void *, &netconn_deleted); + /* Prevent new calls/threads from reading from the mbox */ conn->flags |= NETCONN_FLAG_MBOXINVALID; + + SYS_ARCH_LOCKED(num_waiting = conn->mbox_threads_waiting); + for (i = 0; i < num_waiting; i++) { + if (sys_mbox_valid_val(conn->recvmbox)) { + sys_mbox_trypost(&conn->recvmbox, msg); + } else { + sys_mbox_trypost(&conn->acceptmbox, msg); + } + } } #endif /* LWIP_NETCONN_FULLDUPLEX */ diff --git a/src/include/lwip/api.h b/src/include/lwip/api.h index fb1e05a7..f8426a7d 100644 --- a/src/include/lwip/api.h +++ b/src/include/lwip/api.h @@ -241,6 +241,11 @@ struct netconn { by the application thread */ sys_mbox_t acceptmbox; #endif /* LWIP_TCP */ +#if LWIP_NETCONN_FULLDUPLEX + /** number of threads waiting on an mbox. This is required to unblock + all threads when closing while threads are waiting. */ + int mbox_threads_waiting; +#endif /** only used for socket layer */ #if LWIP_SOCKET int socket; diff --git a/src/include/lwip/priv/api_msg.h b/src/include/lwip/priv/api_msg.h index 9dfd088f..9e8ffc9e 100644 --- a/src/include/lwip/priv/api_msg.h +++ b/src/include/lwip/priv/api_msg.h @@ -187,6 +187,9 @@ struct dns_api_msg { }; #endif /* LWIP_DNS */ +#if LWIP_NETCONN_FULLDUPLEX +int lwip_netconn_is_deallocated_msg(void *msg); +#endif int lwip_netconn_is_err_msg(void *msg, err_t *err); void lwip_netconn_do_newconn (void *m); void lwip_netconn_do_delconn (void *m);