task #7865 (implement non-blocking send operation)

This commit is contained in:
goldsimon 2010-02-13 17:08:40 +00:00
parent 49c6ce3703
commit 0792effc2a
5 changed files with 92 additions and 40 deletions

View File

@ -19,6 +19,10 @@ HISTORY
++ New features: ++ New features:
2010-02-13: Simon Goldschmidt
* api.h, api_lib.c, api_msg.c, sockets.c: task #7865 (implement non-
blocking send operation)
2010-02-12: Simon Goldschmidt 2010-02-12: Simon Goldschmidt
* sockets.c/.h: Added a minimal version of posix fctl() to have a * sockets.c/.h: Added a minimal version of posix fctl() to have a
standardised way to set O_NONBLOCK for nonblocking sockets. standardised way to set O_NONBLOCK for nonblocking sockets.

View File

@ -527,6 +527,8 @@ netconn_write(struct netconn *conn, const void *dataptr, size_t size, u8_t apifl
return ERR_OK; return ERR_OK;
} }
/* @todo: for non-blocking write, check if 'size' would ever fit into
snd_queue or snd_buf */
msg.function = do_write; msg.function = do_write;
msg.msg.conn = conn; msg.msg.conn = conn;
msg.msg.msg.w.dataptr = dataptr; msg.msg.msg.w.dataptr = dataptr;

View File

