Skip to content

Commit

Permalink
logging + perform IO read events when closed + fix array double destr…
Browse files Browse the repository at this point in the history
…oy + refinements
  • Loading branch information
boazsegev committed Jan 30, 2025
1 parent 645555a commit 666a69d
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 152 deletions.
185 changes: 109 additions & 76 deletions fio-stl.h

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions fio-stl/010 mem.h
Original file line number Diff line number Diff line change
Expand Up @@ -1250,21 +1250,23 @@ FIO_SFUNC void FIO_NAME(FIO_MEMORY_NAME, __mem_state_cleanup)(void *ignr_) {
FIO_LIST_NODE node;
};
void *last_chunk = NULL;
FIO_LOG_WARNING("(" FIO_MACRO2STR(
FIO_NAME(FIO_MEMORY_NAME,
malloc)) ") blocks left after cleanup - memory leaks?");
FIO_LOG_WARNING("(%d) (" FIO_MACRO2STR(FIO_NAME(
FIO_MEMORY_NAME,
malloc)) ") blocks left after cleanup - memory leaks?",
fio_getpid());
FIO_LIST_EACH(struct t_s,
node,
&FIO_NAME(FIO_MEMORY_NAME, __mem_state)->blocks,
pos) {
if (last_chunk == (void *)FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2chunk)(pos))
continue;
last_chunk = (void *)FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2chunk)(pos);
FIO_LOG_WARNING(
"(" FIO_MACRO2STR(FIO_NAME(FIO_MEMORY_NAME,
malloc)) ") leaked block(s) for chunk %p",
(void *)pos,
last_chunk);
FIO_LOG_WARNING("(%d) (" FIO_MACRO2STR(
FIO_NAME(FIO_MEMORY_NAME,
malloc)) ") leaked block(s) for chunk %p",
fio_getpid(),
(void *)pos,
last_chunk);
}
}

