Skip to content

Commit

Permalink
Async timers + force send headers with fio_http_write(.len = 0) + f…
Browse files Browse the repository at this point in the history
…ix TLS with new FIO_IO API
  • Loading branch information
boazsegev committed Dec 2, 2024
1 parent cdee781 commit 47d5ff3
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 58 deletions.
128 changes: 99 additions & 29 deletions fio-stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35015,6 +35015,7 @@ struct fio_io_async_s {
fio_queue_s *q;
uint32_t count;
fio_queue_s queue;
fio_timer_queue_s timers;
FIO_LIST_NODE node;
};

Expand Down Expand Up @@ -35048,6 +35049,30 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads);
/** Pushes a task to an IO Async Queue (macro helper). */
#define fio_io_async(q_, ...) fio_queue_push((q_)->q, __VA_ARGS__)

/** Schedules a timer bound task for the async queue (`fio_timer_schedule`). */
SFUNC void fio_io_async_every(fio_io_async_s *q, fio_timer_schedule_args_s);

/**
* Schedules a timer bound task, for the async queue, see `fio_timer_schedule`.
*
* Possible "named arguments" (fio_timer_schedule_args_s members) include:
*
* * The timer function. If it returns a non-zero value, the timer stops:
* int (*fn)(void *, void *)
* * Opaque user data:
* void *udata1
* * Opaque user data:
* void *udata2
* * Called when the timer is done (finished):
* void (*on_stop)(void *, void *)
* * Timer interval, in milliseconds:
* uint32_t every
* * The number of times the timer should be performed. -1 == infinity:
* int32_t repetitions
*/
#define fio_io_async_every(async, ...) \
fio_io_async_every(async, (fio_timer_schedule_args_s){__VA_ARGS__})

