From 666a69d821f973114cc090dc9d069caefd3bb9ba Mon Sep 17 00:00:00 2001 From: Bo Date: Fri, 31 Jan 2025 01:46:00 +0200 Subject: [PATCH] logging + perform IO read events when closed + fix array double destroy + refinements --- fio-stl.h | 185 ++++++++++++++++++++++---------------- fio-stl/010 mem.h | 18 ++-- fio-stl/102 poll epoll.h | 7 +- fio-stl/102 poll kqueue.h | 8 +- fio-stl/201 array.h | 12 +-- fio-stl/400 io api.h | 5 +- fio-stl/401 io types.h | 58 +++++++----- fio-stl/402 io reactor.h | 52 ++++++----- fio-stl/420 pubsub.h | 7 +- fio-stl/439 http.h | 18 ++-- 10 files changed, 218 insertions(+), 152 deletions(-) diff --git a/fio-stl.h b/fio-stl.h index ef5b71d5..2be8cc85 100644 --- a/fio-stl.h +++ b/fio-stl.h @@ -14648,9 +14648,10 @@ FIO_SFUNC void FIO_NAME(FIO_MEMORY_NAME, __mem_state_cleanup)(void *ignr_) { FIO_LIST_NODE node; }; void *last_chunk = NULL; - FIO_LOG_WARNING("(" FIO_MACRO2STR( - FIO_NAME(FIO_MEMORY_NAME, - malloc)) ") blocks left after cleanup - memory leaks?"); + FIO_LOG_WARNING("(%d) (" FIO_MACRO2STR(FIO_NAME( + FIO_MEMORY_NAME, + malloc)) ") blocks left after cleanup - memory leaks?", + fio_getpid()); FIO_LIST_EACH(struct t_s, node, &FIO_NAME(FIO_MEMORY_NAME, __mem_state)->blocks, @@ -14658,11 +14659,12 @@ FIO_SFUNC void FIO_NAME(FIO_MEMORY_NAME, __mem_state_cleanup)(void *ignr_) { if (last_chunk == (void *)FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2chunk)(pos)) continue; last_chunk = (void *)FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2chunk)(pos); - FIO_LOG_WARNING( - "(" FIO_MACRO2STR(FIO_NAME(FIO_MEMORY_NAME, - malloc)) ") leaked block(s) for chunk %p", - (void *)pos, - last_chunk); + FIO_LOG_WARNING("(%d) (" FIO_MACRO2STR( + FIO_NAME(FIO_MEMORY_NAME, + malloc)) ") leaked block(s) for chunk %p", + fio_getpid(), + (void *)pos, + last_chunk); } } @@ -16365,6 +16367,7 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout) { return total; int active_count = epoll_wait(p->fds[0].fd, events, FIO_POLL_MAX_EVENTS, 0); if (active_count > 0) { + /* TODO! fix error handling*/ for (int i = 0; i < active_count; i++) { // errors are handled as disconnections (on_close) in the EPOLLIN queue // if no error, try an active event(s) @@ -16376,12 +16379,12 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout) { active_count = epoll_wait(p->fds[1].fd, events, FIO_POLL_MAX_EVENTS, 0); if (active_count > 0) { for (int i = 0; i < active_count; i++) { + // holds an active event(s) + if (events[i].events & EPOLLIN) + p->settings.on_data(events[i].data.ptr); // errors are handled as disconnections (on_close), but only once... if (events[i].events & (~(EPOLLIN | EPOLLOUT))) p->settings.on_close(events[i].data.ptr); - // no error, then it's an active event(s) - else if (events[i].events & EPOLLIN) - p->settings.on_data(events[i].data.ptr); } // end for loop total += active_count; } @@ -16535,14 +16538,12 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout_) { if (active_count > 0) { for (int i = 0; i < active_count; i++) { // test for event(s) type - if (events[i].filter == EVFILT_WRITE) { + if ((events[i].filter & EVFILT_WRITE)) p->settings.on_ready(events[i].udata); - } else if (events[i].filter == EVFILT_READ) { + if ((events[i].filter & EVFILT_READ)) p->settings.on_data(events[i].udata); - } - if (events[i].flags & (EV_EOF | EV_ERROR)) { + if (events[i].flags & (EV_EOF | EV_ERROR)) p->settings.on_close(events[i].udata); - } } } else if (active_count < 0) { if (errno == EINTR) @@ -26686,13 +26687,15 @@ SFUNC void FIO_NAME(FIO_ARRAY_NAME, destroy)(FIO_ARRAY_PTR ary_) { switch ( FIO_NAME_BL(FIO_ARRAY_NAME, embedded)((FIO_ARRAY_PTR)FIO_PTR_TAG(&tmp))) { case 0: + if (tmp.a.ary) { #if !FIO_ARRAY_TYPE_DESTROY_SIMPLE - for (size_t i = tmp.a.start; i < tmp.a.end; ++i) { - FIO_ARRAY_TYPE_DESTROY(tmp.a.ary[i]); - } + for (size_t i = tmp.a.start; i < tmp.a.end; ++i) { + FIO_ARRAY_TYPE_DESTROY(tmp.a.ary[i]); + } #endif - FIO_LEAK_COUNTER_ON_FREE(FIO_NAME(FIO_ARRAY_NAME, destroy)); - FIO_MEM_FREE_(tmp.a.ary, tmp.a.capa * sizeof(*tmp.a.ary)); + FIO_LEAK_COUNTER_ON_FREE(FIO_NAME(FIO_ARRAY_NAME, destroy)); + FIO_MEM_FREE_(tmp.a.ary, tmp.a.capa * sizeof(*tmp.a.ary)); + } return; case 1: #if !FIO_ARRAY_TYPE_DESTROY_SIMPLE @@ -34438,7 +34441,7 @@ developer. #ifndef FIO_IO_SHUTDOWN_TIMEOUT /* Sets the hard timeout (in milliseconds) for the reactor's shutdown loop. */ -#define FIO_IO_SHUTDOWN_TIMEOUT 10000 +#define FIO_IO_SHUTDOWN_TIMEOUT 15000 #endif #ifndef FIO_IO_COUNT_STORAGE @@ -34783,6 +34786,9 @@ SFUNC int fio_io_is_open(fio_io_s *io); /** Returns the approximate number of bytes in the outgoing buffer. */ SFUNC size_t fio_io_backlog(fio_io_s *io); +/** Does nothing. */ +SFUNC void fio_io_noop(fio_io_s *io); + /* ***************************************************************************** Task Scheduling ***************************************************************************** */ @@ -35338,15 +35344,13 @@ FIO_IFUNC void fio___io_env_safe_destroy(fio___io_env_safe_s *e) { Protocol Type Initialization ***************************************************************************** */ -static void fio___io_on_ev_mock_sus(fio_io_s *io) { fio_io_suspend(io); } -static void fio___io_on_ev_mock_unsus(fio_io_s *io) { fio_io_unsuspend(io); } -static void fio___io_on_ev_mock(fio_io_s *io) { (void)(io); } +SFUNC void fio_io_noop(fio_io_s *io) { (void)(io); } + static void fio___io_on_ev_pubsub_mock(struct fio_msg_s *msg) { (void)(msg); } static void fio___io_on_user_mock(fio_io_s *io, void *i_) { (void)io, (void)i_; } static void fio___io_on_close_mock(void *p1, void *p2) { (void)p1, (void)p2; } -static void fio___io_on_ev_on_timeout(fio_io_s *io) { fio_io_close_now(io); } /* Called to perform a non-blocking `read`, same as the system call. */ static ssize_t fio___io_func_default_read(int fd, @@ -35423,7 +35427,7 @@ FIO_SFUNC void fio___io_init_protocol(fio_io_protocol_s *pr, _Bool has_tls) { fio_io_functions_s io_fn = { .build_context = fio___io_func_default_build_context, .free_context = fio___io_func_default_free_context, - .start = fio___io_on_ev_mock, + .start = fio_io_noop, .read = fio___io_func_default_read, .write = fio___io_func_default_write, .flush = fio___io_func_default_flush, @@ -35433,17 +35437,17 @@ FIO_SFUNC void fio___io_init_protocol(fio_io_protocol_s *pr, _Bool has_tls) { if (has_tls) io_fn = fio_io_tls_default_functions(NULL); if (!pr->on_attach) - pr->on_attach = fio___io_on_ev_mock; + pr->on_attach = fio_io_noop; if (!pr->on_data) - pr->on_data = fio___io_on_ev_mock_sus; + pr->on_data = fio_io_suspend; if (!pr->on_ready) - pr->on_ready = fio___io_on_ev_mock; + pr->on_ready = fio_io_noop; if (!pr->on_close) pr->on_close = fio___io_on_close_mock; if (!pr->on_shutdown) - pr->on_shutdown = fio___io_on_ev_mock_unsus; + pr->on_shutdown = fio_io_noop; if (!pr->on_timeout) - pr->on_timeout = fio___io_on_ev_on_timeout; + pr->on_timeout = fio_io_close_now; if (!pr->on_pubsub) pr->on_pubsub = fio___io_on_ev_pubsub_mock; if (!pr->on_user1) @@ -35588,14 +35592,15 @@ IO Type #define FIO___IO_FLAG_CLOSE ((uint32_t)8U) #define FIO___IO_FLAG_CLOSE_REMOTE ((uint32_t)16U) #define FIO___IO_FLAG_CLOSE_ERROR ((uint32_t)32U) -#define FIO___IO_FLAG_TOUCH ((uint32_t)64U) -#define FIO___IO_FLAG_WRITE_SCHD ((uint32_t)128U) -#define FIO___IO_FLAG_POLLIN_SET ((uint32_t)256U) -#define FIO___IO_FLAG_POLLOUT_SET ((uint32_t)512U) +#define FIO___IO_FLAG_CLOSED_ALL \ + (FIO___IO_FLAG_CLOSE | FIO___IO_FLAG_CLOSE_REMOTE | FIO___IO_FLAG_CLOSE_ERROR) +#define FIO___IO_FLAG_TOUCH ((uint32_t)64U) +#define FIO___IO_FLAG_WRITE_SCHD ((uint32_t)128U) +#define FIO___IO_FLAG_POLLIN_SET ((uint32_t)256U) +#define FIO___IO_FLAG_POLLOUT_SET ((uint32_t)512U) #define FIO___IO_FLAG_PREVENT_ON_DATA \ - (FIO___IO_FLAG_SUSPENDED | FIO___IO_FLAG_THROTTLED | FIO___IO_FLAG_CLOSE | \ - FIO___IO_FLAG_CLOSE_REMOTE | FIO___IO_FLAG_CLOSE_ERROR) + (FIO___IO_FLAG_SUSPENDED | FIO___IO_FLAG_THROTTLED) #define FIO___IO_FLAG_POLL_SET \ (FIO___IO_FLAG_POLLIN_SET | FIO___IO_FLAG_POLLOUT_SET) @@ -35630,7 +35635,7 @@ FIO_IFUNC void fio___io_monitor_in(fio_io_s *io) { FIO_LOG_DDEBUG2("(%d) IO monitoring Input for %d (called)", fio_io_pid(), io->fd); - if (io->flags & FIO___IO_FLAG_PREVENT_ON_DATA) + if (io->flags & (FIO___IO_FLAG_PREVENT_ON_DATA | FIO___IO_FLAG_CLOSED_ALL)) return; if ((FIO___IO_FLAG_SET(io, FIO___IO_FLAG_POLLIN_SET) & FIO___IO_FLAG_POLLIN_SET)) { @@ -35750,6 +35755,7 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd, void *udata, void *tls) { fio_io_s *io = NULL; + fio_io_protocol_s cpy; if (fd == -1) goto error; io = fio___io_new2(pr->buffer_size); @@ -35772,8 +35778,9 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd, return io; error: - pr->on_close(NULL, udata); - pr->io_functions.cleanup(tls); + cpy = *pr; + cpy.on_close(NULL, udata); + cpy.io_functions.cleanup(tls); return io; } @@ -35881,6 +35888,9 @@ FIO_SFUNC void fio___io_write2(void *io_, void *packet_) { return; io_closed: + FIO_LOG_DEBUG2("(%d) write task to closed IO %d failed (task too late).", + fio_io_pid(), + fio_io_fd(io)); fio_stream_packet_free(packet); fio___io_free2(io); } @@ -36074,7 +36084,7 @@ static void fio___io_poll_on_data(void *io_, void *ignr_) { fio_io_s *io = (fio_io_s *)io_; FIO___IO_FLAG_UNSET(io, FIO___IO_FLAG_POLLIN_SET); if (!(io->flags & FIO___IO_FLAG_PREVENT_ON_DATA)) { - /* this also tests for the suspended / throttled / closing flags */ + /* this also tests for the suspended / throttled flags, allows closed */ io->pr->on_data(io); fio___io_monitor_in(io); } else if ((io->flags & FIO___IO_FLAG_OPEN)) { @@ -36174,6 +36184,13 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) { fio___io_free2(io); } +// static void fio___io_poll_on_close_task(void *io_, void *ignr_) { +// (void)ignr_; +// fio_io_s *io = (fio_io_s *)io_; +// fio_io_close_now(io); +// fio___io_free2(io); +// } + static void fio___io_poll_on_close(void *io_, void *ignr_) { (void)ignr_; fio_io_s *io = (fio_io_s *)io_; @@ -36181,6 +36198,7 @@ static void fio___io_poll_on_close(void *io_, void *ignr_) { FIO___IO_FLAG_SET(io, FIO___IO_FLAG_CLOSE_REMOTE); FIO_LOG_DEBUG2("(%d) fd %d closed by remote peer", FIO___IO.pid, io->fd); } + /* allow on_data tasks to complete before closing? */ fio_io_close_now(io); fio___io_free2(io); } @@ -36560,7 +36578,7 @@ SFUNC fio_io_tls_s *fio_io_tls_alpn_add(fio_io_tls_s *t, if (!t || !protocol_name) return t; if (!on_selected) - on_selected = fio___io_on_ev_mock; + on_selected = fio_io_noop; size_t pr_name_len = strlen(protocol_name); if (pr_name_len > 255) { FIO_LOG_ERROR( @@ -36710,7 +36728,7 @@ SFUNC int fio_io_tls_each FIO_NOOP(fio_io_tls_each_s a) { SFUNC fio_io_functions_s fio_io_tls_default_functions(fio_io_functions_s *f) { static fio_io_functions_s default_io_functions = { .build_context = fio___io_func_default_build_context, - .start = fio___io_on_ev_mock, + .start = fio_io_noop, .read = fio___io_func_default_read, .write = fio___io_func_default_write, .flush = fio___io_func_default_flush, @@ -36722,7 +36740,7 @@ SFUNC fio_io_functions_s fio_io_tls_default_functions(fio_io_functions_s *f) { if (!f->build_context) f->build_context = fio___io_func_default_build_context; if (!f->start) - f->start = fio___io_on_ev_mock; + f->start = fio_io_noop; if (!f->read) f->read = fio___io_func_default_read; if (!f->write) @@ -36882,6 +36900,7 @@ The IO Reactor Cycle (the actual work) ***************************************************************************** */ static void fio___io_signal_stop(int sig, void *flg) { + FIO_LOG_INFO("(%d) stop signal detected.", FIO___IO.pid); fio_io_stop(); (void)sig, (void)flg; } @@ -36889,8 +36908,10 @@ static void fio___io_signal_stop(int sig, void *flg) { static void fio___io_signal_restart(int sig, void *flg) { if (fio_io_is_master()) fio_io_restart(FIO___IO.workers); - else + else { + FIO_LOG_INFO("(%d) restart signal detected.", FIO___IO.pid); fio_io_stop(); + } (void)sig, (void)flg; } @@ -37070,7 +37091,9 @@ static void *fio___io_worker_sentinel(void *pid_data) { FIO___LOCK_UNLOCK(FIO___IO.lock); if (!WIFEXITED(status) || WEXITSTATUS(status)) { - FIO_LOG_WARNING("(%d) abnormal worker exit detected", FIO___IO.pid); + FIO_LOG_WARNING("(%d) abnormal worker exit detected for %d", + FIO___IO.pid, + sentinal.pid); fio_state_callback_force(FIO_CALL_ON_CHILD_CRUSH); } if (!FIO___IO.stop && !sentinal.stop) { @@ -37102,26 +37125,10 @@ static void fio___io_spawn_worker(void) { if (FIO___IO.stop || !fio_io_is_master()) return; - /* do not allow master tasks to run in worker - pretend to stop. */ - FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); - if (fio_atomic_or_fetch(&FIO___IO.stop, 2) != 2) - return; - FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) { - fio___io_async_stop(q); - } - fio_queue_perform_all(&FIO___IO.queue); - /* perform forking procedure with the stop flag reset. */ - fio_atomic_and_fetch(&FIO___IO.stop, 1); - fio_state_callback_force(FIO_CALL_BEFORE_FORK); - FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); - /* perform actual fork */ fio_thread_pid_t pid = fio_thread_fork(); FIO_ASSERT(pid != (fio_thread_pid_t)-1, "system call `fork` failed."); if (!pid) goto is_worker_process; - /* finish up */ - fio_state_callback_force(FIO_CALL_AFTER_FORK); - fio_state_callback_force(FIO_CALL_IN_MASTER); if (fio_thread_create(&t, fio___io_worker_sentinel, (void *)(uintptr_t)pid)) { FIO_LOG_FATAL( "sentinel thread creation failed, no worker will be spawned."); @@ -37171,10 +37178,29 @@ static void fio___io_spawn_workers_task(void *ignr_1, void *ignr_2) { if (fio_atomic_or(&is_running, 1)) return; FIO_LOG_INFO("(%d) spawning %d workers.", fio_io_pid(), FIO___IO.to_spawn); + + /* do not allow master tasks to run in worker - pretend to stop. */ + FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); + if (fio_atomic_or_fetch(&FIO___IO.stop, 2) != 2) + return; + FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) { + fio___io_async_stop(q); + } + fio_queue_perform_all(&FIO___IO.queue); + + /* perform forking procedure with the stop flag reset. */ + fio_atomic_and_fetch(&FIO___IO.stop, 1); + fio_state_callback_force(FIO_CALL_BEFORE_FORK); + FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); + + /* perform actual fork */ do { fio___io_spawn_worker(); } while (fio_atomic_sub_fetch(&FIO___IO.to_spawn, 1)); + /* finish up */ + fio_state_callback_force(FIO_CALL_AFTER_FORK); + fio_state_callback_force(FIO_CALL_IN_MASTER); if ((FIO___IO.flags & FIO___IO_FLAG_CYCLING)) { fio___io_defer_no_wakeup(fio___io_work_task, NULL, NULL); FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) { @@ -37222,10 +37248,8 @@ SFUNC void fio_io_start(int workers) { #endif FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); if (workers) { - FIO_LOG_INFO("(%d) spawning %d workers.", fio_io_root_pid(), workers); - for (int i = 0; i < workers; ++i) { - fio___io_spawn_worker(); - } + FIO___IO.to_spawn = workers; + fio___io_spawn_workers_task(NULL, NULL); } else { FIO_LOG_DEBUG2("(%d) starting facil.io IO reactor in single process mode.", fio_io_root_pid()); @@ -37289,7 +37313,7 @@ SFUNC void fio___io_restart(void *workers_, void *ignr_) { FIO___LOCK_LOCK(FIO___IO.lock); FIO_LIST_EACH(fio___io_pid_s, node, &FIO___IO.pids, w) { w->stop = 1; - fio_thread_kill(w->pid, SIGTERM); + fio_thread_kill(w->pid, SIGUSR1); } FIO___LOCK_UNLOCK(FIO___IO.lock); /* switch to single mode? */ @@ -39243,7 +39267,7 @@ FIO_SFUNC void fio___pubsub_on_enter_child(void *ignr_) { FIO___PUBSUB_POSTOFFICE.protocol.ipc.on_data = fio___pubsub_protocol_on_data_worker; - FIO___PUBSUB_POSTOFFICE.crush_on_close = 1; + FIO___PUBSUB_POSTOFFICE.crush_on_close = !fio_io_is_master(); FIO___PUBSUB_POSTOFFICE.filter.publish = FIO___PUBSUB_PROCESS; FIO___PUBSUB_POSTOFFICE.filter.local = @@ -40105,8 +40129,11 @@ FIO_SFUNC void fio___pubsub_protocol_on_close(void *p_, void *udata) { &FIO___PUBSUB_POSTOFFICE.remote_uuids)); } fio___pubsub_message_parser_destroy(p); - if (FIO___PUBSUB_POSTOFFICE.crush_on_close) + if (FIO___PUBSUB_POSTOFFICE.crush_on_close) { + if (fio_io_is_running()) + FIO_LOG_FATAL("(%d) pub/sub connection lost unexpectedly.", fio_io_pid()); fio_io_stop(); + } (void)udata; } @@ -46073,8 +46100,14 @@ FIO_SFUNC void fio___http_on_http_with_public_folder(void *h_, void *ignr) { FIO_SFUNC void fio___http_perform_user_callback_client(void *cb_, void *h_) { fio_http_s *h = (fio_http_s *)h_; + union { + void (*fn)(fio_http_s *); + void *ptr; + } cb = {.ptr = cb_}; fio___http_connection_s *c = (fio___http_connection_s *)fio_http_cdata(h); - fio___http_perform_user_callback(cb_, h_); + /* unlike Server mode, handle responses from closed connections */ + cb.fn(h); + fio_http_free(h); fio_io_free(c->io); } @@ -46788,11 +46821,11 @@ FIO_SFUNC void fio___http_controller_http1_send_headers(fio_http_s *h) { /* send data (move memory ownership)? */ c->state.http.buf = buf; return; - fio_io_write2(c->io, - .buf = buf.buf, - .len = buf.len, - .dealloc = FIO_STRING_FREE, - .copy = 0); + // fio_io_write2(c->io, + // .buf = buf.buf, + // .len = buf.len, + // .dealloc = FIO_STRING_FREE, + // .copy = 0); } /** called by the HTTP handle for each body chunk (or to finish a response. */ FIO_SFUNC void fio___http_controller_http1_write_body( diff --git a/fio-stl/010 mem.h b/fio-stl/010 mem.h index 20c25fdc..48fe45e0 100644 --- a/fio-stl/010 mem.h +++ b/fio-stl/010 mem.h @@ -1250,9 +1250,10 @@ FIO_SFUNC void FIO_NAME(FIO_MEMORY_NAME, __mem_state_cleanup)(void *ignr_) { FIO_LIST_NODE node; }; void *last_chunk = NULL; - FIO_LOG_WARNING("(" FIO_MACRO2STR( - FIO_NAME(FIO_MEMORY_NAME, - malloc)) ") blocks left after cleanup - memory leaks?"); + FIO_LOG_WARNING("(%d) (" FIO_MACRO2STR(FIO_NAME( + FIO_MEMORY_NAME, + malloc)) ") blocks left after cleanup - memory leaks?", + fio_getpid()); FIO_LIST_EACH(struct t_s, node, &FIO_NAME(FIO_MEMORY_NAME, __mem_state)->blocks, @@ -1260,11 +1261,12 @@ FIO_SFUNC void FIO_NAME(FIO_MEMORY_NAME, __mem_state_cleanup)(void *ignr_) { if (last_chunk == (void *)FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2chunk)(pos)) continue; last_chunk = (void *)FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2chunk)(pos); - FIO_LOG_WARNING( - "(" FIO_MACRO2STR(FIO_NAME(FIO_MEMORY_NAME, - malloc)) ") leaked block(s) for chunk %p", - (void *)pos, - last_chunk); + FIO_LOG_WARNING("(%d) (" FIO_MACRO2STR( + FIO_NAME(FIO_MEMORY_NAME, + malloc)) ") leaked block(s) for chunk %p", + fio_getpid(), + (void *)pos, + last_chunk); } } diff --git a/fio-stl/102 poll epoll.h b/fio-stl/102 poll epoll.h index 8ee1ca0f..c7b12320 100644 --- a/fio-stl/102 poll epoll.h +++ b/fio-stl/102 poll epoll.h @@ -159,6 +159,7 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout) { return total; int active_count = epoll_wait(p->fds[0].fd, events, FIO_POLL_MAX_EVENTS, 0); if (active_count > 0) { + /* TODO! fix error handling*/ for (int i = 0; i < active_count; i++) { // errors are handled as disconnections (on_close) in the EPOLLIN queue // if no error, try an active event(s) @@ -170,12 +171,12 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout) { active_count = epoll_wait(p->fds[1].fd, events, FIO_POLL_MAX_EVENTS, 0); if (active_count > 0) { for (int i = 0; i < active_count; i++) { + // holds an active event(s) + if (events[i].events & EPOLLIN) + p->settings.on_data(events[i].data.ptr); // errors are handled as disconnections (on_close), but only once... if (events[i].events & (~(EPOLLIN | EPOLLOUT))) p->settings.on_close(events[i].data.ptr); - // no error, then it's an active event(s) - else if (events[i].events & EPOLLIN) - p->settings.on_data(events[i].data.ptr); } // end for loop total += active_count; } diff --git a/fio-stl/102 poll kqueue.h b/fio-stl/102 poll kqueue.h index 65c73d41..12a800c2 100644 --- a/fio-stl/102 poll kqueue.h +++ b/fio-stl/102 poll kqueue.h @@ -140,14 +140,12 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout_) { if (active_count > 0) { for (int i = 0; i < active_count; i++) { // test for event(s) type - if (events[i].filter == EVFILT_WRITE) { + if ((events[i].filter & EVFILT_WRITE)) p->settings.on_ready(events[i].udata); - } else if (events[i].filter == EVFILT_READ) { + if ((events[i].filter & EVFILT_READ)) p->settings.on_data(events[i].udata); - } - if (events[i].flags & (EV_EOF | EV_ERROR)) { + if (events[i].flags & (EV_EOF | EV_ERROR)) p->settings.on_close(events[i].udata); - } } } else if (active_count < 0) { if (errno == EINTR) diff --git a/fio-stl/201 array.h b/fio-stl/201 array.h index ddd6066a..db8d23e3 100644 --- a/fio-stl/201 array.h +++ b/fio-stl/201 array.h @@ -637,13 +637,15 @@ SFUNC void FIO_NAME(FIO_ARRAY_NAME, destroy)(FIO_ARRAY_PTR ary_) { switch ( FIO_NAME_BL(FIO_ARRAY_NAME, embedded)((FIO_ARRAY_PTR)FIO_PTR_TAG(&tmp))) { case 0: + if (tmp.a.ary) { #if !FIO_ARRAY_TYPE_DESTROY_SIMPLE - for (size_t i = tmp.a.start; i < tmp.a.end; ++i) { - FIO_ARRAY_TYPE_DESTROY(tmp.a.ary[i]); - } + for (size_t i = tmp.a.start; i < tmp.a.end; ++i) { + FIO_ARRAY_TYPE_DESTROY(tmp.a.ary[i]); + } #endif - FIO_LEAK_COUNTER_ON_FREE(FIO_NAME(FIO_ARRAY_NAME, destroy)); - FIO_MEM_FREE_(tmp.a.ary, tmp.a.capa * sizeof(*tmp.a.ary)); + FIO_LEAK_COUNTER_ON_FREE(FIO_NAME(FIO_ARRAY_NAME, destroy)); + FIO_MEM_FREE_(tmp.a.ary, tmp.a.capa * sizeof(*tmp.a.ary)); + } return; case 1: #if !FIO_ARRAY_TYPE_DESTROY_SIMPLE diff --git a/fio-stl/400 io api.h b/fio-stl/400 io api.h index d9caaedf..cf51eb76 100644 --- a/fio-stl/400 io api.h +++ b/fio-stl/400 io api.h @@ -42,7 +42,7 @@ developer. #ifndef FIO_IO_SHUTDOWN_TIMEOUT /* Sets the hard timeout (in milliseconds) for the reactor's shutdown loop. */ -#define FIO_IO_SHUTDOWN_TIMEOUT 10000 +#define FIO_IO_SHUTDOWN_TIMEOUT 15000 #endif #ifndef FIO_IO_COUNT_STORAGE @@ -387,6 +387,9 @@ SFUNC int fio_io_is_open(fio_io_s *io); /** Returns the approximate number of bytes in the outgoing buffer. */ SFUNC size_t fio_io_backlog(fio_io_s *io); +/** Does nothing. */ +SFUNC void fio_io_noop(fio_io_s *io); + /* ***************************************************************************** Task Scheduling ***************************************************************************** */ diff --git a/fio-stl/401 io types.h b/fio-stl/401 io types.h index 78013e4c..adf588d3 100644 --- a/fio-stl/401 io types.h +++ b/fio-stl/401 io types.h @@ -114,15 +114,13 @@ FIO_IFUNC void fio___io_env_safe_destroy(fio___io_env_safe_s *e) { Protocol Type Initialization ***************************************************************************** */ -static void fio___io_on_ev_mock_sus(fio_io_s *io) { fio_io_suspend(io); } -static void fio___io_on_ev_mock_unsus(fio_io_s *io) { fio_io_unsuspend(io); } -static void fio___io_on_ev_mock(fio_io_s *io) { (void)(io); } +SFUNC void fio_io_noop(fio_io_s *io) { (void)(io); } + static void fio___io_on_ev_pubsub_mock(struct fio_msg_s *msg) { (void)(msg); } static void fio___io_on_user_mock(fio_io_s *io, void *i_) { (void)io, (void)i_; } static void fio___io_on_close_mock(void *p1, void *p2) { (void)p1, (void)p2; } -static void fio___io_on_ev_on_timeout(fio_io_s *io) { fio_io_close_now(io); } /* Called to perform a non-blocking `read`, same as the system call. */ static ssize_t fio___io_func_default_read(int fd, @@ -199,7 +197,7 @@ FIO_SFUNC void fio___io_init_protocol(fio_io_protocol_s *pr, _Bool has_tls) { fio_io_functions_s io_fn = { .build_context = fio___io_func_default_build_context, .free_context = fio___io_func_default_free_context, - .start = fio___io_on_ev_mock, + .start = fio_io_noop, .read = fio___io_func_default_read, .write = fio___io_func_default_write, .flush = fio___io_func_default_flush, @@ -209,17 +207,17 @@ FIO_SFUNC void fio___io_init_protocol(fio_io_protocol_s *pr, _Bool has_tls) { if (has_tls) io_fn = fio_io_tls_default_functions(NULL); if (!pr->on_attach) - pr->on_attach = fio___io_on_ev_mock; + pr->on_attach = fio_io_noop; if (!pr->on_data) - pr->on_data = fio___io_on_ev_mock_sus; + pr->on_data = fio_io_suspend; if (!pr->on_ready) - pr->on_ready = fio___io_on_ev_mock; + pr->on_ready = fio_io_noop; if (!pr->on_close) pr->on_close = fio___io_on_close_mock; if (!pr->on_shutdown) - pr->on_shutdown = fio___io_on_ev_mock_unsus; + pr->on_shutdown = fio_io_noop; if (!pr->on_timeout) - pr->on_timeout = fio___io_on_ev_on_timeout; + pr->on_timeout = fio_io_close_now; if (!pr->on_pubsub) pr->on_pubsub = fio___io_on_ev_pubsub_mock; if (!pr->on_user1) @@ -364,14 +362,15 @@ IO Type #define FIO___IO_FLAG_CLOSE ((uint32_t)8U) #define FIO___IO_FLAG_CLOSE_REMOTE ((uint32_t)16U) #define FIO___IO_FLAG_CLOSE_ERROR ((uint32_t)32U) -#define FIO___IO_FLAG_TOUCH ((uint32_t)64U) -#define FIO___IO_FLAG_WRITE_SCHD ((uint32_t)128U) -#define FIO___IO_FLAG_POLLIN_SET ((uint32_t)256U) -#define FIO___IO_FLAG_POLLOUT_SET ((uint32_t)512U) +#define FIO___IO_FLAG_CLOSED_ALL \ + (FIO___IO_FLAG_CLOSE | FIO___IO_FLAG_CLOSE_REMOTE | FIO___IO_FLAG_CLOSE_ERROR) +#define FIO___IO_FLAG_TOUCH ((uint32_t)64U) +#define FIO___IO_FLAG_WRITE_SCHD ((uint32_t)128U) +#define FIO___IO_FLAG_POLLIN_SET ((uint32_t)256U) +#define FIO___IO_FLAG_POLLOUT_SET ((uint32_t)512U) #define FIO___IO_FLAG_PREVENT_ON_DATA \ - (FIO___IO_FLAG_SUSPENDED | FIO___IO_FLAG_THROTTLED | FIO___IO_FLAG_CLOSE | \ - FIO___IO_FLAG_CLOSE_REMOTE | FIO___IO_FLAG_CLOSE_ERROR) + (FIO___IO_FLAG_SUSPENDED | FIO___IO_FLAG_THROTTLED) #define FIO___IO_FLAG_POLL_SET \ (FIO___IO_FLAG_POLLIN_SET | FIO___IO_FLAG_POLLOUT_SET) @@ -406,7 +405,7 @@ FIO_IFUNC void fio___io_monitor_in(fio_io_s *io) { FIO_LOG_DDEBUG2("(%d) IO monitoring Input for %d (called)", fio_io_pid(), io->fd); - if (io->flags & FIO___IO_FLAG_PREVENT_ON_DATA) + if (io->flags & (FIO___IO_FLAG_PREVENT_ON_DATA | FIO___IO_FLAG_CLOSED_ALL)) return; if ((FIO___IO_FLAG_SET(io, FIO___IO_FLAG_POLLIN_SET) & FIO___IO_FLAG_POLLIN_SET)) { @@ -526,6 +525,7 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd, void *udata, void *tls) { fio_io_s *io = NULL; + fio_io_protocol_s cpy; if (fd == -1) goto error; io = fio___io_new2(pr->buffer_size); @@ -548,8 +548,9 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd, return io; error: - pr->on_close(NULL, udata); - pr->io_functions.cleanup(tls); + cpy = *pr; + cpy.on_close(NULL, udata); + cpy.io_functions.cleanup(tls); return io; } @@ -657,6 +658,9 @@ FIO_SFUNC void fio___io_write2(void *io_, void *packet_) { return; io_closed: + FIO_LOG_DEBUG2("(%d) write task to closed IO %d failed (task too late).", + fio_io_pid(), + fio_io_fd(io)); fio_stream_packet_free(packet); fio___io_free2(io); } @@ -850,7 +854,7 @@ static void fio___io_poll_on_data(void *io_, void *ignr_) { fio_io_s *io = (fio_io_s *)io_; FIO___IO_FLAG_UNSET(io, FIO___IO_FLAG_POLLIN_SET); if (!(io->flags & FIO___IO_FLAG_PREVENT_ON_DATA)) { - /* this also tests for the suspended / throttled / closing flags */ + /* this also tests for the suspended / throttled flags, allows closed */ io->pr->on_data(io); fio___io_monitor_in(io); } else if ((io->flags & FIO___IO_FLAG_OPEN)) { @@ -950,6 +954,13 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) { fio___io_free2(io); } +// static void fio___io_poll_on_close_task(void *io_, void *ignr_) { +// (void)ignr_; +// fio_io_s *io = (fio_io_s *)io_; +// fio_io_close_now(io); +// fio___io_free2(io); +// } + static void fio___io_poll_on_close(void *io_, void *ignr_) { (void)ignr_; fio_io_s *io = (fio_io_s *)io_; @@ -957,6 +968,7 @@ static void fio___io_poll_on_close(void *io_, void *ignr_) { FIO___IO_FLAG_SET(io, FIO___IO_FLAG_CLOSE_REMOTE); FIO_LOG_DEBUG2("(%d) fd %d closed by remote peer", FIO___IO.pid, io->fd); } + /* allow on_data tasks to complete before closing? */ fio_io_close_now(io); fio___io_free2(io); } @@ -1336,7 +1348,7 @@ SFUNC fio_io_tls_s *fio_io_tls_alpn_add(fio_io_tls_s *t, if (!t || !protocol_name) return t; if (!on_selected) - on_selected = fio___io_on_ev_mock; + on_selected = fio_io_noop; size_t pr_name_len = strlen(protocol_name); if (pr_name_len > 255) { FIO_LOG_ERROR( @@ -1486,7 +1498,7 @@ SFUNC int fio_io_tls_each FIO_NOOP(fio_io_tls_each_s a) { SFUNC fio_io_functions_s fio_io_tls_default_functions(fio_io_functions_s *f) { static fio_io_functions_s default_io_functions = { .build_context = fio___io_func_default_build_context, - .start = fio___io_on_ev_mock, + .start = fio_io_noop, .read = fio___io_func_default_read, .write = fio___io_func_default_write, .flush = fio___io_func_default_flush, @@ -1498,7 +1510,7 @@ SFUNC fio_io_functions_s fio_io_tls_default_functions(fio_io_functions_s *f) { if (!f->build_context) f->build_context = fio___io_func_default_build_context; if (!f->start) - f->start = fio___io_on_ev_mock; + f->start = fio_io_noop; if (!f->read) f->read = fio___io_func_default_read; if (!f->write) diff --git a/fio-stl/402 io reactor.h b/fio-stl/402 io reactor.h index 717021f0..66101f67 100644 --- a/fio-stl/402 io reactor.h +++ b/fio-stl/402 io reactor.h @@ -20,6 +20,7 @@ The IO Reactor Cycle (the actual work) ***************************************************************************** */ static void fio___io_signal_stop(int sig, void *flg) { + FIO_LOG_INFO("(%d) stop signal detected.", FIO___IO.pid); fio_io_stop(); (void)sig, (void)flg; } @@ -27,8 +28,10 @@ static void fio___io_signal_stop(int sig, void *flg) { static void fio___io_signal_restart(int sig, void *flg) { if (fio_io_is_master()) fio_io_restart(FIO___IO.workers); - else + else { + FIO_LOG_INFO("(%d) restart signal detected.", FIO___IO.pid); fio_io_stop(); + } (void)sig, (void)flg; } @@ -208,7 +211,9 @@ static void *fio___io_worker_sentinel(void *pid_data) { FIO___LOCK_UNLOCK(FIO___IO.lock); if (!WIFEXITED(status) || WEXITSTATUS(status)) { - FIO_LOG_WARNING("(%d) abnormal worker exit detected", FIO___IO.pid); + FIO_LOG_WARNING("(%d) abnormal worker exit detected for %d", + FIO___IO.pid, + sentinal.pid); fio_state_callback_force(FIO_CALL_ON_CHILD_CRUSH); } if (!FIO___IO.stop && !sentinal.stop) { @@ -240,26 +245,10 @@ static void fio___io_spawn_worker(void) { if (FIO___IO.stop || !fio_io_is_master()) return; - /* do not allow master tasks to run in worker - pretend to stop. */ - FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); - if (fio_atomic_or_fetch(&FIO___IO.stop, 2) != 2) - return; - FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) { - fio___io_async_stop(q); - } - fio_queue_perform_all(&FIO___IO.queue); - /* perform forking procedure with the stop flag reset. */ - fio_atomic_and_fetch(&FIO___IO.stop, 1); - fio_state_callback_force(FIO_CALL_BEFORE_FORK); - FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); - /* perform actual fork */ fio_thread_pid_t pid = fio_thread_fork(); FIO_ASSERT(pid != (fio_thread_pid_t)-1, "system call `fork` failed."); if (!pid) goto is_worker_process; - /* finish up */ - fio_state_callback_force(FIO_CALL_AFTER_FORK); - fio_state_callback_force(FIO_CALL_IN_MASTER); if (fio_thread_create(&t, fio___io_worker_sentinel, (void *)(uintptr_t)pid)) { FIO_LOG_FATAL( "sentinel thread creation failed, no worker will be spawned."); @@ -309,10 +298,29 @@ static void fio___io_spawn_workers_task(void *ignr_1, void *ignr_2) { if (fio_atomic_or(&is_running, 1)) return; FIO_LOG_INFO("(%d) spawning %d workers.", fio_io_pid(), FIO___IO.to_spawn); + + /* do not allow master tasks to run in worker - pretend to stop. */ + FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); + if (fio_atomic_or_fetch(&FIO___IO.stop, 2) != 2) + return; + FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) { + fio___io_async_stop(q); + } + fio_queue_perform_all(&FIO___IO.queue); + + /* perform forking procedure with the stop flag reset. */ + fio_atomic_and_fetch(&FIO___IO.stop, 1); + fio_state_callback_force(FIO_CALL_BEFORE_FORK); + FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); + + /* perform actual fork */ do { fio___io_spawn_worker(); } while (fio_atomic_sub_fetch(&FIO___IO.to_spawn, 1)); + /* finish up */ + fio_state_callback_force(FIO_CALL_AFTER_FORK); + fio_state_callback_force(FIO_CALL_IN_MASTER); if ((FIO___IO.flags & FIO___IO_FLAG_CYCLING)) { fio___io_defer_no_wakeup(fio___io_work_task, NULL, NULL); FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) { @@ -360,10 +368,8 @@ SFUNC void fio_io_start(int workers) { #endif FIO___IO.tick = FIO___IO_GET_TIME_MILLI(); if (workers) { - FIO_LOG_INFO("(%d) spawning %d workers.", fio_io_root_pid(), workers); - for (int i = 0; i < workers; ++i) { - fio___io_spawn_worker(); - } + FIO___IO.to_spawn = workers; + fio___io_spawn_workers_task(NULL, NULL); } else { FIO_LOG_DEBUG2("(%d) starting facil.io IO reactor in single process mode.", fio_io_root_pid()); @@ -427,7 +433,7 @@ SFUNC void fio___io_restart(void *workers_, void *ignr_) { FIO___LOCK_LOCK(FIO___IO.lock); FIO_LIST_EACH(fio___io_pid_s, node, &FIO___IO.pids, w) { w->stop = 1; - fio_thread_kill(w->pid, SIGTERM); + fio_thread_kill(w->pid, SIGUSR1); } FIO___LOCK_UNLOCK(FIO___IO.lock); /* switch to single mode? */ diff --git a/fio-stl/420 pubsub.h b/fio-stl/420 pubsub.h index 59150d4a..31d57fcd 100644 --- a/fio-stl/420 pubsub.h +++ b/fio-stl/420 pubsub.h @@ -1016,7 +1016,7 @@ FIO_SFUNC void fio___pubsub_on_enter_child(void *ignr_) { FIO___PUBSUB_POSTOFFICE.protocol.ipc.on_data = fio___pubsub_protocol_on_data_worker; - FIO___PUBSUB_POSTOFFICE.crush_on_close = 1; + FIO___PUBSUB_POSTOFFICE.crush_on_close = !fio_io_is_master(); FIO___PUBSUB_POSTOFFICE.filter.publish = FIO___PUBSUB_PROCESS; FIO___PUBSUB_POSTOFFICE.filter.local = @@ -1878,8 +1878,11 @@ FIO_SFUNC void fio___pubsub_protocol_on_close(void *p_, void *udata) { &FIO___PUBSUB_POSTOFFICE.remote_uuids)); } fio___pubsub_message_parser_destroy(p); - if (FIO___PUBSUB_POSTOFFICE.crush_on_close) + if (FIO___PUBSUB_POSTOFFICE.crush_on_close) { + if (fio_io_is_running()) + FIO_LOG_FATAL("(%d) pub/sub connection lost unexpectedly.", fio_io_pid()); fio_io_stop(); + } (void)udata; } diff --git a/fio-stl/439 http.h b/fio-stl/439 http.h index 19c86c6c..15adee14 100644 --- a/fio-stl/439 http.h +++ b/fio-stl/439 http.h @@ -693,8 +693,14 @@ FIO_SFUNC void fio___http_on_http_with_public_folder(void *h_, void *ignr) { FIO_SFUNC void fio___http_perform_user_callback_client(void *cb_, void *h_) { fio_http_s *h = (fio_http_s *)h_; + union { + void (*fn)(fio_http_s *); + void *ptr; + } cb = {.ptr = cb_}; fio___http_connection_s *c = (fio___http_connection_s *)fio_http_cdata(h); - fio___http_perform_user_callback(cb_, h_); + /* unlike Server mode, handle responses from closed connections */ + cb.fn(h); + fio_http_free(h); fio_io_free(c->io); } @@ -1408,11 +1414,11 @@ FIO_SFUNC void fio___http_controller_http1_send_headers(fio_http_s *h) { /* send data (move memory ownership)? */ c->state.http.buf = buf; return; - fio_io_write2(c->io, - .buf = buf.buf, - .len = buf.len, - .dealloc = FIO_STRING_FREE, - .copy = 0); + // fio_io_write2(c->io, + // .buf = buf.buf, + // .len = buf.len, + // .dealloc = FIO_STRING_FREE, + // .copy = 0); } /** called by the HTTP handle for each body chunk (or to finish a response. */ FIO_SFUNC void fio___http_controller_http1_write_body(