From 82271edfb662ce5bc05b185af7eaf7fef1a3e268 Mon Sep 17 00:00:00 2001 From: czubix Date: Mon, 9 Sep 2024 02:18:25 +0200 Subject: [PATCH] Implement sock_sendto --- uvloop/loop.pxd | 1 + uvloop/loop.pyx | 90 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index 01e39ae1..2ca7146c 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -184,6 +184,7 @@ cdef class Loop: cdef _sock_recv(self, fut, sock, n) cdef _sock_recv_into(self, fut, sock, buf) + cdef _sock_sendto(self, fut, sock, data, address) cdef _sock_sendall(self, fut, sock, data) cdef _sock_accept(self, fut, sock) diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index f9a5a239..46a49762 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -1001,6 +1001,54 @@ cdef class Loop: fut.set_result(data) self._remove_reader(sock) + cdef _sock_sendto(self, fut, sock, data, address): + cdef: + Handle handle + int n + + if UVLOOP_DEBUG: + if fut.cancelled(): + # Shouldn't happen with _SyncSocketWriterFuture. + raise RuntimeError( + f'_sock_sendto is called on a cancelled Future') + + if not self._has_writer(sock): + raise RuntimeError( + f'socket {sock!r} does not have a writer ' + f'in the _sock_sendto callback') + + try: + n = sock.sendto(data, address) + except (BlockingIOError, InterruptedError): + # Try next time. + return + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + fut.set_exception(exc) + self._remove_writer(sock) + return + + self._remove_writer(sock) + + if n == len(data): + fut.set_result(None) + else: + if n: + if not isinstance(data, memoryview): + data = memoryview(data) + data = data[n:] + + handle = new_MethodHandle3( + self, + "Loop._sock_sendto", + self._sock_sendto, + None, + self, + fut, sock, data) + + self._add_writer(sock, handle) + cdef _sock_sendall(self, fut, sock, data): cdef: Handle handle @@ -2644,7 +2692,47 @@ cdef class Loop: @cython.iterable_coroutine async def sock_sendto(self, sock, data, address): - raise NotImplementedError + cdef: + Handle handle + ssize_t n + + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + + if not data: + return + + socket_inc_io_ref(sock) + try: + try: + n = sock.sendto(data, address) + except (BlockingIOError, InterruptedError): + pass + else: + if UVLOOP_DEBUG: + # This can be a partial success, i.e. only part + # of the data was sent + self._sock_try_write_total += 1 + + if n == len(data): + return + if not isinstance(data, memoryview): + data = memoryview(data) + data = data[n:] + + fut = _SyncSocketWriterFuture(sock, self) + handle = new_MethodHandle3( + self, + "Loop._sock_sendto", + self._sock_sendto, + None, + self, + fut, sock, data) + + self._add_writer(sock, handle) + return await fut + finally: + socket_dec_io_ref(sock) @cython.iterable_coroutine async def connect_accepted_socket(self, protocol_factory, sock, *,