Skip to content

Commit

Permalink
logging + hot worker restart + doc
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Jan 28, 2025
1 parent ca9690a commit 645555a
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 92 deletions.
128 changes: 82 additions & 46 deletions fio-stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11582,7 +11582,7 @@ static const char *FIO___STATE_TASKS_NAMES[FIO_CALL_NEVER + 1] = {
[FIO_CALL_ON_PARENT_CRUSH] = "ON_PARENT_CRUSH",
[FIO_CALL_ON_CHILD_CRUSH] = "ON_CHILD_CRUSH",
[FIO_CALL_ON_WORKER_THREAD_END] = "ON_WORKER_THREAD_END",
[FIO_CALL_ON_STOP] = "ON_FINISH",
[FIO_CALL_ON_STOP] = "ON_STOP",
[FIO_CALL_AT_EXIT] = "AT_EXIT",
[FIO_CALL_NEVER] = "NEVER",
};
Expand Down Expand Up @@ -11677,10 +11677,10 @@ SFUNC void fio_state_callback_force(fio_state_event_type_e e) {
fio_trylock(FIO___STATE_TASKS_ARRAY_LOCK + FIO_CALL_NEVER);
}

FIO_LOG_DDEBUG2("(%d) scheduling %s callbacks (%zu tasks).",
(int)(fio_thread_getpid()),
FIO___STATE_TASKS_NAMES[e],
(size_t)FIO___STATE_TASKS_ARRAY[e].count);
FIO_LOG_DEBUG2("(%d) scheduling %s callbacks (%zu tasks).",
(int)(fio_thread_getpid()),
FIO___STATE_TASKS_NAMES[e],
(size_t)FIO___STATE_TASKS_ARRAY[e].count);
if (!FIO___STATE_TASKS_ARRAY[e].count)
return;
/* copy task queue */
Expand All @@ -11696,11 +11696,8 @@ SFUNC void fio_state_callback_force(fio_state_event_type_e e) {
}
fio_unlock(FIO___STATE_TASKS_ARRAY_LOCK + (uintptr_t)e);