@ -279,6 +279,17 @@ poll_tcp(void *arg, struct tcp_pcb *pcb)
} }
/* @todo: implement connect timeout here? */ /* @todo: implement connect timeout here? */
/* Did a nonblocking write fail before? Then check available write-space. */
if (conn->flags & NETCONN_FLAG_CHECK_WRITESPACE) {
/* If the queued byte- or pbuf-count drops below the configured low-water limit,
let select mark this pcb as writable again. */
if ((conn->pcb.tcp != NULL) && (tcp_sndbuf(conn->pcb.tcp) > TCP_SNDLOWAT) &&
(tcp_sndqueuelen(conn->pcb.tcp) < TCP_SNDQUEUELOWAT)) {
conn->flags &= ~NETCONN_FLAG_CHECK_WRITESPACE;
API_EVENT(conn, NETCONN_EVT_SENDPLUS, 0);
}
}
return ERR_OK; return ERR_OK;
} }
@ -308,6 +319,7 @@ sent_tcp(void *arg, struct tcp_pcb *pcb, u16_t len)
let select mark this pcb as writable again. */ let select mark this pcb as writable again. */
if ((conn->pcb.tcp != NULL) && (tcp_sndbuf(conn->pcb.tcp) > TCP_SNDLOWAT) && if ((conn->pcb.tcp != NULL) && (tcp_sndbuf(conn->pcb.tcp) > TCP_SNDLOWAT) &&
(tcp_sndqueuelen(conn->pcb.tcp) < TCP_SNDQUEUELOWAT)) { (tcp_sndqueuelen(conn->pcb.tcp) < TCP_SNDQUEUELOWAT)) {
conn->flags &= ~NETCONN_FLAG_CHECK_WRITESPACE;
API_EVENT(conn, NETCONN_EVT_SENDPLUS, len); API_EVENT(conn, NETCONN_EVT_SENDPLUS, len);
} }
} }
@ -1128,11 +1140,13 @@ do_recv(struct api_msg_msg *msg)
static err_t static err_t
do_writemore(struct netconn *conn) do_writemore(struct netconn *conn)
{ {
err_t err; err_t err = ERR_OK;
void *dataptr; void *dataptr;
u16_t len, available; u16_t len, available;
u8_t write_finished = 0; u8_t write_finished = 0;
size_t diff; size_t diff;
u8_t dontblock = netconn_is_nonblocking(conn) ||
(conn->current_msg->msg.w.apiflags & NETCONN_DONTBLOCK);
LWIP_ASSERT("conn != NULL", conn != NULL); LWIP_ASSERT("conn != NULL", conn != NULL);
LWIP_ASSERT("conn->state == NETCONN_WRITE", (conn->state == NETCONN_WRITE)); LWIP_ASSERT("conn->state == NETCONN_WRITE", (conn->state == NETCONN_WRITE));
@ -1159,10 +1173,24 @@ do_writemore(struct netconn *conn)
conn->flags |= NETCONN_FLAG_WRITE_DELAYED; conn->flags |= NETCONN_FLAG_WRITE_DELAYED;
#endif #endif
} }
if (dontblock && (len < conn->current_msg->msg.w.len)) {
err = tcp_write(conn->pcb.tcp, dataptr, len, conn->current_msg->msg.w.apiflags); /* failed to send all data at once -> nonblocking write not possible */
err = ERR_MEM;
}
if (err == ERR_OK) {
LWIP_ASSERT("do_writemore: invalid length!", ((conn->write_offset + len) <= conn->current_msg->msg.w.len)); LWIP_ASSERT("do_writemore: invalid length!", ((conn->write_offset + len) <= conn->current_msg->msg.w.len));
err = tcp_write(conn->pcb.tcp, dataptr, len, conn->current_msg->msg.w.apiflags);
}
if (dontblock && (err == ERR_MEM)) {
/* nonblocking write failed */
write_finished = 1;
err = ERR_WOULBLOCK;
/* let poll_tcp check writable space to mark the pcb
writable again */
conn->flags |= NETCONN_FLAG_CHECK_WRITESPACE;
/* let select mark this pcb as non-writable. */
API_EVENT(conn, NETCONN_EVT_SENDMINUS, len);
} else {
/* if OK or memory error, check available space */ /* if OK or memory error, check available space */
if (((err == ERR_OK) || (err == ERR_MEM)) && if (((err == ERR_OK) || (err == ERR_MEM)) &&
((tcp_sndbuf(conn->pcb.tcp) <= TCP_SNDLOWAT) || ((tcp_sndbuf(conn->pcb.tcp) <= TCP_SNDLOWAT) ||
@ -1171,6 +1199,7 @@ do_writemore(struct netconn *conn)
let select mark this pcb as non-writable. */ let select mark this pcb as non-writable. */
API_EVENT(conn, NETCONN_EVT_SENDMINUS, len); API_EVENT(conn, NETCONN_EVT_SENDMINUS, len);
} }
if (err == ERR_OK) { if (err == ERR_OK) {
conn->write_offset += len; conn->write_offset += len;
if (conn->write_offset == conn->current_msg->msg.w.len) { if (conn->write_offset == conn->current_msg->msg.w.len) {
@ -1187,14 +1216,15 @@ do_writemore(struct netconn *conn)
/* tcp_enqueue returned ERR_MEM, try tcp_output anyway */ /* tcp_enqueue returned ERR_MEM, try tcp_output anyway */
tcp_output(conn->pcb.tcp); tcp_output(conn->pcb.tcp);
#if LWIP_TCPIP_CORE_LOCKING #if LWIP_TCPIP_CORE_LOCKING
conn->flags |= NETCONN_FLAG_WRITE_DELAYED; conn->flags |= NETCONN_FLAG_WRITE_DELAYED;
#endif #endif
} else { } else {
/* On errors != ERR_MEM, we don't try writing any more but return /* On errors != ERR_MEM, we don't try writing any more but return
the error to the application thread. */ the error to the application thread. */
write_finished = 1; write_finished = 1;
} }
}
if (write_finished) { if (write_finished) {
/* everything was written: set back connection state /* everything was written: set back connection state
@ -1232,6 +1262,7 @@ do_write(struct api_msg_msg *msg)
if (msg->conn->type == NETCONN_TCP) { if (msg->conn->type == NETCONN_TCP) {
#if LWIP_TCP #if LWIP_TCP
if (msg->conn->state != NETCONN_NONE) { if (msg->conn->state != NETCONN_NONE) {
/* netconn is connecting, closing or in blocking write */
msg->err = ERR_INPROGRESS; msg->err = ERR_INPROGRESS;
} else if (msg->conn->pcb.tcp != NULL) { } else if (msg->conn->pcb.tcp != NULL) {
msg->conn->state = NETCONN_WRITE; msg->conn->state = NETCONN_WRITE;

View File

@ -683,6 +683,7 @@ lwip_send(int s, const void *data, size_t size, int flags)
{ {
struct lwip_socket *sock; struct lwip_socket *sock;
err_t err; err_t err;
u8_t write_flags;
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_send(%d, data=%p, size=%"SZT_F", flags=0x%x)\n", LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_send(%d, data=%p, size=%"SZT_F", flags=0x%x)\n",
s, data, size, flags)); s, data, size, flags));
@ -700,7 +701,18 @@ lwip_send(int s, const void *data, size_t size, int flags)
#endif /* (LWIP_UDP || LWIP_RAW) */ #endif /* (LWIP_UDP || LWIP_RAW) */
} }
err = netconn_write(sock->conn, data, size, NETCONN_COPY | ((flags & MSG_MORE)?NETCONN_MORE:0)); if ((flags & MSG_DONTWAIT) || netconn_is_nonblocking(sock->conn)) {
if ((size > TCP_SND_BUF) || ((size / TCP_MSS) > TCP_SND_QUEUELEN)) {
/* too much data to ever send nonblocking! */
sock_set_errno(sock, EMSGSIZE);
return -1;
}
}
write_flags = NETCONN_COPY |
((flags & MSG_MORE) ? NETCONN_MORE : 0) |
((flags & MSG_DONTWAIT) ? NETCONN_DONTBLOCK : 0);
err = netconn_write(sock->conn, data, size, write_flags);
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_send(%d) err=%d size=%"SZT_F"\n", s, err, size)); LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_send(%d) err=%d size=%"SZT_F"\n", s, err, size));
sock_set_errno(sock, err_to_errno(err)); sock_set_errno(sock, err_to_errno(err));

