From 3098a06900545070bc88bfb419ddd21c37580db2 Mon Sep 17 00:00:00 2001 From: "L. Pereira" Date: Thu, 5 Sep 2024 06:56:25 -0700 Subject: [PATCH] WIP: mariadb-async --- src/samples/techempower/database.c | 127 +++++++++++++++++++------- src/samples/techempower/database.h | 13 ++- src/samples/techempower/techempower.c | 42 +++++---- 3 files changed, 127 insertions(+), 55 deletions(-) diff --git a/src/samples/techempower/database.c b/src/samples/techempower/database.c index c48b0a895..ae9fffb42 100644 --- a/src/samples/techempower/database.c +++ b/src/samples/techempower/database.c @@ -19,28 +19,36 @@ */ #include -#include #include #include +#include #include #include -#include +#include #include "database.h" #include "lwan-status.h" +/* Including "lwan.h" introduces namespace conflicts (linked-list + * functions from CCAN and MySQL headers.) */ +struct lwan_request; +int lwan_request_await_read(struct lwan_request *r, int fd); +int lwan_request_await_write(struct lwan_request *r, int fd); +int lwan_request_await_read_write(struct lwan_request *r, int fd); + struct db_stmt { - bool (*bind)(const struct db_stmt *stmt, - struct db_row *rows); + bool (*bind)(const struct db_stmt *stmt, struct db_row *rows); bool (*step)(const struct db_stmt *stmt, va_list ap); void (*finalize)(struct db_stmt *stmt); const char *param_signature; const char *result_signature; + void *ctx; }; struct db { void (*disconnect)(struct db *db); struct db_stmt *(*prepare)(const struct db *db, + void *ctx, const char *sql, const char *param_signature, const char *result_signature); @@ -58,13 +66,13 @@ struct db_stmt_mysql { MYSQL_STMT *stmt; MYSQL_BIND *param_bind; MYSQL_BIND *result_bind; + int db_fd; bool must_execute_again; bool results_are_bound; MYSQL_BIND param_result_bind[]; }; -static bool db_stmt_bind_mysql(const struct db_stmt *stmt, - struct db_row *rows) +static bool db_stmt_bind_mysql(const struct db_stmt *stmt, struct db_row *rows) { struct db_stmt_mysql *stmt_mysql = (struct db_stmt_mysql *)stmt; const char *signature = stmt->param_signature; @@ -95,16 +103,55 @@ static bool db_stmt_bind_mysql(const struct db_stmt *stmt, return !mysql_stmt_bind_param(stmt_mysql->stmt, stmt_mysql->param_bind); } -static bool db_stmt_step_mysql(const struct db_stmt *stmt, - va_list ap) +static bool db_stmt_step_mysql(const struct db_stmt *stmt, va_list ap) { struct db_stmt_mysql *stmt_mysql = (struct db_stmt_mysql *)stmt; if (stmt_mysql->must_execute_again) { stmt_mysql->must_execute_again = false; stmt_mysql->results_are_bound = false; - if (mysql_stmt_execute(stmt_mysql->stmt)) - return false; + + int status; + + mysql_stmt_execute_start(&status, stmt_mysql->stmt); + while (status) { + /* FIXME: Handle MYSQL_WAIT_TIMEOUT and MYSQL_WAIT_EXCEPT + * properly. + * + * TIMEOUT is handled by calling mysql_get_timeout_value() + * and passing it to the await functions; that's not currently + * supported in Lwan, at least not easily (one could set a + * timeout by hand, similar to how lwan_request_sleep() does, + * but instead of yielding with CONN_CORO_SUSPEND, one would + * forward the arguments to the actual called await function; + * upon return, one would check what caused the coroutine to + * be resumed, and return -ETIMEDOUT or something. + * + * EXCEPT is handled like EPOLLPRI. Currently unsupported + * in Lwan too, although it's easier to implement than timeout. + * + * For now, ignore both flags. */ + status &= ~(MYSQL_WAIT_TIMEOUT | MYSQL_WAIT_EXCEPT); + + switch (status) { + case MYSQL_WAIT_READ: + lwan_request_await_read(stmt_mysql->base.ctx, + stmt_mysql->db_fd); + break; + case MYSQL_WAIT_WRITE: + lwan_request_await_write(stmt_mysql->base.ctx, + stmt_mysql->db_fd); + break; + case MYSQL_WAIT_READ | MYSQL_WAIT_WRITE: + /* FIXME: how do we know what will cause the coroutine + * to resume, a read or a write? */ + lwan_request_await_read_write(stmt_mysql->base.ctx, + stmt_mysql->db_fd); + break; + } + + status = mysql_stmt_execute_cont(&status, stmt_mysql->stmt, status); + } } if (!stmt_mysql->results_are_bound) { @@ -154,15 +201,16 @@ static void db_stmt_finalize_mysql(struct db_stmt *stmt) free(stmt_mysql); } -static struct db_stmt * -db_prepare_mysql(const struct db *db, - const char *sql, - const char *param_signature, - const char *result_signature) +static struct db_stmt *db_prepare_mysql(const struct db *db, + void *ctx, + const char *sql, + const char *param_signature, + const char *result_signature) { const struct db_mysql *db_mysql = (const struct db_mysql *)db; const size_t n_bounds = strlen(param_signature) + strlen(result_signature); - struct db_stmt_mysql *stmt_mysql = malloc(sizeof(*stmt_mysql) + n_bounds * sizeof(MYSQL_BIND)); + struct db_stmt_mysql *stmt_mysql = + malloc(sizeof(*stmt_mysql) + n_bounds * sizeof(MYSQL_BIND)); if (!stmt_mysql) return NULL; @@ -175,19 +223,24 @@ db_prepare_mysql(const struct db *db, goto out_close_stmt; assert(strlen(param_signature) == mysql_stmt_param_count(stmt_mysql->stmt)); - assert(strlen(result_signature) == mysql_stmt_field_count(stmt_mysql->stmt)); + assert(strlen(result_signature) == + mysql_stmt_field_count(stmt_mysql->stmt)); stmt_mysql->base.bind = db_stmt_bind_mysql; stmt_mysql->base.step = db_stmt_step_mysql; stmt_mysql->base.finalize = db_stmt_finalize_mysql; stmt_mysql->param_bind = &stmt_mysql->param_result_bind[0]; - stmt_mysql->result_bind = &stmt_mysql->param_result_bind[strlen(param_signature)]; + stmt_mysql->result_bind = + &stmt_mysql->param_result_bind[strlen(param_signature)]; stmt_mysql->must_execute_again = true; stmt_mysql->results_are_bound = false; stmt_mysql->base.param_signature = param_signature; stmt_mysql->base.result_signature = result_signature; + stmt_mysql->db_fd = mysql_get_socket(db_mysql->con); + stmt_mysql->base.ctx = ctx; + memset(stmt_mysql->param_result_bind, 0, n_bounds * sizeof(MYSQL_BIND)); return (struct db_stmt *)stmt_mysql; @@ -231,6 +284,11 @@ struct db *db_connect_mysql(const char *host, if (mysql_set_character_set(db_mysql->con, "utf8")) goto error; + if (mysql_optionsv(db_mysql->con, MYSQL_OPT_NONBLOCK, 0)) { + lwan_status_error("Could not enable non-blocking mode"); + goto error; + } + db_mysql->base.disconnect = db_disconnect_mysql; db_mysql->base.prepare = db_prepare_mysql; @@ -254,8 +312,7 @@ struct db_stmt_sqlite { sqlite3_stmt *sqlite; }; -static bool db_stmt_bind_sqlite(const struct db_stmt *stmt, - struct db_row *rows) +static bool db_stmt_bind_sqlite(const struct db_stmt *stmt, struct db_row *rows) { const struct db_stmt_sqlite *stmt_sqlite = (const struct db_stmt_sqlite *)stmt; @@ -270,8 +327,8 @@ static bool db_stmt_bind_sqlite(const struct db_stmt *stmt, switch (signature[row]) { case 's': - ret = sqlite3_bind_text(stmt_sqlite->sqlite, (int)row + 1, r->u.s, -1, - NULL); + ret = sqlite3_bind_text(stmt_sqlite->sqlite, (int)row + 1, r->u.s, + -1, NULL); break; case 'i': ret = sqlite3_bind_int(stmt_sqlite->sqlite, (int)row + 1, r->u.i); @@ -287,8 +344,7 @@ static bool db_stmt_bind_sqlite(const struct db_stmt *stmt, return true; } -static bool db_stmt_step_sqlite(const struct db_stmt *stmt, - va_list ap) +static bool db_stmt_step_sqlite(const struct db_stmt *stmt, va_list ap) { const struct db_stmt_sqlite *stmt_sqlite = (const struct db_stmt_sqlite *)stmt; @@ -328,11 +384,11 @@ static void db_stmt_finalize_sqlite(struct db_stmt *stmt) free(stmt_sqlite); } -static struct db_stmt * -db_prepare_sqlite(const struct db *db, - const char *sql, - const char *param_signature, - const char *result_signature) +static struct db_stmt *db_prepare_sqlite(const struct db *db, + void *ctx, + const char *sql, + const char *param_signature, + const char *result_signature) { const struct db_sqlite *db_sqlite = (const struct db_sqlite *)db; struct db_stmt_sqlite *stmt_sqlite = malloc(sizeof(*stmt_sqlite)); @@ -354,6 +410,8 @@ db_prepare_sqlite(const struct db *db, stmt_sqlite->base.param_signature = param_signature; stmt_sqlite->base.result_signature = result_signature; + stmt_sqlite->base.ctx = ctx; + return (struct db_stmt *)stmt_sqlite; } @@ -414,10 +472,11 @@ inline void db_stmt_finalize(struct db_stmt *stmt) { stmt->finalize(stmt); } inline void db_disconnect(struct db *db) { db->disconnect(db); } -inline struct db_stmt *db_prepare_stmt(const struct db *db, - const char *sql, - const char *param_signature, - const char *result_signature) +inline struct db_stmt *db_prepare_stmt_ctx(const struct db *db, + void *ctx, + const char *sql, + const char *param_signature, + const char *result_signature) { - return db->prepare(db, sql, param_signature, result_signature); + return db->prepare(db, ctx, sql, param_signature, result_signature); } diff --git a/src/samples/techempower/database.h b/src/samples/techempower/database.h index 323cdcbf4..cc48b55f5 100644 --- a/src/samples/techempower/database.h +++ b/src/samples/techempower/database.h @@ -34,10 +34,21 @@ struct db_row { }; -struct db_stmt *db_prepare_stmt(const struct db *db, +struct db_stmt *db_prepare_stmt_ctx(const struct db *db, + void *ctx, const char *sql, const char *param_signature, const char *result_signature); + +static inline struct db_stmt *db_prepare_stmt(const struct db *db, + const char *sql, + const char *param_signature, + const char *result_signature) +{ + return db_prepare_stmt_ctx(db, NULL, sql, param_signature, + result_signature); +} + void db_stmt_finalize(struct db_stmt *stmt); bool db_stmt_bind(const struct db_stmt *stmt, struct db_row *rows); diff --git a/src/samples/techempower/techempower.c b/src/samples/techempower/techempower.c index f94223625..5a3a48f39 100644 --- a/src/samples/techempower/techempower.c +++ b/src/samples/techempower/techempower.c @@ -22,12 +22,12 @@ #include #include -#include "lwan-private.h" +#include "int-to-str.h" #include "lwan-cache.h" #include "lwan-config.h" -#include "lwan-template.h" #include "lwan-mod-lua.h" -#include "int-to-str.h" +#include "lwan-private.h" +#include "lwan-template.h" #include "database.h" #include "json.h" @@ -63,6 +63,8 @@ struct Fortune { int id; char *message; } item; + + struct lwan_request *request; }; DEFINE_ARRAY_TYPE_INLINEFIRST(fortune_array, struct Fortune) @@ -219,7 +221,8 @@ static inline bool db_query(struct db_stmt *stmt, struct db_json *out) LWAN_HANDLER(db) { - struct db_stmt *stmt = db_prepare_stmt(get_db(), random_number_query, "i", "ii"); + struct db_stmt *stmt = + db_prepare_stmt_ctx(get_db(), request, random_number_query, "i", "ii"); struct db_json db_json; if (UNLIKELY(!stmt)) { @@ -237,7 +240,7 @@ LWAN_HANDLER(db) request->flags |= RESPONSE_NO_EXPIRES; return json_response_obj(response, db_json_desc, N_ELEMENTS(db_json_desc), - &db_json); + &db_json); } static long get_number_of_queries(struct lwan_request *request) @@ -253,7 +256,8 @@ LWAN_HANDLER(queries) enum lwan_http_status ret = HTTP_INTERNAL_ERROR; long queries = get_number_of_queries(request); - struct db_stmt *stmt = db_prepare_stmt(get_db(), random_number_query, "i", "ii"); + struct db_stmt *stmt = + db_prepare_stmt_ctx(get_db(), request, random_number_query, "i", "ii"); if (UNLIKELY(!stmt)) return HTTP_INTERNAL_ERROR; @@ -283,10 +287,8 @@ struct db_json_cached { struct db_json db_json; }; -static struct cache_entry *cached_queries_new(const void *keyptr, - void *context, - void *create_ctx - __attribute__((unused))) +static struct cache_entry * +cached_queries_new(const void *keyptr, void *context, void *create_ctx) { struct db_json_cached *entry; struct db_stmt *stmt; @@ -296,7 +298,8 @@ static struct cache_entry *cached_queries_new(const void *keyptr, if (UNLIKELY(!entry)) return NULL; - stmt = db_prepare_stmt(get_db(), cached_random_number_query, "i", "ii"); + stmt = db_prepare_stmt_ctx(get_db(), create_ctx, cached_random_number_query, + "i", "ii"); if (UNLIKELY(!stmt)) { free(entry); return NULL; @@ -327,8 +330,8 @@ LWAN_HANDLER(cached_queries) int key = (int)lwan_random_uint64() % 10000; int error; - jc = (struct db_json_cached *)cache_get_and_ref_entry( - cached_queries_cache, (void *)(intptr_t)key, &error); + jc = (struct db_json_cached *)cache_get_and_ref_entry_with_ctx( + cached_queries_cache, (void *)(intptr_t)key, request, &error); qj.queries[i] = jc->db_json; @@ -392,7 +395,8 @@ static int fortune_list_generator(struct coro *coro, void *data) struct fortune_array fortunes; struct db_stmt *stmt; - stmt = db_prepare_stmt(get_db(), fortune_query, "", "is"); + stmt = db_prepare_stmt_ctx(get_db(), fortune->request, fortune_query, "", + "is"); if (UNLIKELY(!stmt)) return 0; @@ -426,7 +430,7 @@ static int fortune_list_generator(struct coro *coro, void *data) LWAN_HANDLER(fortunes) { - struct Fortune fortune; + struct Fortune fortune = {.request = request}; lwan_strbuf_grow_to(response->buffer, 1500); @@ -484,11 +488,9 @@ int main(void) if (!fortune_tpl) lwan_status_critical("Could not compile fortune templates"); - cached_queries_cache = cache_create_full(cached_queries_new, - cached_queries_free, - hash_int_new, - NULL, - 3600 /* 1 hour */); + cached_queries_cache = + cache_create_full(cached_queries_new, cached_queries_free, hash_int_new, + NULL, 3600 /* 1 hour */); if (!cached_queries_cache) lwan_status_critical("Could not create cached queries cache"); /* Pre-populate the cache and make it read-only to avoid locking in the fast