Skip to content

Commit

Permalink
unix,win: add uv_udp_try_send2
Browse files Browse the repository at this point in the history
Add a version of uv_udp_try_send that can send multiple datagrams.

Uses sendmmsg(2) on platforms that support it (Linux, FreeBSD, macOS),
falls back to a regular sendmsg(2) loop elsewhere.

This work was sponsored by ISC, the Internet Systems Consortium.
  • Loading branch information
bnoordhuis committed Dec 13, 2024
1 parent 7b4cf04 commit e8969bf
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 6 deletions.
14 changes: 14 additions & 0 deletions docs/src/udp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,20 @@ API
.. versionchanged:: 1.27.0 added support for connected sockets
.. c:function:: int uv_udp_try_send2(uv_udp_t* handle, unsigned int count, uv_buf_t* bufs[/*count*/], unsigned int nbufs[/*count*/], struct sockaddr* addrs[/*count*/], unsigned int flags)
Like :c:func:`uv_udp_try_send`, but can send multiple datagrams.
Lightweight abstraction around :man:`sendmmsg(2)`, with a :man:`sendmsg(2)`
fallback loop for platforms that do not support the former. The handle must
be fully initialized; call c:func:`uv_udp_bind` first.
:returns: >= 0: number of datagrams sent. Zero only if `count` was zero.
< 0: negative error code. Only if sending the first datagram fails,
otherwise returns a positive send count. ``UV_EAGAIN`` when datagrams
cannot be sent right now; fall back to :c:func:`uv_udp_send`.

.. versionadded:: 1.50.0

.. c:function:: int uv_udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb, uv_udp_recv_cb recv_cb)
Prepare for receiving data. If the socket has not previously been bound
Expand Down
6 changes: 6 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,12 @@ UV_EXTERN int uv_udp_try_send(uv_udp_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
const struct sockaddr* addr);
UV_EXTERN int uv_udp_try_send2(uv_udp_t* handle,
unsigned int count,
uv_buf_t* bufs[/*count*/],
unsigned int nbufs[/*count*/],
struct sockaddr* addrs[/*count*/],
unsigned int flags);
UV_EXTERN int uv_udp_recv_start(uv_udp_t* handle,
uv_alloc_cb alloc_cb,
uv_udp_recv_cb recv_cb);
Expand Down
15 changes: 15 additions & 0 deletions src/unix/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1401,3 +1401,18 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
feed:
uv__io_feed(handle->loop, &handle->io_watcher);
}


int uv__udp_try_send2(uv_udp_t* handle,
unsigned int count,
uv_buf_t* bufs[/*count*/],
unsigned int nbufs[/*count*/],
struct sockaddr* addrs[/*count*/]) {
int fd;

fd = handle->io_watcher.fd;
if (fd == -1)
return UV_EINVAL;

return uv__udp_sendmsgv(fd, count, bufs, nbufs, addrs);
}
19 changes: 19 additions & 0 deletions src/uv-common.c
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,25 @@ int uv_udp_try_send(uv_udp_t* handle,
}


int uv_udp_try_send2(uv_udp_t* handle,
unsigned int count,
uv_buf_t* bufs[/*count*/],
unsigned int nbufs[/*count*/],
struct sockaddr* addrs[/*count*/],
unsigned int flags) {
if (count < 1)
return UV_EINVAL;

if (flags != 0)
return UV_EINVAL;

if (handle->send_queue_count > 0)
return UV_EAGAIN;

return uv__udp_try_send2(handle, count, bufs, nbufs, addrs);
}


int uv_udp_recv_start(uv_udp_t* handle,
uv_alloc_cb alloc_cb,
uv_udp_recv_cb recv_cb) {
Expand Down
6 changes: 6 additions & 0 deletions src/uv-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ int uv__udp_try_send(uv_udp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen);

int uv__udp_try_send2(uv_udp_t* handle,
unsigned int count,
uv_buf_t* bufs[/*count*/],
unsigned int nbufs[/*count*/],
struct sockaddr* addrs[/*count*/]);

int uv__udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloccb,
uv_udp_recv_cb recv_cb);

Expand Down
18 changes: 18 additions & 0 deletions src/win/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1142,3 +1142,21 @@ int uv__udp_try_send(uv_udp_t* handle,

return bytes;
}


int uv__udp_try_send2(uv_udp_t* handle,
unsigned int count,
uv_buf_t* bufs[/*count*/],
unsigned int nbufs[/*count*/],
struct sockaddr* addrs[/*count*/]) {
unsigned int i;
int r;

for (i = 0; i < count; i++) {
r = uv_udp_try_send(handle, bufs[i], nbufs[i], addrs[i]);
if (r < 0)
return i > 0 ? i : r; /* Error if first packet, else send count. */
}

return i;
}
36 changes: 30 additions & 6 deletions test/test-udp-try-send.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ static void sv_recv_cb(uv_udp_t* handle,
const uv_buf_t* rcvbuf,
const struct sockaddr* addr,
unsigned flags) {
ASSERT_GT(nread, 0);

if (nread == 0) {
ASSERT_NULL(addr);
return;
Expand All @@ -70,11 +68,17 @@ static void sv_recv_cb(uv_udp_t* handle,
ASSERT_EQ(4, nread);
ASSERT_NOT_NULL(addr);

ASSERT_OK(memcmp("EXIT", rcvbuf->base, nread));
uv_close((uv_handle_t*) handle, close_cb);
uv_close((uv_handle_t*) &client, close_cb);
if (!memcmp("EXIT", rcvbuf->base, nread)) {
uv_close((uv_handle_t*) handle, close_cb);
uv_close((uv_handle_t*) &client, close_cb);
} else {
ASSERT_MEM_EQ(rcvbuf->base, "HELO", 4);
}

sv_recv_cb_called++;

if (sv_recv_cb_called == 2)
uv_udp_recv_stop(handle);
}


Expand Down Expand Up @@ -108,14 +112,34 @@ TEST_IMPL(udp_try_send) {
r = uv_udp_try_send(&client, &buf, 1, (const struct sockaddr*) &addr);
ASSERT_EQ(r, UV_EMSGSIZE);

uv_buf_t* bufs[] = {&buf, &buf};
unsigned int nbufs[] = {1, 1};
struct sockaddr* addrs[] = {
(struct sockaddr*) &addr,
(struct sockaddr*) &addr,
};

ASSERT_EQ(0, sv_recv_cb_called);

buf = uv_buf_init("HELO", 4);
r = uv_udp_try_send2(&client, 2, bufs, nbufs, addrs, /*flags*/0);
ASSERT_EQ(r, 2);

uv_run(uv_default_loop(), UV_RUN_DEFAULT);

ASSERT_EQ(2, sv_recv_cb_called);

r = uv_udp_recv_start(&server, alloc_cb, sv_recv_cb);
ASSERT_OK(r);

buf = uv_buf_init("EXIT", 4);
r = uv_udp_try_send(&client, &buf, 1, (const struct sockaddr*) &addr);
ASSERT_EQ(4, r);

uv_run(uv_default_loop(), UV_RUN_DEFAULT);

ASSERT_EQ(2, close_cb_called);
ASSERT_EQ(1, sv_recv_cb_called);
ASSERT_EQ(3, sv_recv_cb_called);

ASSERT_OK(client.send_queue_size);
ASSERT_OK(server.send_queue_size);
Expand Down

0 comments on commit e8969bf

Please sign in to comment.