Expand Down
7 changes: 4 additions & 3 deletions fio-stl/102 poll epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout) {
return total;
int active_count = epoll_wait(p->fds[0].fd, events, FIO_POLL_MAX_EVENTS, 0);
if (active_count > 0) {
/* TODO! fix error handling*/
for (int i = 0; i < active_count; i++) {
// errors are handled as disconnections (on_close) in the EPOLLIN queue
// if no error, try an active event(s)
Expand All @@ -170,12 +171,12 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout) {
active_count = epoll_wait(p->fds[1].fd, events, FIO_POLL_MAX_EVENTS, 0);
if (active_count > 0) {
for (int i = 0; i < active_count; i++) {
// holds an active event(s)
if (events[i].events & EPOLLIN)
p->settings.on_data(events[i].data.ptr);
// errors are handled as disconnections (on_close), but only once...
if (events[i].events & (~(EPOLLIN | EPOLLOUT)))
p->settings.on_close(events[i].data.ptr);
// no error, then it's an active event(s)
else if (events[i].events & EPOLLIN)
p->settings.on_data(events[i].data.ptr);
} // end for loop
total += active_count;
}
Expand Down
8 changes: 3 additions & 5 deletions fio-stl/102 poll kqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,12 @@ SFUNC int fio_poll_review(fio_poll_s *p, size_t timeout_) {
if (active_count > 0) {
for (int i = 0; i < active_count; i++) {
// test for event(s) type
if (events[i].filter == EVFILT_WRITE) {
if ((events[i].filter & EVFILT_WRITE))
p->settings.on_ready(events[i].udata);
} else if (events[i].filter == EVFILT_READ) {
if ((events[i].filter & EVFILT_READ))
p->settings.on_data(events[i].udata);
}
if (events[i].flags & (EV_EOF | EV_ERROR)) {
if (events[i].flags & (EV_EOF | EV_ERROR))
p->settings.on_close(events[i].udata);
}
}
} else if (active_count < 0) {
if (errno == EINTR)
Expand Down
12 changes: 7 additions & 5 deletions fio-stl/201 array.h
Original file line number Diff line number Diff line change
Expand Up @@ -637,13 +637,15 @@ SFUNC void FIO_NAME(FIO_ARRAY_NAME, destroy)(FIO_ARRAY_PTR ary_) {
switch (
FIO_NAME_BL(FIO_ARRAY_NAME, embedded)((FIO_ARRAY_PTR)FIO_PTR_TAG(&tmp))) {
case 0:
if (tmp.a.ary) {
#if !FIO_ARRAY_TYPE_DESTROY_SIMPLE
for (size_t i = tmp.a.start; i < tmp.a.end; ++i) {
FIO_ARRAY_TYPE_DESTROY(tmp.a.ary[i]);
}
for (size_t i = tmp.a.start; i < tmp.a.end; ++i) {
FIO_ARRAY_TYPE_DESTROY(tmp.a.ary[i]);
}
#endif
FIO_LEAK_COUNTER_ON_FREE(FIO_NAME(FIO_ARRAY_NAME, destroy));
FIO_MEM_FREE_(tmp.a.ary, tmp.a.capa * sizeof(*tmp.a.ary));
FIO_LEAK_COUNTER_ON_FREE(FIO_NAME(FIO_ARRAY_NAME, destroy));
FIO_MEM_FREE_(tmp.a.ary, tmp.a.capa * sizeof(*tmp.a.ary));
}
return;
case 1:
#if !FIO_ARRAY_TYPE_DESTROY_SIMPLE
Expand Down
5 changes: 4 additions & 1 deletion fio-stl/400 io api.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ developer.

#ifndef FIO_IO_SHUTDOWN_TIMEOUT
/* Sets the hard timeout (in milliseconds) for the reactor's shutdown loop. */
#define FIO_IO_SHUTDOWN_TIMEOUT 10000
#define FIO_IO_SHUTDOWN_TIMEOUT 15000
#endif

#ifndef FIO_IO_COUNT_STORAGE
Expand Down Expand Up @@ -387,6 +387,9 @@ SFUNC int fio_io_is_open(fio_io_s *io);
/** Returns the approximate number of bytes in the outgoing buffer. */
SFUNC size_t fio_io_backlog(fio_io_s *io);

/** Does nothing. */
SFUNC void fio_io_noop(fio_io_s *io);

/* *****************************************************************************
Task Scheduling
***************************************************************************** */
Expand Down
58 changes: 35 additions & 23 deletions fio-stl/401 io types.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,13 @@ FIO_IFUNC void fio___io_env_safe_destroy(fio___io_env_safe_s *e) {
Protocol Type Initialization
***************************************************************************** */

static void fio___io_on_ev_mock_sus(fio_io_s *io) { fio_io_suspend(io); }
static void fio___io_on_ev_mock_unsus(fio_io_s *io) { fio_io_unsuspend(io); }
static void fio___io_on_ev_mock(fio_io_s *io) { (void)(io); }
SFUNC void fio_io_noop(fio_io_s *io) { (void)(io); }

static void fio___io_on_ev_pubsub_mock(struct fio_msg_s *msg) { (void)(msg); }
static void fio___io_on_user_mock(fio_io_s *io, void *i_) {
(void)io, (void)i_;
}
static void fio___io_on_close_mock(void *p1, void *p2) { (void)p1, (void)p2; }
static void fio___io_on_ev_on_timeout(fio_io_s *io) { fio_io_close_now(io); }

/* Called to perform a non-blocking `read`, same as the system call. */
static ssize_t fio___io_func_default_read(int fd,
Expand Down Expand Up @@ -199,7 +197,7 @@ FIO_SFUNC void fio___io_init_protocol(fio_io_protocol_s *pr, _Bool has_tls) {
fio_io_functions_s io_fn = {
.build_context = fio___io_func_default_build_context,
.free_context = fio___io_func_default_free_context,
.start = fio___io_on_ev_mock,
.start = fio_io_noop,
.read = fio___io_func_default_read,
.write = fio___io_func_default_write,
.flush = fio___io_func_default_flush,
Expand All @@ -209,17 +207,17 @@ FIO_SFUNC void fio___io_init_protocol(fio_io_protocol_s *pr, _Bool has_tls) {
if (has_tls)
io_fn = fio_io_tls_default_functions(NULL);
if (!pr->on_attach)
pr->on_attach = fio___io_on_ev_mock;
pr->on_attach = fio_io_noop;
if (!pr->on_data)
pr->on_data = fio___io_on_ev_mock_sus;
pr->on_data = fio_io_suspend;
if (!pr->on_ready)
pr->on_ready = fio___io_on_ev_mock;
pr->on_ready = fio_io_noop;
if (!pr->on_close)
pr->on_close = fio___io_on_close_mock;
if (!pr->on_shutdown)
pr->on_shutdown = fio___io_on_ev_mock_unsus;
pr->on_shutdown = fio_io_noop;
if (!pr->on_timeout)
pr->on_timeout = fio___io_on_ev_on_timeout;
pr->on_timeout = fio_io_close_now;
if (!pr->on_pubsub)
pr->on_pubsub = fio___io_on_ev_pubsub_mock;
if (!pr->on_user1)
Expand Down Expand Up @@ -364,14 +362,15 @@ IO Type
#define FIO___IO_FLAG_CLOSE ((uint32_t)8U)
#define FIO___IO_FLAG_CLOSE_REMOTE ((uint32_t)16U)
#define FIO___IO_FLAG_CLOSE_ERROR ((uint32_t)32U)
#define FIO___IO_FLAG_TOUCH ((uint32_t)64U)
#define FIO___IO_FLAG_WRITE_SCHD ((uint32_t)128U)
#define FIO___IO_FLAG_POLLIN_SET ((uint32_t)256U)
#define FIO___IO_FLAG_POLLOUT_SET ((uint32_t)512U)
#define FIO___IO_FLAG_CLOSED_ALL \
(FIO___IO_FLAG_CLOSE | FIO___IO_FLAG_CLOSE_REMOTE | FIO___IO_FLAG_CLOSE_ERROR)
#define FIO___IO_FLAG_TOUCH ((uint32_t)64U)
#define FIO___IO_FLAG_WRITE_SCHD ((uint32_t)128U)
#define FIO___IO_FLAG_POLLIN_SET ((uint32_t)256U)
#define FIO___IO_FLAG_POLLOUT_SET ((uint32_t)512U)

#define FIO___IO_FLAG_PREVENT_ON_DATA \
(FIO___IO_FLAG_SUSPENDED | FIO___IO_FLAG_THROTTLED | FIO___IO_FLAG_CLOSE | \
FIO___IO_FLAG_CLOSE_REMOTE | FIO___IO_FLAG_CLOSE_ERROR)
(FIO___IO_FLAG_SUSPENDED | FIO___IO_FLAG_THROTTLED)

#define FIO___IO_FLAG_POLL_SET \
(FIO___IO_FLAG_POLLIN_SET | FIO___IO_FLAG_POLLOUT_SET)
Expand Down Expand Up @@ -406,7 +405,7 @@ FIO_IFUNC void fio___io_monitor_in(fio_io_s *io) {
FIO_LOG_DDEBUG2("(%d) IO monitoring Input for %d (called)",
fio_io_pid(),
io->fd);
if (io->flags & FIO___IO_FLAG_PREVENT_ON_DATA)
if (io->flags & (FIO___IO_FLAG_PREVENT_ON_DATA | FIO___IO_FLAG_CLOSED_ALL))
return;
if ((FIO___IO_FLAG_SET(io, FIO___IO_FLAG_POLLIN_SET) &
FIO___IO_FLAG_POLLIN_SET)) {
Expand Down Expand Up @@ -526,6 +525,7 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd,
void *udata,
void *tls) {
fio_io_s *io = NULL;
fio_io_protocol_s cpy;
if (fd == -1)
goto error;
io = fio___io_new2(pr->buffer_size);
Expand All @@ -548,8 +548,9 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd,
return io;

error:
pr->on_close(NULL, udata);
pr->io_functions.cleanup(tls);
cpy = *pr;
cpy.on_close(NULL, udata);
cpy.io_functions.cleanup(tls);
return io;
}

Expand Down Expand Up @@ -657,6 +658,9 @@ FIO_SFUNC void fio___io_write2(void *io_, void *packet_) {
return;

io_closed:
FIO_LOG_DEBUG2("(%d) write task to closed IO %d failed (task too late).",
fio_io_pid(),
fio_io_fd(io));
fio_stream_packet_free(packet);
fio___io_free2(io);
}
Expand Down Expand Up @@ -850,7 +854,7 @@ static void fio___io_poll_on_data(void *io_, void *ignr_) {
fio_io_s *io = (fio_io_s *)io_;
FIO___IO_FLAG_UNSET(io, FIO___IO_FLAG_POLLIN_SET);
if (!(io->flags & FIO___IO_FLAG_PREVENT_ON_DATA)) {
/* this also tests for the suspended / throttled / closing flags */
/* this also tests for the suspended / throttled flags, allows closed */
io->pr->on_data(io);
fio___io_monitor_in(io);
} else if ((io->flags & FIO___IO_FLAG_OPEN)) {
Expand Down Expand Up @@ -950,13 +954,21 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
fio___io_free2(io);
}

// static void fio___io_poll_on_close_task(void *io_, void *ignr_) {
// (void)ignr_;
// fio_io_s *io = (fio_io_s *)io_;
// fio_io_close_now(io);
// fio___io_free2(io);
// }

static void fio___io_poll_on_close(void *io_, void *ignr_) {
(void)ignr_;
fio_io_s *io = (fio_io_s *)io_;
if (!(io->flags & FIO___IO_FLAG_CLOSE)) {
FIO___IO_FLAG_SET(io, FIO___IO_FLAG_CLOSE_REMOTE);
FIO_LOG_DEBUG2("(%d) fd %d closed by remote peer", FIO___IO.pid, io->fd);
}
/* allow on_data tasks to complete before closing? */
fio_io_close_now(io);
fio___io_free2(io);
}
Expand Down Expand Up @@ -1336,7 +1348,7 @@ SFUNC fio_io_tls_s *fio_io_tls_alpn_add(fio_io_tls_s *t,
if (!t || !protocol_name)
return t;
if (!on_selected)
on_selected = fio___io_on_ev_mock;
on_selected = fio_io_noop;
size_t pr_name_len = strlen(protocol_name);
if (pr_name_len > 255) {
FIO_LOG_ERROR(
Expand Down Expand Up @@ -1486,7 +1498,7 @@ SFUNC int fio_io_tls_each FIO_NOOP(fio_io_tls_each_s a) {
SFUNC fio_io_functions_s fio_io_tls_default_functions(fio_io_functions_s *f) {
static fio_io_functions_s default_io_functions = {
.build_context = fio___io_func_default_build_context,
.start = fio___io_on_ev_mock,
.start = fio_io_noop,
.read = fio___io_func_default_read,
.write = fio___io_func_default_write,
.flush = fio___io_func_default_flush,
Expand All @@ -1498,7 +1510,7 @@ SFUNC fio_io_functions_s fio_io_tls_default_functions(fio_io_functions_s *f) {
if (!f->build_context)
f->build_context = fio___io_func_default_build_context;
if (!f->start)
f->start = fio___io_on_ev_mock;
f->start = fio_io_noop;
if (!f->read)
f->read = fio___io_func_default_read;
if (!f->write)
Expand Down
52 changes: 29 additions & 23 deletions fio-stl/402 io reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ The IO Reactor Cycle (the actual work)
***************************************************************************** */

static void fio___io_signal_stop(int sig, void *flg) {
FIO_LOG_INFO("(%d) stop signal detected.", FIO___IO.pid);
fio_io_stop();
(void)sig, (void)flg;
}

static void fio___io_signal_restart(int sig, void *flg) {
if (fio_io_is_master())
fio_io_restart(FIO___IO.workers);
else
else {
FIO_LOG_INFO("(%d) restart signal detected.", FIO___IO.pid);
fio_io_stop();
}
(void)sig, (void)flg;
}

Expand Down Expand Up @@ -208,7 +211,9 @@ static void *fio___io_worker_sentinel(void *pid_data) {
FIO___LOCK_UNLOCK(FIO___IO.lock);

if (!WIFEXITED(status) || WEXITSTATUS(status)) {
FIO_LOG_WARNING("(%d) abnormal worker exit detected", FIO___IO.pid);
FIO_LOG_WARNING("(%d) abnormal worker exit detected for %d",
FIO___IO.pid,
sentinal.pid);
fio_state_callback_force(FIO_CALL_ON_CHILD_CRUSH);
}
if (!FIO___IO.stop && !sentinal.stop) {
Expand Down Expand Up @@ -240,26 +245,10 @@ static void fio___io_spawn_worker(void) {
if (FIO___IO.stop || !fio_io_is_master())
return;

/* do not allow master tasks to run in worker - pretend to stop. */
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
if (fio_atomic_or_fetch(&FIO___IO.stop, 2) != 2)
return;
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) {
fio___io_async_stop(q);
}
fio_queue_perform_all(&FIO___IO.queue);
/* perform forking procedure with the stop flag reset. */
fio_atomic_and_fetch(&FIO___IO.stop, 1);
fio_state_callback_force(FIO_CALL_BEFORE_FORK);
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
/* perform actual fork */
fio_thread_pid_t pid = fio_thread_fork();
FIO_ASSERT(pid != (fio_thread_pid_t)-1, "system call `fork` failed.");
if (!pid)
goto is_worker_process;
/* finish up */
fio_state_callback_force(FIO_CALL_AFTER_FORK);
fio_state_callback_force(FIO_CALL_IN_MASTER);
if (fio_thread_create(&t, fio___io_worker_sentinel, (void *)(uintptr_t)pid)) {
FIO_LOG_FATAL(
"sentinel thread creation failed, no worker will be spawned.");
Expand Down Expand Up @@ -309,10 +298,29 @@ static void fio___io_spawn_workers_task(void *ignr_1, void *ignr_2) {
if (fio_atomic_or(&is_running, 1))
return;
FIO_LOG_INFO("(%d) spawning %d workers.", fio_io_pid(), FIO___IO.to_spawn);

/* do not allow master tasks to run in worker - pretend to stop. */
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
if (fio_atomic_or_fetch(&FIO___IO.stop, 2) != 2)
return;
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) {
fio___io_async_stop(q);
}
fio_queue_perform_all(&FIO___IO.queue);

/* perform forking procedure with the stop flag reset. */
fio_atomic_and_fetch(&FIO___IO.stop, 1);
fio_state_callback_force(FIO_CALL_BEFORE_FORK);
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();

/* perform actual fork */
do {
fio___io_spawn_worker();
} while (fio_atomic_sub_fetch(&FIO___IO.to_spawn, 1));

/* finish up */
fio_state_callback_force(FIO_CALL_AFTER_FORK);
fio_state_callback_force(FIO_CALL_IN_MASTER);
if ((FIO___IO.flags & FIO___IO_FLAG_CYCLING)) {
fio___io_defer_no_wakeup(fio___io_work_task, NULL, NULL);
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) {
Expand Down Expand Up @@ -360,10 +368,8 @@ SFUNC void fio_io_start(int workers) {
#endif
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
if (workers) {
FIO_LOG_INFO("(%d) spawning %d workers.", fio_io_root_pid(), workers);
for (int i = 0; i < workers; ++i) {
fio___io_spawn_worker();
}
FIO___IO.to_spawn = workers;
fio___io_spawn_workers_task(NULL, NULL);
} else {
FIO_LOG_DEBUG2("(%d) starting facil.io IO reactor in single process mode.",
fio_io_root_pid());
Expand Down Expand Up @@ -427,7 +433,7 @@ SFUNC void fio___io_restart(void *workers_, void *ignr_) {
FIO___LOCK_LOCK(FIO___IO.lock);
FIO_LIST_EACH(fio___io_pid_s, node, &FIO___IO.pids, w) {
w->stop = 1;
fio_thread_kill(w->pid, SIGTERM);
fio_thread_kill(w->pid, SIGUSR1);
}
FIO___LOCK_UNLOCK(FIO___IO.lock);
/* switch to single mode? */
Expand Down
Loading

0 comments on commit 666a69d

Please sign in to comment.