View File

@ -51,14 +51,14 @@ extern "C" {
* the same byte order as in the corresponding pcb. * the same byte order as in the corresponding pcb.
*/ */
/* Flags for netconn_write */ /* Flags for netconn_write (u8_t) */
#define NETCONN_NOFLAG 0x00 #define NETCONN_NOFLAG 0x00
#define NETCONN_NOCOPY 0x00 /* Only for source code compatibility */ #define NETCONN_NOCOPY 0x00 /* Only for source code compatibility */
#define NETCONN_COPY 0x01 #define NETCONN_COPY 0x01
#define NETCONN_MORE 0x02 #define NETCONN_MORE 0x02
#define NETCONN_DONTBLOCK 0x04
/* Flags for struct netconn.flags (u8_t) */ /* Flags for struct netconn.flags (u8_t) */
/** TCP: when data passed to netconn_write doesn't fit into the send buffer, /** TCP: when data passed to netconn_write doesn't fit into the send buffer,
this temporarily stores whether to wake up the original application task this temporarily stores whether to wake up the original application task
if data couldn't be sent in the first try. */ if data couldn't be sent in the first try. */
@ -70,6 +70,9 @@ extern "C" {
/** If this is set, a TCP netconn must call netconn_recved() to update /** If this is set, a TCP netconn must call netconn_recved() to update
the TCP receive window (done automatically if not set). */ the TCP receive window (done automatically if not set). */
#define NETCONN_FLAG_NO_AUTO_RECVED 0x08 #define NETCONN_FLAG_NO_AUTO_RECVED 0x08
/** If a nonblocking write has been rejected before, poll_tcp needs to
check if the netconn is writable again */
#define NETCONN_FLAG_CHECK_WRITESPACE 0x10
/* Helpers to process several netconn_types by the same code */ /* Helpers to process several netconn_types by the same code */