if (e <= FIO_CALL_PRE_START) {
/* perform copied tasks immediately within system thread */
for (size_t i = 0; i < len; ++i)
ary[i].func(ary[i].arg);
} else if (e <= FIO_CALL_ON_IDLE) {
/* perform copied tasks in correct order */
if (e <= FIO_CALL_ON_IDLE) {
/* perform tasks in order */
for (size_t i = 0; i < len; ++i)
ary[i].func(ary[i].arg);
Expand All @@ -11711,6 +11708,7 @@ SFUNC void fio_state_callback_force(fio_state_event_type_e e) {
}
/* cleanup */
FIO_MEM_FREE(ary, ary_capa);

(void)FIO___STATE_TASKS_NAMES; /* if unused */
}

Expand Down Expand Up @@ -14196,46 +14194,59 @@ static volatile size_t FIO_NAME(FIO_MEMORY_NAME, __malloc_total);
#define FIO_MEMORY_ON_CHUNK_ALLOC(ptr) \
do { \
FIO_LEAK_COUNTER_ON_ALLOC(FIO_NAME(FIO_MEMORY_NAME, __malloc_chunk)); \
FIO_LOG_DEBUG2("MEMORY CHUNK-ALLOC allocated %p", ptr); \
FIO_LOG_DEBUG2("(%d) MEMORY CHUNK-ALLOC allocated %p", \
fio_getpid(), \
ptr); \
} while (0);
#define FIO_MEMORY_ON_CHUNK_FREE(ptr) \
do { \
FIO_LEAK_COUNTER_ON_FREE(FIO_NAME(FIO_MEMORY_NAME, __malloc_chunk)); \
FIO_LOG_DEBUG2("MEMORY CHUNK-DEALLOC de-allocated %p", ptr); \
FIO_LOG_DEBUG2("(%d) MEMORY CHUNK-DEALLOC de-allocated %p", \
fio_getpid(), \
ptr); \
} while (0);
#define FIO_MEMORY_ON_CHUNK_CACHE(ptr) \
do { \
FIO_LOG_DEBUG2("MEMORY CACHE-PUSH placed %p in cache", ptr); \
FIO_LOG_DEBUG2("(%d) MEMORY CACHE-PUSH placed %p in cache", \
fio_getpid(), \
ptr); \
} while (0);
#define FIO_MEMORY_ON_CHUNK_UNCACHE(ptr) \
do { \
FIO_LOG_DEBUG2("MEMORY CACHE-POP retrieved %p from cache", ptr); \
FIO_LOG_DEBUG2("(%d) MEMORY CACHE-POP retrieved %p from cache", \
fio_getpid(), \
ptr); \
} while (0);

/**
 * Debug hook for a block being reset while the allocator lock is held.
 *
 * The log call is compiled in but disabled by the `if (0)` guard; it is kept
 * so the format string and its arguments continue to be type-checked.
 *
 * Arguments must match the format string in order: the process id for `%d`
 * (as in the sibling MEMORY log macros), the chunk pointer for `%p` and the
 * block index for `%zu`.
 */
#define FIO_MEMORY_ON_BLOCK_RESET_IN_LOCK(ptr, blk)                            \
  do {                                                                         \
    if (0)                                                                     \
      FIO_LOG_DEBUG2("(%d) MEMORY chunk %p block %zu reset in lock",           \
                     fio_getpid(),                                             \
                     ptr,                                                      \
                     (size_t)blk);                                             \
  } while (0);

#define FIO_MEMORY_ON_BIG_BLOCK_SET(ptr) \
do { \
if (1) \
FIO_LOG_DEBUG2("MEMORY chunk %p used as big-block", ptr); \
FIO_LOG_DEBUG2("(%d) MEMORY chunk %p used as big-block", \
fio_getpid(), \
ptr); \
} while (0);

#define FIO_MEMORY_ON_BIG_BLOCK_UNSET(ptr) \
do { \
if (1) \
FIO_LOG_DEBUG2("MEMORY chunk %p no longer used as big-block", ptr); \
FIO_LOG_DEBUG2("(%d) MEMORY chunk %p no longer used as big-block", \
fio_getpid(), \
ptr); \
} while (0);
#define FIO_MEMORY_PRINT_STATS_END() \
do { \
FIO_LOG_DEBUG2( \
"(" FIO_MACRO2STR( \
"(%d) (" FIO_MACRO2STR( \
FIO_NAME(FIO_MEMORY_NAME, malloc)) ") total allocations: %zu", \
fio_getpid(), \
FIO_NAME(FIO_MEMORY_NAME, __malloc_total)); \
} while (0)
#define FIO_MEMORY_ON_ALLOC_FUNC() \
Expand Down Expand Up @@ -14754,8 +14765,9 @@ SFUNC void FIO_NAME(FIO_MEMORY_NAME, malloc_after_fork)(void) {
FIO_NAME(FIO_MEMORY_NAME, __mem_state_setup)();
return;
}
FIO_LOG_DEBUG2("MEMORY reinitializing " FIO_MACRO2STR(
FIO_NAME(FIO_MEMORY_NAME, malloc)) " state");
FIO_LOG_DEBUG2("(%d) MEMORY reinitializing " FIO_MACRO2STR(
FIO_NAME(FIO_MEMORY_NAME, malloc)) " state",
fio_getpid());
FIO_MEMORY_LOCK_TYPE_INIT(FIO_NAME(FIO_MEMORY_NAME, __mem_state)->lock);
#if FIO_MEMORY_ENABLE_BIG_ALLOC
FIO_MEMORY_LOCK_TYPE_INIT(FIO_NAME(FIO_MEMORY_NAME, __mem_state)->big_lock);
Expand All @@ -14775,9 +14787,10 @@ Memory Allocation - state printing (debug helper)
void fio_malloc_print_state___(void);
/** Prints the allocator's data structure. May be used for debugging. */
SFUNC void FIO_NAME(FIO_MEMORY_NAME, malloc_print_state)(void) {
fprintf(
stderr,
FIO_MACRO2STR(FIO_NAME(FIO_MEMORY_NAME, malloc)) " allocator state:\n");
fprintf(stderr,
"(%d) " FIO_MACRO2STR(
FIO_NAME(FIO_MEMORY_NAME, malloc)) " allocator state:\n",
fio_getpid());
for (size_t i = 0; i < FIO_NAME(FIO_MEMORY_NAME, __mem_state)->arena_count;
++i) {
fprintf(stderr,
Expand Down Expand Up @@ -14836,9 +14849,11 @@ SFUNC void FIO_NAME(FIO_MEMORY_NAME, malloc_print_free_block_list)(void) {
if (FIO_NAME(FIO_MEMORY_NAME, __mem_state)->blocks.prev ==
&FIO_NAME(FIO_MEMORY_NAME, __mem_state)->blocks)
return;
fprintf(stderr,
FIO_MACRO2STR(FIO_NAME(FIO_MEMORY_NAME,
malloc)) " allocator free block list:\n");
fprintf(
stderr,
"(%d) " FIO_MACRO2STR(
FIO_NAME(FIO_MEMORY_NAME, malloc)) " allocator free block list:\n",
fio_getpid());
FIO_LIST_NODE *n = FIO_NAME(FIO_MEMORY_NAME, __mem_state)->blocks.prev;
for (size_t i = 0; n != &FIO_NAME(FIO_MEMORY_NAME, __mem_state)->blocks;
++i) {
Expand Down Expand Up @@ -14984,11 +14999,13 @@ FIO_IFUNC void FIO_NAME(FIO_MEMORY_NAME, __mem_block_free)(void *p) {
return;
FIO_ASSERT_DEBUG(
(uint32_t)c->blocks[b].ref <= FIO_MEMORY_UNITS_PER_BLOCK + 1,
"block reference count corrupted, possible double free? (%zd)",
"(%d) block reference count corrupted, possible double free? (%zd)",
fio_getpid(),
(size_t)c->blocks[b].ref);
FIO_ASSERT_DEBUG(
(uint32_t)c->blocks[b].pos <= FIO_MEMORY_UNITS_PER_BLOCK + 1,
"block allocation position corrupted, possible double free? (%zd)",
"(%d) block allocation position corrupted, possible double free? (%zd)",
fio_getpid(),
(size_t)c->blocks[b].pos);
if (fio_atomic_sub_fetch(&c->blocks[b].ref, 1))
return;
Expand Down Expand Up @@ -15414,8 +15431,9 @@ FIO_IFUNC void *FIO_MEM_ALIGN_NEW FIO_NAME(FIO_MEMORY_NAME,
{
#ifdef DEBUG
FIO_LOG_WARNING(
"unintended " FIO_MACRO2STR(
"(%d) unintended " FIO_MACRO2STR(
FIO_NAME(FIO_MEMORY_NAME, mmap)) " allocation (slow): %zu bytes",
fio_getpid(),
FIO_MEM_BYTES2PAGES(size));
#endif
p = FIO_NAME(FIO_MEMORY_NAME, mmap)(size);
Expand Down Expand Up @@ -15587,7 +15605,9 @@ SFUNC void *FIO_MEM_ALIGN FIO_NAME(FIO_MEMORY_NAME, realloc2)(void *ptr,
FIO_NAME(FIO_MEMORY_NAME, __mem_chunk_s) *c =
FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2chunk)(ptr);
size_t b = FIO_NAME(FIO_MEMORY_NAME, __mem_ptr2index)(c, ptr);
FIO_ASSERT(c, "cannot reallocate a pointer with a NULL system allocation");
FIO_ASSERT(c,
"(%d) cannot reallocate a pointer with a NULL system allocation",
fio_getpid());

register size_t max_len =
((uintptr_t)FIO_NAME(FIO_MEMORY_NAME, __mem_chunk2ptr)(c, b, 0) +
Expand Down Expand Up @@ -35475,7 +35495,8 @@ IO Reactor State Machine
typedef struct {
FIO_LIST_NODE node;
fio_thread_pid_t pid;
volatile size_t stop;
volatile unsigned done;
volatile unsigned stop;
} fio___io_pid_s;

static struct FIO___IO_S {
Expand Down Expand Up @@ -36156,8 +36177,10 @@ static void fio___io_poll_on_ready(void *io_, void *ignr_) {
static void fio___io_poll_on_close(void *io_, void *ignr_) {
(void)ignr_;
fio_io_s *io = (fio_io_s *)io_;
FIO___IO_FLAG_SET(io, FIO___IO_FLAG_CLOSE_REMOTE);
FIO_LOG_DEBUG2("(%d) fd %d closed by remote peer", FIO___IO.pid, io->fd);
if (!(io->flags & FIO___IO_FLAG_CLOSE)) {
FIO___IO_FLAG_SET(io, FIO___IO_FLAG_CLOSE_REMOTE);
FIO_LOG_DEBUG2("(%d) fd %d closed by remote peer", FIO___IO.pid, io->fd);
}
fio_io_close_now(io);
fio___io_free2(io);
}
Expand Down Expand Up @@ -36969,7 +36992,7 @@ FIO_SFUNC void fio___io_work_task(void *ignr_1, void *ignr_2) {
fio_queue_push(&FIO___IO.queue, fio___io_work_task, ignr_1, ignr_2);
return;
no_run:
FIO___IO_FLAG_UNSET(&FIO___IO, FIO___IO_FLAG_CYCLING);
return;
}

FIO_SFUNC void fio___io_work(int is_worker) {
Expand All @@ -36983,17 +37006,25 @@ FIO_SFUNC void fio___io_work(int is_worker) {
fio_state_callback_force(FIO_CALL_ON_START);
}
fio___io_wakeup_init();
FIO___IO_FLAG_SET(&FIO___IO, FIO___IO_FLAG_CYCLING);

fio_queue_push(&FIO___IO.queue, fio___io_work_task);
FIO___IO_FLAG_SET(&FIO___IO, FIO___IO_FLAG_CYCLING);
fio_queue_perform_all(&FIO___IO.queue);
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) {
fio___io_async_stop(q);
}
FIO___IO_FLAG_UNSET(&FIO___IO, FIO___IO_FLAG_CYCLING);

fio___io_shutdown();

FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) {
fio___io_async_stop(q);
}
/* signal all child workers to terminate, parent is going away. */
FIO___LOCK_LOCK(FIO___IO.lock);
if (FIO___IO.pids.next)
FIO_LIST_EACH(fio___io_pid_s, node, &FIO___IO.pids, pos) {
if (!pos->done)
fio_thread_kill(pos->pid, SIGTERM);
}
FIO___LOCK_UNLOCK(FIO___IO.lock);

fio_queue_perform_all(&FIO___IO.queue);
fio_state_callback_force(FIO_CALL_ON_STOP);
Expand Down Expand Up @@ -37030,8 +37061,10 @@ static void *fio___io_worker_sentinel(void *pid_data) {

if (fio_thread_waitpid(sentinal.pid, &status, 0) != sentinal.pid &&
!FIO___IO.stop)
FIO_LOG_ERROR("waitpid failed, worker re-spawning might fail.");

FIO_LOG_ERROR("(%d) waitpid failed for %d, worker re-spawning might fail.",
fio_thread_getpid(),
sentinal.pid);
sentinal.done = 1;
FIO___LOCK_LOCK(FIO___IO.lock);
FIO_LIST_REMOVE(&sentinal.node);
FIO___LOCK_UNLOCK(FIO___IO.lock);
Expand Down Expand Up @@ -37098,6 +37131,8 @@ static void fio___io_spawn_worker(void) {

is_worker_process:
FIO___IO.pid = fio_thread_getpid();
FIO___IO.is_worker = 1;

/* close all inherited connections immediately? */
FIO_LIST_EACH(fio_io_protocol_s,
reserved.protocols,
Expand All @@ -37110,7 +37145,6 @@ static void fio___io_spawn_worker(void) {
fio_queue_perform_all(&FIO___IO.queue);
/* TODO: keep? */

FIO___IO.is_worker = 1;
FIO_LOG_INFO("(%d) worker starting up.", fio_io_pid());

if (FIO___IO.stop)
Expand All @@ -37120,9 +37154,12 @@ static void fio___io_spawn_worker(void) {
fio_state_callback_force(FIO_CALL_IN_CHILD);
fio_queue_perform_all(&FIO___IO.queue);
fio___io_work(1);
skip_work:
FIO_LOG_INFO("(%d) worker exiting.", fio_io_pid());
exit(0);
skip_work:
FIO_LOG_WARNING("(%d) worker exiting - stop signal detected during restart.",
fio_io_pid());
exit(0);
}

static void fio___io_spawn_workers_task(void *ignr_1, void *ignr_2) {
Expand All @@ -37138,8 +37175,7 @@ static void fio___io_spawn_workers_task(void *ignr_1, void *ignr_2) {
fio___io_spawn_worker();
} while (fio_atomic_sub_fetch(&FIO___IO.to_spawn, 1));

if (!(FIO___IO_FLAG_SET(&FIO___IO, FIO___IO_FLAG_CYCLING) &
FIO___IO_FLAG_CYCLING)) {
if ((FIO___IO.flags & FIO___IO_FLAG_CYCLING)) {
fio___io_defer_no_wakeup(fio___io_work_task, NULL, NULL);
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, q) {
fio___io_async_start(q);
Expand Down Expand Up @@ -37330,12 +37366,12 @@ static void fio___io_listen_free(void *l_) {

if (l->hide_from_log)
FIO_LOG_DEBUG2("(%d) stopped listening @ %.*s",
getpid(),
fio_thread_getpid(),
(int)l->url_len,
l->url);
else
FIO_LOG_INFO("(%d) stopped listening @ %.*s",
getpid(),
fio_thread_getpid(),
(int)l->url_len,
l->url);
fio_queue_perform_all(&FIO___IO.queue);
Expand Down
12 changes: 12 additions & 0 deletions fio-stl.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,23 @@ void my_type_free(my_type_s * t) {
/* count deallocation before freeing object - tests excessive calls to free) */
FIO_LEAK_COUNTER_ON_FREE(my_type_s);
free(t);
FIO_LOG_DEBUG("We now have only %zu my_type_s objects left.",
FIO_LEAK_COUNTER_COUNT(my_type_s));
}
```

**Note**: the `FIO_REF` reference counting module does this automatically when `FIO_LEAK_COUNTER` is defined as true.

#### `FIO_LEAK_COUNTER_COUNT`

```c
#define FIO_LEAK_COUNTER_COUNT(name)
```

Returns the number of unfreed allocations according to the named memory leak detector.

The returned type is `size_t`.

-------------------------------------------------------------------------------

## Dedicated Static Memory Allocations
Expand Down
12 changes: 12 additions & 0 deletions fio-stl/000 core.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,23 @@ void my_type_free(my_type_s * t) {
/* count deallocation before freeing object - tests excessive calls to free) */
FIO_LEAK_COUNTER_ON_FREE(my_type_s);
free(t);
FIO_LOG_DEBUG("We now have only %zu my_type_s objects left.",
FIO_LEAK_COUNTER_COUNT(my_type_s));
}
```
**Note**: the `FIO_REF` reference counting module does this automatically when `FIO_LEAK_COUNTER` is defined as true.
#### `FIO_LEAK_COUNTER_COUNT`
```c
#define FIO_LEAK_COUNTER_COUNT(name)
```

Returns the number of unfreed allocations according to the named memory leak detector.

The returned type is `size_t`.

-------------------------------------------------------------------------------

## Dedicated Static Memory Allocations
Expand Down
Loading

0 comments on commit 645555a

Please sign in to comment.