diff --git a/examples/chat.c b/examples/chat.c index 4c6f6af..8369761 100644 --- a/examples/chat.c +++ b/examples/chat.c @@ -25,21 +25,21 @@ Callbacks and object used by main() ***************************************************************************** */ /** Called when a new connection is created and login process starts. */ -FIO_SFUNC void on_login_start(fio_s *io); +FIO_SFUNC void on_login_start(fio_io_s *io); /** Called there's incoming data (from STDIN / the client socket. */ -FIO_SFUNC void on_data_login(fio_s *io); -FIO_SFUNC void on_data_chat(fio_s *io); +FIO_SFUNC void on_data_login(fio_io_s *io); +FIO_SFUNC void on_data_chat(fio_io_s *io); /** Called when a login process should be performed. */ -FIO_SFUNC void on_shutdown(fio_s *io); +FIO_SFUNC void on_shutdown(fio_io_s *io); /** Called when the monitored IO is closed or has a fatal error. */ -FIO_SFUNC void on_close(void *udata); +FIO_SFUNC void on_close(void *buf, void *udata); -static fio_protocol_s CHAT_PROTOCOL_LOGIN = { +static fio_io_protocol_s CHAT_PROTOCOL_LOGIN = { .on_attach = on_login_start, .on_data = on_data_login, .on_close = on_close, }; -static fio_protocol_s CHAT_PROTOCOL_CHAT = { +static fio_io_protocol_s CHAT_PROTOCOL_CHAT = { .on_data = on_data_chat, .on_close = on_close, .on_shutdown = on_shutdown, @@ -70,8 +70,8 @@ FIO_IFUNC void client_free(client_s *c) { } /** Called when a new connection is created and login process starts. */ -FIO_SFUNC void on_login_start(fio_s *io) { - fio_udata_set(io, client_new()); +FIO_SFUNC void on_login_start(fio_io_s *io) { + fio_io_udata_set(io, client_new()); FIO_STR_INFO_TMP_VAR(node_msg, 1024); fio_string_write2( &node_msg, @@ -80,13 +80,13 @@ FIO_SFUNC void on_login_start(fio_s *io) { FIO_STRING_WRITE_UNUM(getpid()), FIO_STRING_WRITE_STR1( "\nPlease enter a login handle (up to 30 characters long)\n")); - fio_write(io, node_msg.buf, node_msg.len); + fio_io_write(io, node_msg.buf, node_msg.len); if (fio_cli_get_bool("-v")) FIO_LOG_INFO("(%d) %p connected", getpid(), (void *)io); } /** Called when the monitored IO is closed or has a fatal error. */ -FIO_SFUNC void on_close(void *udata) { +FIO_SFUNC void on_close(void *buf, void *udata) { FIO_STR_INFO_TMP_VAR(s, CHAT_MAX_HANDLE_LEN + 32); client_s *c = udata; fio_string_write2(&s, @@ -95,10 +95,11 @@ FIO_SFUNC void on_close(void *udata) { FIO_STRING_WRITE_STR1(" left the chat.\n")); fio_publish(.message = FIO_STR2BUF_INFO(s)); client_free(c); + (void)buf; } /** Performs "login" logic (saves user handle) */ -FIO_SFUNC void on_data_first_line(fio_s *io, char *name, size_t len) { +FIO_SFUNC void on_data_first_line(fio_io_s *io, char *name, size_t len) { if (!len) goto error_name_too_short; do @@ -110,7 +111,7 @@ FIO_SFUNC void on_data_first_line(fio_s *io, char *name, size_t len) { goto error_name_too_short; if (len > 30) goto error_name_too_long; - client_s *c = fio_udata(io); + client_s *c = fio_io_udata(io); memcpy(c->name, name, len); c->name[len] = 0; c->name[31] = (char)len; @@ -122,23 +123,23 @@ FIO_SFUNC void on_data_first_line(fio_s *io, char *name, size_t len) { FIO_STRING_WRITE_STR1("You are connected to node: "), FIO_STRING_WRITE_UNUM(getpid()), FIO_STRING_WRITE_STR1("\n")); - fio_write2(io, - .buf = welcome, - .len = fio_bstr_len(welcome), - .dealloc = (void (*)(void *))fio_bstr_free); + fio_io_write2(io, + .buf = welcome, + .len = fio_bstr_len(welcome), + .dealloc = (void (*)(void *))fio_bstr_free); return; error_name_too_long: - fio_write(io, "ERROR! login handle too long. Goodbye.\n", 39); - fio_close(io); + fio_io_write(io, "ERROR! login handle too long. Goodbye.\n", 39); + fio_io_close(io); return; error_name_too_short: - fio_write(io, "ERROR! login handle too short (empty?). Goodbye.\n", 49); - fio_close(io); + fio_io_write(io, "ERROR! login handle too short (empty?). Goodbye.\n", 49); + fio_io_close(io); } /** Manages chat messages */ -FIO_SFUNC void on_data_message_line(fio_s *io, char *msg, size_t len) { - client_s *c = fio_udata(io); +FIO_SFUNC void on_data_message_line(fio_io_s *io, char *msg, size_t len) { + client_s *c = fio_io_udata(io); char *buf = fio_bstr_write2(NULL, FIO_STRING_WRITE_STR2(c->name, c->name[31]), FIO_STRING_WRITE_STR2(": ", 2), @@ -153,27 +154,27 @@ FIO_SFUNC void on_data_message_line(fio_s *io, char *msg, size_t len) { ((msg[2] | 32) == 'o') & ((msg[3] | 32) == 'd') & ((msg[4] | 32) == 'b') & ((msg[5] | 32) == 'y') & ((msg[6] | 32) == 'e')))) { - fio_write(io, "Goodbye.\n", 9); - fio_close(io); + fio_io_write(io, "Goodbye.\n", 9); + fio_io_close(io); } } -FIO_IFUNC int on_data_read(fio_s *io) { +FIO_IFUNC int on_data_read(fio_io_s *io) { char buf[CHAT_MAX_MESSAGE_LEN]; - size_t r = fio_read(io, buf, CHAT_MAX_MESSAGE_LEN); + size_t r = fio_io_read(io, buf, CHAT_MAX_MESSAGE_LEN); if (!r) return -1; - client_s *c = fio_udata(io); + client_s *c = fio_io_udata(io); fio_stream_add(&c->input, fio_stream_pack_data(buf, r, 0, 1, NULL)); return 0; } -FIO_IFUNC int on_data_process_line(fio_s *io, - void(task)(fio_s *, char *, size_t)) { +FIO_IFUNC int on_data_process_line(fio_io_s *io, + void(task)(fio_io_s *, char *, size_t)) { char tmp[CHAT_MAX_MESSAGE_LEN]; char *buf = tmp; size_t len = CHAT_MAX_MESSAGE_LEN; - client_s *c = fio_udata(io); + client_s *c = fio_io_udata(io); fio_stream_read(&c->input, &buf, &len); if (!len) return -1; @@ -187,18 +188,18 @@ FIO_IFUNC int on_data_process_line(fio_s *io, } /** for the first input line of the Chat protocol. */ -FIO_SFUNC void on_data_login(fio_s *io) { +FIO_SFUNC void on_data_login(fio_io_s *io) { if (on_data_read(io)) return; if (on_data_process_line(io, on_data_first_line)) return; - fio_protocol_set(io, &CHAT_PROTOCOL_CHAT); + fio_io_protocol_set(io, &CHAT_PROTOCOL_CHAT); fio_subscribe(.io = io); on_data_chat(io); } /** for each subsequent message / line in the Chat protocol. */ -FIO_SFUNC void on_data_chat(fio_s *io) { +FIO_SFUNC void on_data_chat(fio_io_s *io) { if (on_data_read(io)) return; while (!on_data_process_line(io, on_data_message_line)) @@ -206,8 +207,8 @@ FIO_SFUNC void on_data_chat(fio_s *io) { } /** Called when a login process should be performed. */ -FIO_SFUNC void on_shutdown(fio_s *io) { - fio_write(io, "Server shutting down, goodbye...\n", 33); +FIO_SFUNC void on_shutdown(fio_io_s *io) { + fio_io_write(io, "Server shutting down, goodbye...\n", 33); } /* ***************************************************************************** @@ -248,14 +249,14 @@ int main(int argc, char const *argv[]) { fio_subscribe(.on_message = print_chat, .master_only = 1); /* review CLI connection address (in URL format) */ - FIO_ASSERT(fio_srv_listen(.url = fio_cli_unnamed(0), - .protocol = &CHAT_PROTOCOL_LOGIN), + FIO_ASSERT(fio_io_listen(.url = fio_cli_unnamed(0), + .protocol = &CHAT_PROTOCOL_LOGIN), "Could not open listening socket as requested."); FIO_LOG_INFO("\n\tStarting plain text Chat server example app." "\n\tEngine: " FIO_POLL_ENGINE_STR "\n\tWorkers: %d" "\n\tPress ^C to exit.", - fio_srv_workers(fio_cli_get_i("-w"))); - fio_srv_start(fio_cli_get_i("-w")); + fio_io_workers(fio_cli_get_i("-w"))); + fio_io_start(fio_cli_get_i("-w")); FIO_LOG_INFO("Shutdown complete."); fio_cli_end(); return 0; diff --git a/examples/client.c b/examples/client.c index cab28bb..01ebe1b 100644 --- a/examples/client.c +++ b/examples/client.c @@ -199,7 +199,7 @@ FIO_SFUNC void on_input_closed(void *buf, void *udata) { } /* Debug messages for STDIN round-trip */ -void debug_subscriber(fio_msg_s *msg) { +FIO_SFUNC void debug_subscriber(fio_msg_s *msg) { FIO_LOG_DEBUG2("Subscriber received: %.*s", msg->message.len, msg->message.buf); diff --git a/fio-stl.h b/fio-stl.h index cbc7aab..babf604 100644 --- a/fio-stl.h +++ b/fio-stl.h @@ -22346,7 +22346,7 @@ ChaCha20Poly1305 API * data, producing a 16 byte message authentication code (MAC) using Poly1305. * * * `key` MUST point to a 256 bit long memory address (32 Bytes). - * * `nounce` MUST point to a 96 bit long memory address (12 Bytes). + * * `nonce` MUST point to a 96 bit long memory address (12 Bytes). * * `ad` MAY be omitted, will NOT be encrypted. * * `data` MAY be omitted, WILL be encrypted. * * `mac` MUST point to a buffer with (at least) 16 available bytes. @@ -22357,14 +22357,14 @@ SFUNC void fio_chacha20_poly1305_enc(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce); + const void *nonce); /** * Performs an in-place decryption of `data` using ChaCha20 after authenticating * the message authentication code (MAC) using Poly1305. * * * `key` MUST point to a 256 bit long memory address (32 Bytes). - * * `nounce` MUST point to a 96 bit long memory address (12 Bytes). + * * `nonce` MUST point to a 96 bit long memory address (12 Bytes). * * `ad` MAY be omitted ONLY IF originally omitted. * * `data` MAY be omitted, WILL be decrypted. * * `mac` MUST point to a buffer where the 16 byte MAC is placed. @@ -22377,7 +22377,7 @@ SFUNC int fio_chacha20_poly1305_dec(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce); + const void *nonce); /* ***************************************************************************** Using ChaCha20 and Poly1305 separately @@ -22387,13 +22387,13 @@ Using ChaCha20 and Poly1305 separately * Performs an in-place encryption/decryption of `data` using ChaCha20. * * * `key` MUST point to a 256 bit long memory address (32 Bytes). - * * `nounce` MUST point to a 96 bit long memory address (12 Bytes). + * * `nonce` MUST point to a 96 bit long memory address (12 Bytes). * * `counter` is the block counter, usually 1 unless `data` is mid-cyphertext. */ SFUNC void fio_chacha20(void *restrict data, size_t len, const void *key, - const void *nounce, + const void *nonce, uint32_t counter); /** @@ -22666,7 +22666,7 @@ ChaCha20 (encryption) } FIO_IFUNC fio_u512 fio___chacha_init(const void *key, - const void *nounce, + const void *nonce, uint32_t counter) { fio_u512 o = { .u32 = @@ -22682,9 +22682,9 @@ FIO_IFUNC fio_u512 fio___chacha_init(const void *key, fio_buf2u32_le((uint8_t *)key + 24), fio_buf2u32_le((uint8_t *)key + 28), counter, - fio_buf2u32_le(nounce), - fio_buf2u32_le((uint8_t *)nounce + 4), - fio_buf2u32_le((uint8_t *)nounce + 8), + fio_buf2u32_le(nonce), + fio_buf2u32_le((uint8_t *)nonce + 4), + fio_buf2u32_le((uint8_t *)nonce + 8), }, // clang-format on }; return o; @@ -22785,9 +22785,9 @@ FIO_SFUNC void fio___chacha_vround20x2(fio_u512 c, uint8_t *restrict data) { SFUNC void fio_chacha20(void *restrict data, size_t len, const void *key, - const void *nounce, + const void *nonce, uint32_t counter) { - fio_u512 c = fio___chacha_init(key, nounce, counter); + fio_u512 c = fio___chacha_init(key, nonce, counter); for (size_t pos = 127; pos < len; pos += 128) { fio___chacha_vround20x2(c, (uint8_t *)data); c.u32[12] += 2; /* block counter */ @@ -22821,8 +22821,8 @@ SFUNC void fio_chacha20_poly1305_enc(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce) { - fio_u512 c = fio___chacha_init(key, nounce, 0); + const void *nonce) { + fio_u512 c = fio___chacha_init(key, nonce, 0); fio___poly_s pl; { fio_u512 c2 = fio___chacha20_mixround(c); @@ -22902,10 +22902,10 @@ SFUNC void fio_chacha20_poly1305_auth(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce) { + const void *nonce) { fio___poly_s pl; { - fio_u512 c = fio___chacha_init(key, nounce, 0); + fio_u512 c = fio___chacha_init(key, nonce, 0); c = fio___chacha20_mixround(c); /* computes poly1305 key */ pl = fio___poly_init(&c); } @@ -22944,13 +22944,13 @@ SFUNC int fio_chacha20_poly1305_dec(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce) { + const void *nonce) { uint64_t auth[2]; - fio_chacha20_poly1305_auth(&auth, data, len, ad, adlen, key, nounce); + fio_chacha20_poly1305_auth(&auth, data, len, ad, adlen, key, nonce); if (((auth[0] ^ fio_buf2u64u(mac)) | (auth[1] ^ fio_buf2u64u(((char *)mac + 8))))) return -1; - fio_chacha20(data, len, key, nounce, 1); + fio_chacha20(data, len, key, nonce, 1); return 0; } /* ***************************************************************************** @@ -31931,7 +31931,7 @@ IFUNC FIO_REF_TYPE_PTR FIO_NAME(FIO_REF_NAME, FIO_REF_CONSTRUCTOR)(void) { FIO_LEAK_COUNTER_ON_ALLOC(FIO_REF_NAME); o->ref = 1; #ifdef FIO_REF_FLEX_TYPE - o->flx_size = members; + o->flx_size = (uint32_t)members; #endif FIO_REF_METADATA_INIT((o->metadata)); FIO_REF_TYPE *ret = (FIO_REF_TYPE *)(o + 1); @@ -35319,7 +35319,7 @@ IO Reactor State Machine #define FIO___IO_FLAG_WAKEUP (1U) -SFUNC struct { +SFUNC struct FIO___IO { fio_poll_s poll; int64_t tick; fio_queue_s queue; @@ -35573,10 +35573,11 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd, .active = FIO___IO.tick, }; fio_sock_set_non_block(fd); - FIO_LOG_DDEBUG2("(%d) attaching fd %d to IO object %p", + FIO_LOG_DDEBUG2("(%d) attaching fd %d to IO object %p (%zu bytes buffer)", fio_io_pid(), fd, - (void *)io); + (void *)io, + fio_io_buffer_len(io)); fio_io_defer(fio___io_protocol_set, (void *)fio___io_dup2(io), (void *)pr); return io; @@ -38317,11 +38318,8 @@ Message wire format (as 64 bit numerals in little endien encoding): #undef FIO___PUBSUB_MESSAGE_HEADER #define FIO___PUBSUB_MESSAGE_HEADER 24 /* header + 2 NUL bytes (message + channel) + 16 byte MAC */ -#undef FIO___PUBSUB_MESSAGE_OVERHEAD_NET -#define FIO___PUBSUB_MESSAGE_OVERHEAD_NET (FIO___PUBSUB_MESSAGE_HEADER + 18) -/* extra 2 NUL bytes (after message & channel name) */ #undef FIO___PUBSUB_MESSAGE_OVERHEAD -#define FIO___PUBSUB_MESSAGE_OVERHEAD (FIO___PUBSUB_MESSAGE_OVERHEAD_NET + 2) +#define FIO___PUBSUB_MESSAGE_OVERHEAD (FIO___PUBSUB_MESSAGE_HEADER + 18) /* ***************************************************************************** Pub/Sub - defaults and builtin pub/sub engines @@ -38436,28 +38434,24 @@ typedef struct { size_t len; uint64_t uuid[2]; fio___pubsub_message_s *msg; - char buf[FIO___PUBSUB_MESSAGE_OVERHEAD_NET]; + char buf[]; } fio___pubsub_message_parser_s; FIO_LEAK_COUNTER_DEF(fio___pubsub_message_parser_s) FIO_IFUNC fio___pubsub_message_parser_s *fio___pubsub_message_parser( fio_io_s *io) { - return (fio___pubsub_message_parser_s *)fio_io_buffer(io); + return io ? (fio___pubsub_message_parser_s *)fio_io_buffer(io) : NULL; } FIO_SFUNC void fio___pubsub_message_parser_init( fio___pubsub_message_parser_s *p) { FIO_LEAK_COUNTER_ON_ALLOC(fio___pubsub_message_parser_s); - p->len = 0; - p->uuid[0] = p->uuid[1] = 0; - p->msg = NULL; + *p = (fio___pubsub_message_parser_s){0}; } FIO_SFUNC void fio___pubsub_message_parser_destroy( fio___pubsub_message_parser_s *p) { - if (!p) - return; fio___pubsub_message_free(p->msg); FIO_LEAK_COUNTER_ON_FREE(fio___pubsub_message_parser_s); } @@ -38660,7 +38654,8 @@ static struct FIO___PUBSUB_POSTOFFICE { .on_data = fio___pubsub_protocol_on_data_master, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio_io_touch, - .buffer_size = sizeof(fio___pubsub_message_parser_s), + .buffer_size = sizeof(fio___pubsub_message_parser_s) + + FIO___PUBSUB_MESSAGE_OVERHEAD, }, .remote = { @@ -38668,7 +38663,8 @@ static struct FIO___PUBSUB_POSTOFFICE { .on_data = fio___pubsub_protocol_on_data_remote, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio___pubsub_protocol_on_timeout, - .buffer_size = sizeof(fio___pubsub_message_parser_s), + .buffer_size = sizeof(fio___pubsub_message_parser_s) + + FIO___PUBSUB_MESSAGE_OVERHEAD, }, }, }; @@ -38904,12 +38900,16 @@ FIO_CONSTRUCTOR(fio_postoffice_init) { .on_data = fio___pubsub_protocol_on_data_master, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio_io_touch, + .buffer_size = + sizeof(fio___pubsub_message_parser_s) + FIO___PUBSUB_MESSAGE_OVERHEAD, }; FIO___PUBSUB_POSTOFFICE.protocol.remote = (fio_io_protocol_s){ .on_attach = fio___pubsub_protocol_on_attach, .on_data = fio___pubsub_protocol_on_data_remote, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio_io_touch, + .buffer_size = + sizeof(fio___pubsub_message_parser_s) + FIO___PUBSUB_MESSAGE_OVERHEAD, }; } @@ -39296,12 +39296,15 @@ FIO_IFUNC fio___pubsub_message_s *fio___pubsub_message_alloc(void *header) { const size_t channel_len = fio_buf2u16_le((char *)header + 18); const size_t message_len = fio_buf2u24_le((char *)header + 20); m = fio___pubsub_message_new(((channel_len + message_len) << 1) + - FIO___PUBSUB_MESSAGE_OVERHEAD); + (FIO___PUBSUB_MESSAGE_OVERHEAD + 2)); FIO_ASSERT_ALLOC(m); - m->data = (fio_msg_s){ - .udata = m->buf + channel_len + message_len + 2, - .channel = FIO_BUF_INFO2(m->buf, channel_len), - .message = FIO_BUF_INFO2(m->buf + channel_len + 1, message_len), + *m = (fio___pubsub_message_s){ + .data = + (fio_msg_s){ + .udata = m->buf + channel_len + message_len + 2, + .channel = FIO_BUF_INFO2(m->buf, channel_len), + .message = FIO_BUF_INFO2(m->buf + channel_len + 1, message_len), + }, }; return m; } @@ -39310,22 +39313,27 @@ FIO_IFUNC fio___pubsub_message_s *fio___pubsub_message_author( fio_publish_args_s args) { fio___pubsub_message_s *m = fio___pubsub_message_new(((args.message.len + args.channel.len) << 1) + - FIO___PUBSUB_MESSAGE_OVERHEAD); + (FIO___PUBSUB_MESSAGE_OVERHEAD + 2)); FIO_ASSERT_ALLOC(m); - m->data = (fio_msg_s){ - .io = args.from, - .id = args.id ? args.id : fio_rand64(), - .published = args.published ? args.published - : (uint64_t)fio_time2milli(fio_time_real()), - .channel = FIO_BUF_INFO2(m->buf, args.channel.len), - .message = FIO_BUF_INFO2(m->buf + args.channel.len + 1, args.message.len), - .filter = args.filter, - .is_json = args.is_json, + *m = (fio___pubsub_message_s){ + .data = + (fio_msg_s){ + .io = args.from, + .id = args.id ? args.id : fio_rand64(), + .published = args.published + ? args.published + : (uint64_t)fio_time2milli(fio_time_real()), + .channel = FIO_BUF_INFO2(m->buf, args.channel.len), + .message = FIO_BUF_INFO2(m->buf + args.channel.len + 1, + args.message.len), + .filter = args.filter, + .is_json = args.is_json, + }, }; - if (args.channel.len) + if (args.channel.buf && args.channel.len) FIO_MEMCPY(m->data.channel.buf, args.channel.buf, args.channel.len); m->data.channel.buf[args.channel.len] = 0; - if (args.message.buf) + if (args.message.buf && args.message.len) FIO_MEMCPY(m->data.message.buf, args.message.buf, args.message.len); m->data.message.buf[args.message.len] = 0; return m; @@ -39337,6 +39345,7 @@ FIO_SFUNC void fio___pubsub_message_encrypt(fio___pubsub_message_s *m) { const void *k = fio___pubsub_secret_key(m->data.id); const uint64_t nonce[2] = {fio_risky_num(m->data.id, 0), m->data.published}; uint8_t *pos = (uint8_t *)(m->data.message.buf + m->data.message.len + 1); + uint8_t *dest = pos; m->data.udata = (void *)pos; fio_u2buf64_le(pos, m->data.id); pos += 8; @@ -39354,14 +39363,13 @@ FIO_SFUNC void fio___pubsub_message_encrypt(fio___pubsub_message_s *m) { if (enc_len == 2) return; pos += enc_len; - fio_chacha20_poly1305_enc( - pos, - (void *)((char *)(m->data.udata) + FIO___PUBSUB_MESSAGE_HEADER), - m->data.channel.len + m->data.message.len + 2, - m->data.udata, - FIO___PUBSUB_MESSAGE_HEADER, - k, - nonce); + fio_chacha20_poly1305_enc(pos, + (void *)(dest + FIO___PUBSUB_MESSAGE_HEADER), + m->data.channel.len + m->data.message.len + 2, + m->data.udata, + FIO___PUBSUB_MESSAGE_HEADER, + k, + nonce); } FIO_SFUNC int fio___pubsub_message_decrypt(fio___pubsub_message_s *m) { @@ -39369,7 +39377,8 @@ FIO_SFUNC int fio___pubsub_message_decrypt(fio___pubsub_message_s *m) { return 0; if (!m->data.udata) return -1; - uint8_t *pos = (uint8_t *)(m->data.udata); + uint8_t *pos = (uint8_t *)(m->data.message.buf + m->data.message.len + 1); + uint8_t *const dest = pos; m->data.id = fio_buf2u64_le(pos); pos += 8; m->data.published = fio_buf2u64_le(pos); @@ -39393,7 +39402,7 @@ FIO_SFUNC int fio___pubsub_message_decrypt(fio___pubsub_message_s *m) { m->buf, m->data.channel.len + m->data.message.len + 2, - m->data.udata, + dest, FIO___PUBSUB_MESSAGE_HEADER, k, nonce); @@ -39411,12 +39420,15 @@ FIO_IFUNC void fio___pubsub_message_write2io(fio_io_s *io, void *m_) { fio___pubsub_message_s *m = (fio___pubsub_message_s *)m_; if (io == m->data.io) return; - FIO_LOG_DDEBUG2("(%d) pub/sub sending IPC/peer message.", fio_io_pid()); + FIO_LOG_DDEBUG2("(%d) pub/sub sending IPC/peer message: %zu bytes", + fio_io_pid(), + m->data.message.len + m->data.channel.len + + FIO___PUBSUB_MESSAGE_OVERHEAD); fio___pubsub_message_encrypt(m); fio_io_write2(io, .buf = fio___pubsub_message_dup(m), .len = (m->data.message.len + m->data.channel.len + - FIO___PUBSUB_MESSAGE_OVERHEAD_NET), + FIO___PUBSUB_MESSAGE_OVERHEAD), .offset = ((uintptr_t)(m->data.udata) - (uintptr_t)(m)), .dealloc = (void (*)(void *))fio___pubsub_message_free); } @@ -39447,11 +39459,6 @@ FIO_SFUNC void fio___pubsub_message_route(fio___pubsub_message_s *m) { if (flags & FIO___PUBSUB_SPECIAL) goto is_special_message; - if ((FIO___PUBSUB_POSTOFFICE.filter.publish & flags)) - fio_queue_push(fio_io_queue(), - fio___pubsub_message_deliver_task, - fio___pubsub_message_dup(m)); - if ((FIO___PUBSUB_POSTOFFICE.filter.local & flags)) fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.ipc, fio___pubsub_message_write2io, @@ -39461,6 +39468,12 @@ FIO_SFUNC void fio___pubsub_message_route(fio___pubsub_message_s *m) { fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.remote, fio___pubsub_message_write2io, m); + + if ((FIO___PUBSUB_POSTOFFICE.filter.publish & flags)) + fio_queue_push(fio_io_queue(), + fio___pubsub_message_deliver_task, + fio___pubsub_message_dup(m)); + return; is_special_message: @@ -39493,7 +39506,8 @@ FIO_SFUNC void fio___pubsub_message_route(fio___pubsub_message_s *m) { p->uuid[0], p->uuid[1]); } - FIO_LOG_INFO("(cluster) identified new peer (%zu connections)", + FIO_LOG_INFO("(%d - cluster) identified new peer (%zu connections)", + fio_io_pid(), fio___pubsub_broadcast_connected_count( &FIO___PUBSUB_POSTOFFICE.remote_uuids)); return; @@ -39570,7 +39584,7 @@ void fio_publish FIO_NOOP(fio_publish_args_s args) { m = fio___pubsub_message_author(args); m->data.is_json = ((!!args.is_json) | ((uint8_t)(uintptr_t)args.engine)); - FIO_LOG_DDEBUG2("publishing pub/sub message (scheduling)"); + FIO_LOG_DDEBUG2("(%d) publishing pub/sub message (scheduling)", fio_io_pid()); fio_io_defer(fio___publish_message_task, m, NULL); return; @@ -39600,10 +39614,9 @@ FIO_IFUNC void fio___pubsub_message_parse( size_t existing = parser->len; if (!parser->msg) { while (existing < FIO___PUBSUB_MESSAGE_HEADER) { /* get message length */ - size_t consumed = - fio_io_read(io, - parser->buf + existing, - FIO___PUBSUB_MESSAGE_OVERHEAD_NET - existing); + size_t consumed = fio_io_read(io, + parser->buf + existing, + FIO___PUBSUB_MESSAGE_OVERHEAD - existing); if (!consumed) { parser->len = existing; return; @@ -39615,12 +39628,13 @@ FIO_IFUNC void fio___pubsub_message_parse( } /* known message length, read to end and publish */ fio___pubsub_message_s *m = parser->msg; - const size_t needed = m->data.channel.len + m->data.message.len + - FIO___PUBSUB_MESSAGE_OVERHEAD_NET; - FIO_LOG_DDEBUG2("(%d) pub/sub parsing IPC/peer message (%zu/%zu bytes)", - fio_io_pid(), - existing, - needed); + const size_t needed = + m->data.channel.len + m->data.message.len + FIO___PUBSUB_MESSAGE_OVERHEAD; + // FIO_LOG_DDEBUG2("(%d) pub/sub parsing IPC/peer message %p (%zu/%zu bytes)", + // fio_io_pid(), + // (void *)fio_buf2u64_le(m->data.udata), + // existing, + // needed); while (existing < needed) { size_t consumed = fio_io_read(io, (char *)m->data.udata + existing, needed - existing); @@ -39689,9 +39703,11 @@ FIO_SFUNC void fio___pubsub_protocol_on_close(void *p_, void *udata) { p->uuid[0], p->uuid[1], NULL); - FIO_LOG_INFO("(cluster) lost peer connection (%zu connections)", - fio___pubsub_broadcast_connected_count( - &FIO___PUBSUB_POSTOFFICE.remote_uuids)); + FIO_LOG_INFO( + "(%d) (pub/sub cluster) lost peer connection (%zu connections)", + fio_io_pid(), + fio___pubsub_broadcast_connected_count( + &FIO___PUBSUB_POSTOFFICE.remote_uuids)); } fio___pubsub_message_parser_destroy(p); if (!fio_io_is_master()) @@ -39787,7 +39803,8 @@ FIO_IFUNC void fio___channel_on_create(fio_channel_s *ch) { (&i.key->subscribe + ch->is_pattern)[0](i.key, name, ch->filter); } if (!FIO___PUBSUB_POSTOFFICE.filter.remote) { /* inform root process */ - FIO_LOG_DDEBUG2("informing root process of new channel."); + FIO_LOG_DDEBUG2("(%d) informing root process of new channel.", + fio_io_pid()); fio___pubsub_message_s *m = fio___pubsub_message_author((fio_publish_args_s){ .id = (uint64_t)(ch->is_pattern + 1), @@ -39795,12 +39812,10 @@ FIO_IFUNC void fio___channel_on_create(fio_channel_s *ch) { .filter = ch->filter, .is_json = FIO___PUBSUB_SUB, }); - if (m) { - fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.ipc, - fio___pubsub_message_write2io, - m); - fio___pubsub_message_free(m); - } + fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.ipc, + fio___pubsub_message_write2io, + m); + fio___pubsub_message_free(m); } } /** Callback for when a channel is destroy. */ @@ -39879,11 +39894,16 @@ FIO_SFUNC void fio___pubsub_broadcast_hello(fio_io_s *io) { sizeof(addr)); } +FIO_SFUNC void fio___pubsub_broadcast_hello_task_done(void *io_, void *ignr_) { + (void)ignr_; + fio_io_s *io = (fio_io_s *)io_; + fio_io_free(io); +} + FIO_SFUNC int fio___pubsub_broadcast_hello_task(void *io_, void *ignr_) { (void)ignr_; fio_io_s *io = (fio_io_s *)io_; fio___pubsub_broadcast_hello(io); - fio_io_free(io); return 0; } @@ -39929,6 +39949,7 @@ FIO_SFUNC void fio___pubsub_broadcast_on_attach(fio_io_s *io) { fio___pubsub_broadcast_hello((FIO___PUBSUB_POSTOFFICE.broadcaster = io)); fio_io_run_every(.fn = fio___pubsub_broadcast_hello_task, .udata1 = fio_io_dup(io), + .on_finish = fio___pubsub_broadcast_hello_task_done, .every = (uint32_t)(1024 | (1023 & FIO___PUBSUB_POSTOFFICE.uuid.u64[0])), @@ -39969,11 +39990,12 @@ FIO_SFUNC void fio___pubsub_broadcast_on_data(fio_io_s *io) { &FIO___PUBSUB_POSTOFFICE.remote_uuids, buf[0], buf[1]) == buf[1]) { - FIO_LOG_DDEBUG2("skipping peer connection - already exists"); + FIO_LOG_DDEBUG2("(%d) skipping peer connection - already exists", + fio_io_pid()); continue; /* skip connection, already exists. */ } should_say_hello |= 1; - FIO_LOG_DDEBUG2("detected peer, should now connect"); + FIO_LOG_DDEBUG2("(%d) detected peer, should now connect", fio_io_pid()); /* TODO: fixme! */ char addr_buf[128]; @@ -40011,6 +40033,7 @@ FIO_SFUNC void fio___pubsub_broadcast_on_data(fio_io_s *io) { if (should_say_hello) fio_io_run_every(.fn = fio___pubsub_broadcast_hello_task, .udata1 = fio_io_dup(io), + .on_finish = fio___pubsub_broadcast_hello_task_done, .every = (uint32_t)(1024 | (1023 & @@ -40020,10 +40043,11 @@ FIO_SFUNC void fio___pubsub_broadcast_on_data(fio_io_s *io) { FIO_SFUNC void fio___pubsub_broadcast_on_incoming(fio_io_s *io) { int fd; while ((fd = accept(fio_io_fd(io), NULL, NULL)) != -1) { - FIO_LOG_DDEBUG2("accepting a cluster peer connection"); + FIO_LOG_DDEBUG2("(%d) accepting a cluster peer connection", fio_io_pid()); fio_io_attach_fd(fd, &FIO___PUBSUB_POSTOFFICE.protocol.remote, NULL, NULL); } - FIO_LOG_INFO("(cluster) accepted new peer(s) (%zu connections).", + FIO_LOG_INFO("(%d) (cluster) accepted new peer(s) (%zu connections).", + fio_io_pid(), fio___pubsub_broadcast_connected_count( &FIO___PUBSUB_POSTOFFICE.remote_uuids)); } @@ -40042,8 +40066,9 @@ SFUNC void fio___pubsub_broadcast_on_port(void *port_) { }; if (FIO___PUBSUB_POSTOFFICE.secret_is_random) { FIO_LOG_ERROR( - "Listening to cluster peer connections failed!" - "\n\tUsing a random (non-shared) secret, cannot validate peers."); + "(%d) Listening to cluster peer connections failed!" + "\n\tUsing a random (non-shared) secret, cannot validate peers.", + fio_io_pid()); return; } if (!port || port < 0) @@ -41213,7 +41238,7 @@ static void fio___http_str_cached_init(void) { FIO___HTTP_STATIC_CACHE_IMAP, FIO___HTTP_STATIC_CACHE_CAPA_BITS, (void *)&obj, - hash, + (uint32_t)hash, fio___http_str_cached_cmp, FIO___HTTP_STATIC_CACHE_STEP_LIMIT); FIO_ASSERT(!pos.is_valid && pos.ipos < FIO___HTTP_STATIC_CACHE_CAPA && @@ -54647,6 +54672,53 @@ FIO_SFUNC void FIO_NAME_TEST(stl, chacha)(void) { buffer); } } + { /* test roundtrip */ + fprintf(stderr, "\t * Testing ChaCha20Poly1305 round-trip.\n"); + fio_u256 key = + fio_u256_init64(fio_rand64(), fio_rand64(), fio_rand64(), fio_rand64()); + FIO_STR_INFO_TMP_VAR(ad, 128); + FIO_STR_INFO_TMP_VAR(plaintext, 1024); + FIO_STR_INFO_TMP_VAR(cyphertext, 1024); + FIO_STR_INFO_TMP_VAR(decrypted, 1024); + fio_string_write2(&ad, + NULL, + FIO_STRING_WRITE_STR1( + "This is unencrypted additional data with a nonce:"), + FIO_STRING_WRITE_HEX(fio_rand64())); + fio_string_write2( + &plaintext, + NULL, + FIO_STRING_WRITE_STR1( + "This is unencrypted text that will eventually be encrypted, the " + "following are the whole 0-255 byte values:")); + for (size_t i = 0; i < 256; ++i) { + plaintext.buf[plaintext.len++] = (char)i; + } + plaintext.buf[plaintext.len] = 0; + FIO_MEMCPY(cyphertext.buf, plaintext.buf, plaintext.len); + cyphertext.len = plaintext.len; + fio_chacha20_poly1305_enc(ad.buf + ad.len, + cyphertext.buf, + cyphertext.len, + ad.buf, /* additional data */ + ad.len, + key.u8, + ad.buf + ad.len - 12); + FIO_MEMCPY(decrypted.buf, cyphertext.buf, cyphertext.len); + decrypted.len = cyphertext.len; + FIO_ASSERT(!fio_chacha20_poly1305_dec(ad.buf + ad.len, + decrypted.buf, + decrypted.len, + ad.buf, /* additional data */ + ad.len, + key.u8, + ad.buf + ad.len - 12), + "fio_chacha20_poly1305_dec failed!"); + FIO_ASSERT(FIO_MEMCMP(cyphertext.buf, plaintext.buf, plaintext.len), + "chacha20 cypher-text should be different than plain-text."); + FIO_ASSERT(!FIO_MEMCMP(decrypted.buf, plaintext.buf, plaintext.len), + "chacha20_poly1305 roundtrip error!"); + } #if !DEBUG fio_test_hash_function(fio__poly1305_speed_wrapper, diff --git a/fio-stl.md b/fio-stl.md index a4c982a..cff3cc4 100644 --- a/fio-stl.md +++ b/fio-stl.md @@ -6571,13 +6571,13 @@ void fio_chacha20_poly1305_enc(void *mac, void *ad, /* additional data */ size_t adlen, void *key, - void *nounce); + void *nonce); ``` Performs an in-place encryption of `data` using ChaCha20 with additional data, producing a 16 byte message authentication code (MAC) using Poly1305. * `key` MUST point to a 256 bit long memory address (32 Bytes). -* `nounce` MUST point to a 96 bit long memory address (12 Bytes). +* `nonce` MUST point to a 96 bit long memory address (12 Bytes). * `ad` MAY be omitted, will NOT be encrypted. * `data` MAY be omitted, WILL be encrypted. * `mac` MUST point to a buffer with (at least) 16 available bytes. @@ -6591,13 +6591,13 @@ int fio_chacha20_poly1305_dec(void *mac, void *ad, /* additional data */ size_t adlen, void *key, - void *nounce); + void *nonce); ``` Performs an in-place decryption of `data` using ChaCha20 after authenticating the message authentication code (MAC) using Poly1305. * `key` MUST point to a 256 bit long memory address (32 Bytes). -* `nounce` MUST point to a 96 bit long memory address (12 Bytes). +* `nonce` MUST point to a 96 bit long memory address (12 Bytes). * `ad` MAY be omitted ONLY IF originally omitted. * `data` MAY be omitted, WILL be decrypted. * `mac` MUST point to a buffer where the 16 byte MAC is placed. @@ -6610,14 +6610,14 @@ Returns `-1` on error (authentication failed). void fio_chacha20(void *data, size_t len, void *key, - void *nounce, + void *nonce, uint32_t counter); ``` Performs an in-place encryption/decryption of `data` using ChaCha20. * `key` MUST point to a 256 bit long memory address (32 Bytes). -* `nounce` MUST point to a 96 bit long memory address (12 Bytes). +* `nonce` MUST point to a 96 bit long memory address (12 Bytes). * `counter` is the block counter, usually 1 unless `data` is mid-cyphertext. diff --git a/fio-stl/152 chacha20poly1305.h b/fio-stl/152 chacha20poly1305.h index 1db0998..9c7be83 100644 --- a/fio-stl/152 chacha20poly1305.h +++ b/fio-stl/152 chacha20poly1305.h @@ -27,7 +27,7 @@ ChaCha20Poly1305 API * data, producing a 16 byte message authentication code (MAC) using Poly1305. * * * `key` MUST point to a 256 bit long memory address (32 Bytes). - * * `nounce` MUST point to a 96 bit long memory address (12 Bytes). + * * `nonce` MUST point to a 96 bit long memory address (12 Bytes). * * `ad` MAY be omitted, will NOT be encrypted. * * `data` MAY be omitted, WILL be encrypted. * * `mac` MUST point to a buffer with (at least) 16 available bytes. @@ -38,14 +38,14 @@ SFUNC void fio_chacha20_poly1305_enc(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce); + const void *nonce); /** * Performs an in-place decryption of `data` using ChaCha20 after authenticating * the message authentication code (MAC) using Poly1305. * * * `key` MUST point to a 256 bit long memory address (32 Bytes). - * * `nounce` MUST point to a 96 bit long memory address (12 Bytes). + * * `nonce` MUST point to a 96 bit long memory address (12 Bytes). * * `ad` MAY be omitted ONLY IF originally omitted. * * `data` MAY be omitted, WILL be decrypted. * * `mac` MUST point to a buffer where the 16 byte MAC is placed. @@ -58,7 +58,7 @@ SFUNC int fio_chacha20_poly1305_dec(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce); + const void *nonce); /* ***************************************************************************** Using ChaCha20 and Poly1305 separately @@ -68,13 +68,13 @@ Using ChaCha20 and Poly1305 separately * Performs an in-place encryption/decryption of `data` using ChaCha20. * * * `key` MUST point to a 256 bit long memory address (32 Bytes). - * * `nounce` MUST point to a 96 bit long memory address (12 Bytes). + * * `nonce` MUST point to a 96 bit long memory address (12 Bytes). * * `counter` is the block counter, usually 1 unless `data` is mid-cyphertext. */ SFUNC void fio_chacha20(void *restrict data, size_t len, const void *key, - const void *nounce, + const void *nonce, uint32_t counter); /** @@ -347,7 +347,7 @@ ChaCha20 (encryption) } FIO_IFUNC fio_u512 fio___chacha_init(const void *key, - const void *nounce, + const void *nonce, uint32_t counter) { fio_u512 o = { .u32 = @@ -363,9 +363,9 @@ FIO_IFUNC fio_u512 fio___chacha_init(const void *key, fio_buf2u32_le((uint8_t *)key + 24), fio_buf2u32_le((uint8_t *)key + 28), counter, - fio_buf2u32_le(nounce), - fio_buf2u32_le((uint8_t *)nounce + 4), - fio_buf2u32_le((uint8_t *)nounce + 8), + fio_buf2u32_le(nonce), + fio_buf2u32_le((uint8_t *)nonce + 4), + fio_buf2u32_le((uint8_t *)nonce + 8), }, // clang-format on }; return o; @@ -466,9 +466,9 @@ FIO_SFUNC void fio___chacha_vround20x2(fio_u512 c, uint8_t *restrict data) { SFUNC void fio_chacha20(void *restrict data, size_t len, const void *key, - const void *nounce, + const void *nonce, uint32_t counter) { - fio_u512 c = fio___chacha_init(key, nounce, counter); + fio_u512 c = fio___chacha_init(key, nonce, counter); for (size_t pos = 127; pos < len; pos += 128) { fio___chacha_vround20x2(c, (uint8_t *)data); c.u32[12] += 2; /* block counter */ @@ -502,8 +502,8 @@ SFUNC void fio_chacha20_poly1305_enc(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce) { - fio_u512 c = fio___chacha_init(key, nounce, 0); + const void *nonce) { + fio_u512 c = fio___chacha_init(key, nonce, 0); fio___poly_s pl; { fio_u512 c2 = fio___chacha20_mixround(c); @@ -583,10 +583,10 @@ SFUNC void fio_chacha20_poly1305_auth(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce) { + const void *nonce) { fio___poly_s pl; { - fio_u512 c = fio___chacha_init(key, nounce, 0); + fio_u512 c = fio___chacha_init(key, nonce, 0); c = fio___chacha20_mixround(c); /* computes poly1305 key */ pl = fio___poly_init(&c); } @@ -625,13 +625,13 @@ SFUNC int fio_chacha20_poly1305_dec(void *restrict mac, const void *ad, /* additional data */ size_t adlen, const void *key, - const void *nounce) { + const void *nonce) { uint64_t auth[2]; - fio_chacha20_poly1305_auth(&auth, data, len, ad, adlen, key, nounce); + fio_chacha20_poly1305_auth(&auth, data, len, ad, adlen, key, nonce); if (((auth[0] ^ fio_buf2u64u(mac)) | (auth[1] ^ fio_buf2u64u(((char *)mac + 8))))) return -1; - fio_chacha20(data, len, key, nounce, 1); + fio_chacha20(data, len, key, nonce, 1); return 0; } /* ***************************************************************************** diff --git a/fio-stl/152 chacha20poly1305.md b/fio-stl/152 chacha20poly1305.md index e03ed1c..d4fbb1a 100644 --- a/fio-stl/152 chacha20poly1305.md +++ b/fio-stl/152 chacha20poly1305.md @@ -20,13 +20,13 @@ void fio_chacha20_poly1305_enc(void *mac, void *ad, /* additional data */ size_t adlen, void *key, - void *nounce); + void *nonce); ``` Performs an in-place encryption of `data` using ChaCha20 with additional data, producing a 16 byte message authentication code (MAC) using Poly1305. * `key` MUST point to a 256 bit long memory address (32 Bytes). -* `nounce` MUST point to a 96 bit long memory address (12 Bytes). +* `nonce` MUST point to a 96 bit long memory address (12 Bytes). * `ad` MAY be omitted, will NOT be encrypted. * `data` MAY be omitted, WILL be encrypted. * `mac` MUST point to a buffer with (at least) 16 available bytes. @@ -40,13 +40,13 @@ int fio_chacha20_poly1305_dec(void *mac, void *ad, /* additional data */ size_t adlen, void *key, - void *nounce); + void *nonce); ``` Performs an in-place decryption of `data` using ChaCha20 after authenticating the message authentication code (MAC) using Poly1305. * `key` MUST point to a 256 bit long memory address (32 Bytes). -* `nounce` MUST point to a 96 bit long memory address (12 Bytes). +* `nonce` MUST point to a 96 bit long memory address (12 Bytes). * `ad` MAY be omitted ONLY IF originally omitted. * `data` MAY be omitted, WILL be decrypted. * `mac` MUST point to a buffer where the 16 byte MAC is placed. @@ -59,14 +59,14 @@ Returns `-1` on error (authentication failed). void fio_chacha20(void *data, size_t len, void *key, - void *nounce, + void *nonce, uint32_t counter); ``` Performs an in-place encryption/decryption of `data` using ChaCha20. * `key` MUST point to a 256 bit long memory address (32 Bytes). -* `nounce` MUST point to a 96 bit long memory address (12 Bytes). +* `nonce` MUST point to a 96 bit long memory address (12 Bytes). * `counter` is the block counter, usually 1 unless `data` is mid-cyphertext. diff --git a/fio-stl/401 io types.h b/fio-stl/401 io types.h index 30f58db..d742435 100644 --- a/fio-stl/401 io types.h +++ b/fio-stl/401 io types.h @@ -265,7 +265,7 @@ IO Reactor State Machine #define FIO___IO_FLAG_WAKEUP (1U) -SFUNC struct { +SFUNC struct FIO___IO { fio_poll_s poll; int64_t tick; fio_queue_s queue; @@ -519,10 +519,11 @@ SFUNC fio_io_s *fio_io_attach_fd(int fd, .active = FIO___IO.tick, }; fio_sock_set_non_block(fd); - FIO_LOG_DDEBUG2("(%d) attaching fd %d to IO object %p", + FIO_LOG_DDEBUG2("(%d) attaching fd %d to IO object %p (%zu bytes buffer)", fio_io_pid(), fd, - (void *)io); + (void *)io, + fio_io_buffer_len(io)); fio_io_defer(fio___io_protocol_set, (void *)fio___io_dup2(io), (void *)pr); return io; diff --git a/fio-stl/420 pubsub.h b/fio-stl/420 pubsub.h index 4a35112..c208be8 100644 --- a/fio-stl/420 pubsub.h +++ b/fio-stl/420 pubsub.h @@ -470,11 +470,8 @@ Message wire format (as 64 bit numerals in little endien encoding): #undef FIO___PUBSUB_MESSAGE_HEADER #define FIO___PUBSUB_MESSAGE_HEADER 24 /* header + 2 NUL bytes (message + channel) + 16 byte MAC */ -#undef FIO___PUBSUB_MESSAGE_OVERHEAD_NET -#define FIO___PUBSUB_MESSAGE_OVERHEAD_NET (FIO___PUBSUB_MESSAGE_HEADER + 18) -/* extra 2 NUL bytes (after message & channel name) */ #undef FIO___PUBSUB_MESSAGE_OVERHEAD -#define FIO___PUBSUB_MESSAGE_OVERHEAD (FIO___PUBSUB_MESSAGE_OVERHEAD_NET + 2) +#define FIO___PUBSUB_MESSAGE_OVERHEAD (FIO___PUBSUB_MESSAGE_HEADER + 18) /* ***************************************************************************** Pub/Sub - defaults and builtin pub/sub engines @@ -589,28 +586,24 @@ typedef struct { size_t len; uint64_t uuid[2]; fio___pubsub_message_s *msg; - char buf[FIO___PUBSUB_MESSAGE_OVERHEAD_NET]; + char buf[]; } fio___pubsub_message_parser_s; FIO_LEAK_COUNTER_DEF(fio___pubsub_message_parser_s) FIO_IFUNC fio___pubsub_message_parser_s *fio___pubsub_message_parser( fio_io_s *io) { - return (fio___pubsub_message_parser_s *)fio_io_buffer(io); + return io ? (fio___pubsub_message_parser_s *)fio_io_buffer(io) : NULL; } FIO_SFUNC void fio___pubsub_message_parser_init( fio___pubsub_message_parser_s *p) { FIO_LEAK_COUNTER_ON_ALLOC(fio___pubsub_message_parser_s); - p->len = 0; - p->uuid[0] = p->uuid[1] = 0; - p->msg = NULL; + *p = (fio___pubsub_message_parser_s){0}; } FIO_SFUNC void fio___pubsub_message_parser_destroy( fio___pubsub_message_parser_s *p) { - if (!p) - return; fio___pubsub_message_free(p->msg); FIO_LEAK_COUNTER_ON_FREE(fio___pubsub_message_parser_s); } @@ -813,7 +806,8 @@ static struct FIO___PUBSUB_POSTOFFICE { .on_data = fio___pubsub_protocol_on_data_master, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio_io_touch, - .buffer_size = sizeof(fio___pubsub_message_parser_s), + .buffer_size = sizeof(fio___pubsub_message_parser_s) + + FIO___PUBSUB_MESSAGE_OVERHEAD, }, .remote = { @@ -821,7 +815,8 @@ static struct FIO___PUBSUB_POSTOFFICE { .on_data = fio___pubsub_protocol_on_data_remote, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio___pubsub_protocol_on_timeout, - .buffer_size = sizeof(fio___pubsub_message_parser_s), + .buffer_size = sizeof(fio___pubsub_message_parser_s) + + FIO___PUBSUB_MESSAGE_OVERHEAD, }, }, }; @@ -1057,12 +1052,16 @@ FIO_CONSTRUCTOR(fio_postoffice_init) { .on_data = fio___pubsub_protocol_on_data_master, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio_io_touch, + .buffer_size = + sizeof(fio___pubsub_message_parser_s) + FIO___PUBSUB_MESSAGE_OVERHEAD, }; FIO___PUBSUB_POSTOFFICE.protocol.remote = (fio_io_protocol_s){ .on_attach = fio___pubsub_protocol_on_attach, .on_data = fio___pubsub_protocol_on_data_remote, .on_close = fio___pubsub_protocol_on_close, .on_timeout = fio_io_touch, + .buffer_size = + sizeof(fio___pubsub_message_parser_s) + FIO___PUBSUB_MESSAGE_OVERHEAD, }; } @@ -1449,12 +1448,15 @@ FIO_IFUNC fio___pubsub_message_s *fio___pubsub_message_alloc(void *header) { const size_t channel_len = fio_buf2u16_le((char *)header + 18); const size_t message_len = fio_buf2u24_le((char *)header + 20); m = fio___pubsub_message_new(((channel_len + message_len) << 1) + - FIO___PUBSUB_MESSAGE_OVERHEAD); + (FIO___PUBSUB_MESSAGE_OVERHEAD + 2)); FIO_ASSERT_ALLOC(m); - m->data = (fio_msg_s){ - .udata = m->buf + channel_len + message_len + 2, - .channel = FIO_BUF_INFO2(m->buf, channel_len), - .message = FIO_BUF_INFO2(m->buf + channel_len + 1, message_len), + *m = (fio___pubsub_message_s){ + .data = + (fio_msg_s){ + .udata = m->buf + channel_len + message_len + 2, + .channel = FIO_BUF_INFO2(m->buf, channel_len), + .message = FIO_BUF_INFO2(m->buf + channel_len + 1, message_len), + }, }; return m; } @@ -1463,22 +1465,27 @@ FIO_IFUNC fio___pubsub_message_s *fio___pubsub_message_author( fio_publish_args_s args) { fio___pubsub_message_s *m = fio___pubsub_message_new(((args.message.len + args.channel.len) << 1) + - FIO___PUBSUB_MESSAGE_OVERHEAD); + (FIO___PUBSUB_MESSAGE_OVERHEAD + 2)); FIO_ASSERT_ALLOC(m); - m->data = (fio_msg_s){ - .io = args.from, - .id = args.id ? args.id : fio_rand64(), - .published = args.published ? args.published - : (uint64_t)fio_time2milli(fio_time_real()), - .channel = FIO_BUF_INFO2(m->buf, args.channel.len), - .message = FIO_BUF_INFO2(m->buf + args.channel.len + 1, args.message.len), - .filter = args.filter, - .is_json = args.is_json, + *m = (fio___pubsub_message_s){ + .data = + (fio_msg_s){ + .io = args.from, + .id = args.id ? args.id : fio_rand64(), + .published = args.published + ? args.published + : (uint64_t)fio_time2milli(fio_time_real()), + .channel = FIO_BUF_INFO2(m->buf, args.channel.len), + .message = FIO_BUF_INFO2(m->buf + args.channel.len + 1, + args.message.len), + .filter = args.filter, + .is_json = args.is_json, + }, }; - if (args.channel.len) + if (args.channel.buf && args.channel.len) FIO_MEMCPY(m->data.channel.buf, args.channel.buf, args.channel.len); m->data.channel.buf[args.channel.len] = 0; - if (args.message.buf) + if (args.message.buf && args.message.len) FIO_MEMCPY(m->data.message.buf, args.message.buf, args.message.len); m->data.message.buf[args.message.len] = 0; return m; @@ -1490,6 +1497,7 @@ FIO_SFUNC void fio___pubsub_message_encrypt(fio___pubsub_message_s *m) { const void *k = fio___pubsub_secret_key(m->data.id); const uint64_t nonce[2] = {fio_risky_num(m->data.id, 0), m->data.published}; uint8_t *pos = (uint8_t *)(m->data.message.buf + m->data.message.len + 1); + uint8_t *dest = pos; m->data.udata = (void *)pos; fio_u2buf64_le(pos, m->data.id); pos += 8; @@ -1507,14 +1515,13 @@ FIO_SFUNC void fio___pubsub_message_encrypt(fio___pubsub_message_s *m) { if (enc_len == 2) return; pos += enc_len; - fio_chacha20_poly1305_enc( - pos, - (void *)((char *)(m->data.udata) + FIO___PUBSUB_MESSAGE_HEADER), - m->data.channel.len + m->data.message.len + 2, - m->data.udata, - FIO___PUBSUB_MESSAGE_HEADER, - k, - nonce); + fio_chacha20_poly1305_enc(pos, + (void *)(dest + FIO___PUBSUB_MESSAGE_HEADER), + m->data.channel.len + m->data.message.len + 2, + m->data.udata, + FIO___PUBSUB_MESSAGE_HEADER, + k, + nonce); } FIO_SFUNC int fio___pubsub_message_decrypt(fio___pubsub_message_s *m) { @@ -1522,7 +1529,8 @@ FIO_SFUNC int fio___pubsub_message_decrypt(fio___pubsub_message_s *m) { return 0; if (!m->data.udata) return -1; - uint8_t *pos = (uint8_t *)(m->data.udata); + uint8_t *pos = (uint8_t *)(m->data.message.buf + m->data.message.len + 1); + uint8_t *const dest = pos; m->data.id = fio_buf2u64_le(pos); pos += 8; m->data.published = fio_buf2u64_le(pos); @@ -1546,7 +1554,7 @@ FIO_SFUNC int fio___pubsub_message_decrypt(fio___pubsub_message_s *m) { m->buf, m->data.channel.len + m->data.message.len + 2, - m->data.udata, + dest, FIO___PUBSUB_MESSAGE_HEADER, k, nonce); @@ -1564,12 +1572,15 @@ FIO_IFUNC void fio___pubsub_message_write2io(fio_io_s *io, void *m_) { fio___pubsub_message_s *m = (fio___pubsub_message_s *)m_; if (io == m->data.io) return; - FIO_LOG_DDEBUG2("(%d) pub/sub sending IPC/peer message.", fio_io_pid()); + FIO_LOG_DDEBUG2("(%d) pub/sub sending IPC/peer message: %zu bytes", + fio_io_pid(), + m->data.message.len + m->data.channel.len + + FIO___PUBSUB_MESSAGE_OVERHEAD); fio___pubsub_message_encrypt(m); fio_io_write2(io, .buf = fio___pubsub_message_dup(m), .len = (m->data.message.len + m->data.channel.len + - FIO___PUBSUB_MESSAGE_OVERHEAD_NET), + FIO___PUBSUB_MESSAGE_OVERHEAD), .offset = ((uintptr_t)(m->data.udata) - (uintptr_t)(m)), .dealloc = (void (*)(void *))fio___pubsub_message_free); } @@ -1600,11 +1611,6 @@ FIO_SFUNC void fio___pubsub_message_route(fio___pubsub_message_s *m) { if (flags & FIO___PUBSUB_SPECIAL) goto is_special_message; - if ((FIO___PUBSUB_POSTOFFICE.filter.publish & flags)) - fio_queue_push(fio_io_queue(), - fio___pubsub_message_deliver_task, - fio___pubsub_message_dup(m)); - if ((FIO___PUBSUB_POSTOFFICE.filter.local & flags)) fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.ipc, fio___pubsub_message_write2io, @@ -1614,6 +1620,12 @@ FIO_SFUNC void fio___pubsub_message_route(fio___pubsub_message_s *m) { fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.remote, fio___pubsub_message_write2io, m); + + if ((FIO___PUBSUB_POSTOFFICE.filter.publish & flags)) + fio_queue_push(fio_io_queue(), + fio___pubsub_message_deliver_task, + fio___pubsub_message_dup(m)); + return; is_special_message: @@ -1646,7 +1658,8 @@ FIO_SFUNC void fio___pubsub_message_route(fio___pubsub_message_s *m) { p->uuid[0], p->uuid[1]); } - FIO_LOG_INFO("(cluster) identified new peer (%zu connections)", + FIO_LOG_INFO("(%d - cluster) identified new peer (%zu connections)", + fio_io_pid(), fio___pubsub_broadcast_connected_count( &FIO___PUBSUB_POSTOFFICE.remote_uuids)); return; @@ -1723,7 +1736,7 @@ void fio_publish FIO_NOOP(fio_publish_args_s args) { m = fio___pubsub_message_author(args); m->data.is_json = ((!!args.is_json) | ((uint8_t)(uintptr_t)args.engine)); - FIO_LOG_DDEBUG2("publishing pub/sub message (scheduling)"); + FIO_LOG_DDEBUG2("(%d) publishing pub/sub message (scheduling)", fio_io_pid()); fio_io_defer(fio___publish_message_task, m, NULL); return; @@ -1753,10 +1766,9 @@ FIO_IFUNC void fio___pubsub_message_parse( size_t existing = parser->len; if (!parser->msg) { while (existing < FIO___PUBSUB_MESSAGE_HEADER) { /* get message length */ - size_t consumed = - fio_io_read(io, - parser->buf + existing, - FIO___PUBSUB_MESSAGE_OVERHEAD_NET - existing); + size_t consumed = fio_io_read(io, + parser->buf + existing, + FIO___PUBSUB_MESSAGE_OVERHEAD - existing); if (!consumed) { parser->len = existing; return; @@ -1768,12 +1780,13 @@ FIO_IFUNC void fio___pubsub_message_parse( } /* known message length, read to end and publish */ fio___pubsub_message_s *m = parser->msg; - const size_t needed = m->data.channel.len + m->data.message.len + - FIO___PUBSUB_MESSAGE_OVERHEAD_NET; - FIO_LOG_DDEBUG2("(%d) pub/sub parsing IPC/peer message (%zu/%zu bytes)", - fio_io_pid(), - existing, - needed); + const size_t needed = + m->data.channel.len + m->data.message.len + FIO___PUBSUB_MESSAGE_OVERHEAD; + // FIO_LOG_DDEBUG2("(%d) pub/sub parsing IPC/peer message %p (%zu/%zu bytes)", + // fio_io_pid(), + // (void *)fio_buf2u64_le(m->data.udata), + // existing, + // needed); while (existing < needed) { size_t consumed = fio_io_read(io, (char *)m->data.udata + existing, needed - existing); @@ -1842,9 +1855,11 @@ FIO_SFUNC void fio___pubsub_protocol_on_close(void *p_, void *udata) { p->uuid[0], p->uuid[1], NULL); - FIO_LOG_INFO("(cluster) lost peer connection (%zu connections)", - fio___pubsub_broadcast_connected_count( - &FIO___PUBSUB_POSTOFFICE.remote_uuids)); + FIO_LOG_INFO( + "(%d) (pub/sub cluster) lost peer connection (%zu connections)", + fio_io_pid(), + fio___pubsub_broadcast_connected_count( + &FIO___PUBSUB_POSTOFFICE.remote_uuids)); } fio___pubsub_message_parser_destroy(p); if (!fio_io_is_master()) @@ -1940,7 +1955,8 @@ FIO_IFUNC void fio___channel_on_create(fio_channel_s *ch) { (&i.key->subscribe + ch->is_pattern)[0](i.key, name, ch->filter); } if (!FIO___PUBSUB_POSTOFFICE.filter.remote) { /* inform root process */ - FIO_LOG_DDEBUG2("informing root process of new channel."); + FIO_LOG_DDEBUG2("(%d) informing root process of new channel.", + fio_io_pid()); fio___pubsub_message_s *m = fio___pubsub_message_author((fio_publish_args_s){ .id = (uint64_t)(ch->is_pattern + 1), @@ -1948,12 +1964,10 @@ FIO_IFUNC void fio___channel_on_create(fio_channel_s *ch) { .filter = ch->filter, .is_json = FIO___PUBSUB_SUB, }); - if (m) { - fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.ipc, - fio___pubsub_message_write2io, - m); - fio___pubsub_message_free(m); - } + fio_io_protocol_each(&FIO___PUBSUB_POSTOFFICE.protocol.ipc, + fio___pubsub_message_write2io, + m); + fio___pubsub_message_free(m); } } /** Callback for when a channel is destroy. */ @@ -2032,11 +2046,16 @@ FIO_SFUNC void fio___pubsub_broadcast_hello(fio_io_s *io) { sizeof(addr)); } +FIO_SFUNC void fio___pubsub_broadcast_hello_task_done(void *io_, void *ignr_) { + (void)ignr_; + fio_io_s *io = (fio_io_s *)io_; + fio_io_free(io); +} + FIO_SFUNC int fio___pubsub_broadcast_hello_task(void *io_, void *ignr_) { (void)ignr_; fio_io_s *io = (fio_io_s *)io_; fio___pubsub_broadcast_hello(io); - fio_io_free(io); return 0; } @@ -2082,6 +2101,7 @@ FIO_SFUNC void fio___pubsub_broadcast_on_attach(fio_io_s *io) { fio___pubsub_broadcast_hello((FIO___PUBSUB_POSTOFFICE.broadcaster = io)); fio_io_run_every(.fn = fio___pubsub_broadcast_hello_task, .udata1 = fio_io_dup(io), + .on_finish = fio___pubsub_broadcast_hello_task_done, .every = (uint32_t)(1024 | (1023 & FIO___PUBSUB_POSTOFFICE.uuid.u64[0])), @@ -2122,11 +2142,12 @@ FIO_SFUNC void fio___pubsub_broadcast_on_data(fio_io_s *io) { &FIO___PUBSUB_POSTOFFICE.remote_uuids, buf[0], buf[1]) == buf[1]) { - FIO_LOG_DDEBUG2("skipping peer connection - already exists"); + FIO_LOG_DDEBUG2("(%d) skipping peer connection - already exists", + fio_io_pid()); continue; /* skip connection, already exists. */ } should_say_hello |= 1; - FIO_LOG_DDEBUG2("detected peer, should now connect"); + FIO_LOG_DDEBUG2("(%d) detected peer, should now connect", fio_io_pid()); /* TODO: fixme! */ char addr_buf[128]; @@ -2164,6 +2185,7 @@ FIO_SFUNC void fio___pubsub_broadcast_on_data(fio_io_s *io) { if (should_say_hello) fio_io_run_every(.fn = fio___pubsub_broadcast_hello_task, .udata1 = fio_io_dup(io), + .on_finish = fio___pubsub_broadcast_hello_task_done, .every = (uint32_t)(1024 | (1023 & @@ -2173,10 +2195,11 @@ FIO_SFUNC void fio___pubsub_broadcast_on_data(fio_io_s *io) { FIO_SFUNC void fio___pubsub_broadcast_on_incoming(fio_io_s *io) { int fd; while ((fd = accept(fio_io_fd(io), NULL, NULL)) != -1) { - FIO_LOG_DDEBUG2("accepting a cluster peer connection"); + FIO_LOG_DDEBUG2("(%d) accepting a cluster peer connection", fio_io_pid()); fio_io_attach_fd(fd, &FIO___PUBSUB_POSTOFFICE.protocol.remote, NULL, NULL); } - FIO_LOG_INFO("(cluster) accepted new peer(s) (%zu connections).", + FIO_LOG_INFO("(%d) (cluster) accepted new peer(s) (%zu connections).", + fio_io_pid(), fio___pubsub_broadcast_connected_count( &FIO___PUBSUB_POSTOFFICE.remote_uuids)); } @@ -2195,8 +2218,9 @@ SFUNC void fio___pubsub_broadcast_on_port(void *port_) { }; if (FIO___PUBSUB_POSTOFFICE.secret_is_random) { FIO_LOG_ERROR( - "Listening to cluster peer connections failed!" - "\n\tUsing a random (non-shared) secret, cannot validate peers."); + "(%d) Listening to cluster peer connections failed!" + "\n\tUsing a random (non-shared) secret, cannot validate peers.", + fio_io_pid()); return; } if (!port || port < 0) diff --git a/fio-stl/903 chacha.h b/fio-stl/903 chacha.h index 8f89aaa..de1ba16 100644 --- a/fio-stl/903 chacha.h +++ b/fio-stl/903 chacha.h @@ -330,6 +330,53 @@ FIO_SFUNC void FIO_NAME_TEST(stl, chacha)(void) { buffer); } } + { /* test roundtrip */ + fprintf(stderr, "\t * Testing ChaCha20Poly1305 round-trip.\n"); + fio_u256 key = + fio_u256_init64(fio_rand64(), fio_rand64(), fio_rand64(), fio_rand64()); + FIO_STR_INFO_TMP_VAR(ad, 128); + FIO_STR_INFO_TMP_VAR(plaintext, 1024); + FIO_STR_INFO_TMP_VAR(cyphertext, 1024); + FIO_STR_INFO_TMP_VAR(decrypted, 1024); + fio_string_write2(&ad, + NULL, + FIO_STRING_WRITE_STR1( + "This is unencrypted additional data with a nonce:"), + FIO_STRING_WRITE_HEX(fio_rand64())); + fio_string_write2( + &plaintext, + NULL, + FIO_STRING_WRITE_STR1( + "This is unencrypted text that will eventually be encrypted, the " + "following are the whole 0-255 byte values:")); + for (size_t i = 0; i < 256; ++i) { + plaintext.buf[plaintext.len++] = (char)i; + } + plaintext.buf[plaintext.len] = 0; + FIO_MEMCPY(cyphertext.buf, plaintext.buf, plaintext.len); + cyphertext.len = plaintext.len; + fio_chacha20_poly1305_enc(ad.buf + ad.len, + cyphertext.buf, + cyphertext.len, + ad.buf, /* additional data */ + ad.len, + key.u8, + ad.buf + ad.len - 12); + FIO_MEMCPY(decrypted.buf, cyphertext.buf, cyphertext.len); + decrypted.len = cyphertext.len; + FIO_ASSERT(!fio_chacha20_poly1305_dec(ad.buf + ad.len, + decrypted.buf, + decrypted.len, + ad.buf, /* additional data */ + ad.len, + key.u8, + ad.buf + ad.len - 12), + "fio_chacha20_poly1305_dec failed!"); + FIO_ASSERT(FIO_MEMCMP(cyphertext.buf, plaintext.buf, plaintext.len), + "chacha20 cypher-text should be different than plain-text."); + FIO_ASSERT(!FIO_MEMCMP(decrypted.buf, plaintext.buf, plaintext.len), + "chacha20_poly1305 roundtrip error!"); + } #if !DEBUG fio_test_hash_function(fio__poly1305_speed_wrapper,