/* *****************************************************************************
IO API Finish
***************************************************************************** */
Expand Down Expand Up @@ -35716,7 +35741,8 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {
args.dealloc);
} else if ((unsigned)(args.fd + 1) > 1) {
packet = fio_stream_pack_fd((int)args.fd, args.len, args.offset, args.copy);
}
} else /* fio_io_write2 called without data */
goto do_nothing;
if (!packet)
goto error;
if ((io->flags & FIO___IO_FLAG_CLOSE))
Expand All @@ -35725,14 +35751,17 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {
return;

error: /* note: `dealloc` already called by the `fio_stream` error handler. */
FIO_LOG_ERROR("couldn't create %zu bytes long user-packet for IO %p (%d)",
args.len,
(void *)io,
(io ? io->fd : -1));
FIO_LOG_ERROR(
"(%d) couldn't create %zu bytes long user-packet for IO %p (%d)",
fio_io_pid(),
args.len,
(void *)io,
(io ? io->fd : -1));
return;

write_called_after_close:
FIO_LOG_DEBUG2("`write` called after `close` was called for IO.");
FIO_LOG_DEBUG2("(%d) `write` called after `close` was called for IO.",
fio_io_pid());
{
union {
void *ptr;
Expand All @@ -35744,6 +35773,7 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {

io_error_null:
FIO_LOG_ERROR("(%d) `fio_write2` called for invalid IO (NULL)", FIO___IO.pid);
do_nothing:
if (args.dealloc) {
union {
void *ptr;
Expand Down Expand Up @@ -36586,6 +36616,7 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads) {
.q = fio_io_queue(),
.count = threads,
.queue = FIO_QUEUE_STATIC_INIT(q->queue),
.timers = FIO_TIMER_QUEUE_INIT,
.node = FIO_LIST_INIT(q->node),
};
FIO_LIST_PUSH(&FIO___IO.async, &q->node);
Expand All @@ -36595,6 +36626,14 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads) {
fio___io_async_start(q);
}

void fio_io_async_every___(void); /* IDE Mark */
/** Schedules a timer bound task for the async queue (`fio_timer_schedule`). */
SFUNC void fio_io_async_every FIO_NOOP(fio_io_async_s *q,
fio_timer_schedule_args_s a) {
a.start_at = FIO___IO.tick;
fio_timer_schedule FIO_NOOP(&q->timers, a);
}

/* *****************************************************************************
Managing data after a fork
***************************************************************************** */
Expand Down Expand Up @@ -36692,6 +36731,9 @@ FIO_SFUNC void fio___io_tick(int timeout) {
}
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
fio_timer_push2queue(&FIO___IO.queue, &FIO___IO.timer, FIO___IO.tick);
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, a) {
fio_timer_push2queue(a->q, &a->timers, FIO___IO.tick);
}
for (size_t i = 0; i < 2048; ++i)
if (fio_queue_perform(&FIO___IO.queue))
break;
Expand Down Expand Up @@ -37340,7 +37382,7 @@ IO Reactor Finish

Copyright and License: see header file (000 copyright.h) or top of file
***************************************************************************** */
#if defined(H___FIO_SERVER___H) && \
#if defined(H___FIO_IO___H) && \
(HAVE_OPENSSL || __has_include("openssl/ssl.h")) && \
!defined(H___FIO_OPENSSL___H) && !defined(FIO___RECURSIVE_INCLUDE)
#define H___FIO_OPENSSL___H 1
Expand Down Expand Up @@ -37754,7 +37796,8 @@ FIO_LEAK_COUNTER_DEF(fio___SSL)

/** called once the IO was attached and the TLS object was set. */
FIO_SFUNC void fio___openssl_start(fio_io_s *io) {
fio___openssl_context_s *ctx_parent = (fio___openssl_context_s *)fio_tls(io);
fio___openssl_context_s *ctx_parent =
(fio___openssl_context_s *)fio_io_tls(io);
FIO_ASSERT_DEBUG(ctx_parent, "OpenSSL Context missing!");

SSL *ssl = SSL_new(ctx_parent->ctx);
Expand All @@ -37765,7 +37808,7 @@ FIO_SFUNC void fio___openssl_start(fio_io_s *io) {
FIO_LOG_DDEBUG2("(%d) allocated new TLS context for %p.",
(int)fio_thread_getpid(),
(void *)io);
BIO *bio = BIO_new_socket(fio_fd(io), 0);
BIO *bio = BIO_new_socket(fio_io_fd(io), 0);
SSL_set_bio(ssl, bio, bio);
SSL_set_ex_data(ssl, 0, (void *)io);
if (SSL_is_server(ssl))
Expand Down Expand Up @@ -37812,7 +37855,7 @@ static void fio___openssl_free_context_task(void *tls_ctx, void *ignr_) {

/** Builds a local TLS context out of the fio_io_tls_s object. */
static void fio___openssl_free_context(void *tls_ctx) {
fio_srv_defer(fio___openssl_free_context_task, tls_ctx, NULL);
fio_io_defer(fio___openssl_free_context_task, tls_ctx, NULL);
}
/* *****************************************************************************
IO Functions Structure
Expand All @@ -37835,7 +37878,7 @@ SFUNC fio_io_functions_s fio_openssl_io_functions(void) {
FIO_CONSTRUCTOR(fio___openssl_setup_default) {
static fio_io_functions_s FIO___OPENSSL_IO_FUNCS;
FIO___OPENSSL_IO_FUNCS = fio_openssl_io_functions();
fio_io_tls_default_io_functions(&FIO___OPENSSL_IO_FUNCS);
fio_io_tls_default_functions(&FIO___OPENSSL_IO_FUNCS);
#ifdef SIGPIPE
fio_signal_monitor(SIGPIPE, NULL, NULL); /* avoid OpenSSL issue... */
#endif
Expand Down Expand Up @@ -42499,10 +42542,8 @@ FIO_SFUNC int fio____http_write_start(fio_http_s *h,

FIO_SFUNC int fio____http_write_cont(fio_http_s *h,
fio_http_write_args_s *args) {
if (args->buf || args->fd) {
h->controller->write_body(h, *args);
h->sent += args->len;
}
h->controller->write_body(h, *args);
h->sent += args->len;
if (args->finish) {
h->state |= FIO_HTTP_STATE_FINISHED;
h->writer = (h->state & FIO_HTTP_STATE_UPGRADED)
Expand Down Expand Up @@ -46163,13 +46204,15 @@ FIO_SFUNC void fio___http_controller_http1_write_body(
goto no_write_err;
if (fio_http_is_streaming(h))
goto stream_chunk;
if (c->state.http.buf.len && args.buf && args.len) {
fio_string_write(&c->state.http.buf,
FIO_STRING_REALLOC,
(char *)args.buf + args.offset,
args.len);
if (args.dealloc)
args.dealloc((void *)args.buf);
if (c->state.http.buf.len) {
if (args.buf && args.len) {
fio_string_write(&c->state.http.buf,
FIO_STRING_REALLOC,
(char *)args.buf + args.offset,
args.len);
if (args.dealloc)
args.dealloc((void *)args.buf);
}
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
Expand All @@ -46189,14 +46232,40 @@ FIO_SFUNC void fio___http_controller_http1_write_body(

stream_chunk:
if (args.len) { /* print chunk header */
char buf[24];
fio_str_info_s i = FIO_STR_INFO3(buf, 0, 24);
fio_string_write_hex(&i, NULL, args.len);
fio_string_write(&i, NULL, "\r\n", 2);
fio_io_write2(c->io, .buf = (void *)i.buf, .len = i.len, .copy = 1);
if (c->state.http.buf.len) {
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
.dealloc = FIO_STRING_FREE);
fio_string_write2(&c->state.http.buf,
FIO_STRING_REALLOC,
FIO_STRING_WRITE_HEX(args.len),
FIO_STRING_WRITE_STR2("\r\n", 2));
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
.dealloc = FIO_STRING_FREE);
c->state.http.buf = FIO_STR_INFO0;
} else {
char buf[24];
fio_str_info_s i = FIO_STR_INFO3(buf, 0, 24);
fio_string_write_hex(&i, NULL, args.len);
fio_string_write(&i, NULL, "\r\n", 2);
fio_io_write2(c->io, .buf = (void *)i.buf, .len = i.len, .copy = 1);
}
} else {
FIO_LOG_ERROR("HTTP1 streaming requires a correctly pre-determined "
"length per chunk.");
if (c->state.http.buf.len) {
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
.dealloc = FIO_STRING_FREE);
c->state.http.buf = FIO_STR_INFO0;
}
if (args.buf || (uint32_t)(args.fd + 1) > 0U)
FIO_LOG_ERROR("HTTP1 streaming requires a correctly pre-determined "
"length per chunk.");
else
goto no_write_err;
}
fio_io_write2(c->io,
.buf = (void *)args.buf,
Expand All @@ -46211,6 +46280,7 @@ FIO_SFUNC void fio___http_controller_http1_write_body(
fio_io_write2(c->io, .buf = trailer.buf, .len = trailer.len, .copy = 1);
}
return;

no_write_err:
if (args.buf) {
if (args.dealloc)
Expand Down
25 changes: 25 additions & 0 deletions fio-stl/400 io api.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ struct fio_io_async_s {
fio_queue_s *q;
uint32_t count;
fio_queue_s queue;
fio_timer_queue_s timers;
FIO_LIST_NODE node;
};

Expand Down Expand Up @@ -782,6 +783,30 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads);
/** Pushes a task to an IO Async Queue (macro helper). */
#define fio_io_async(q_, ...) fio_queue_push((q_)->q, __VA_ARGS__)

/** Schedules a timer bound task for the async queue (`fio_timer_schedule`). */
SFUNC void fio_io_async_every(fio_io_async_s *q, fio_timer_schedule_args_s);

/**
* Schedules a timer bound task, for the async queue, see `fio_timer_schedule`.
*
* Possible "named arguments" (fio_timer_schedule_args_s members) include:
*
* * The timer function. If it returns a non-zero value, the timer stops:
* int (*fn)(void *, void *)
* * Opaque user data:
* void *udata1
* * Opaque user data:
* void *udata2
* * Called when the timer is done (finished):
* void (*on_stop)(void *, void *)
* * Timer interval, in milliseconds:
* uint32_t every
* * The number of times the timer should be performed. -1 == infinity:
* int32_t repetitions
*/
#define fio_io_async_every(async, ...) \
fio_io_async_every(async, (fio_timer_schedule_args_s){__VA_ARGS__})

/* *****************************************************************************
IO API Finish
***************************************************************************** */
Expand Down
26 changes: 20 additions & 6 deletions fio-stl/401 io types.h
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,8 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {
args.dealloc);
} else if ((unsigned)(args.fd + 1) > 1) {
packet = fio_stream_pack_fd((int)args.fd, args.len, args.offset, args.copy);
}
} else /* fio_io_write2 called without data */
goto do_nothing;
if (!packet)
goto error;
if ((io->flags & FIO___IO_FLAG_CLOSE))
Expand All @@ -671,14 +672,17 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {
return;

error: /* note: `dealloc` already called by the `fio_stream` error handler. */
FIO_LOG_ERROR("couldn't create %zu bytes long user-packet for IO %p (%d)",
args.len,
(void *)io,
(io ? io->fd : -1));
FIO_LOG_ERROR(
"(%d) couldn't create %zu bytes long user-packet for IO %p (%d)",
fio_io_pid(),
args.len,
(void *)io,
(io ? io->fd : -1));
return;

write_called_after_close:
FIO_LOG_DEBUG2("`write` called after `close` was called for IO.");
FIO_LOG_DEBUG2("(%d) `write` called after `close` was called for IO.",
fio_io_pid());
{
union {
void *ptr;
Expand All @@ -690,6 +694,7 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {

io_error_null:
FIO_LOG_ERROR("(%d) `fio_write2` called for invalid IO (NULL)", FIO___IO.pid);
do_nothing:
if (args.dealloc) {
union {
void *ptr;
Expand Down Expand Up @@ -1532,6 +1537,7 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads) {
.q = fio_io_queue(),
.count = threads,
.queue = FIO_QUEUE_STATIC_INIT(q->queue),
.timers = FIO_TIMER_QUEUE_INIT,
.node = FIO_LIST_INIT(q->node),
};
FIO_LIST_PUSH(&FIO___IO.async, &q->node);
Expand All @@ -1541,6 +1547,14 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads) {
fio___io_async_start(q);
}

void fio_io_async_every___(void); /* IDE Mark */
/** Schedules a timer bound task for the async queue (`fio_timer_schedule`). */
SFUNC void fio_io_async_every FIO_NOOP(fio_io_async_s *q,
fio_timer_schedule_args_s a) {
a.start_at = FIO___IO.tick;
fio_timer_schedule FIO_NOOP(&q->timers, a);
}

/* *****************************************************************************
Managing data after a fork
***************************************************************************** */
Expand Down
3 changes: 3 additions & 0 deletions fio-stl/402 io reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ FIO_SFUNC void fio___io_tick(int timeout) {
}
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
fio_timer_push2queue(&FIO___IO.queue, &FIO___IO.timer, FIO___IO.tick);
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, a) {
fio_timer_push2queue(a->q, &a->timers, FIO___IO.tick);
}
for (size_t i = 0; i < 2048; ++i)
if (fio_queue_perform(&FIO___IO.queue))
break;
Expand Down
Loading

0 comments on commit 47d5ff3

Please sign in to comment.