From e14609373ed6311c21baab46dd6fb51b49cec82f Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 21 Jun 2024 12:47:26 -0700 Subject: [PATCH] Simplify solution --- sqlitecluster/SQLiteNode.cpp | 48 ++++++++++++++---------------------- sqlitecluster/SQLiteNode.h | 3 +-- 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 75da036f0..19b27283b 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1660,11 +1660,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } PINFO("Received SUBSCRIBE, accepting new follower"); SData response("SUBSCRIPTION_APPROVED"); - bool syncSucess = _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing - if (!syncSucess) { - SWARN("Failed generating sync response to SUBSCRIBE"); - response.methodLine = "SUBSCRIPTION_PENDING"; - } + _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing _sendToPeer(peer, response); SASSERTWARN(!peer->subscribed); peer->subscribed = true; @@ -1687,7 +1683,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { transaction.content = _db.getUncommittedQuery(); _sendToPeer(peer, transaction); } - } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED") || SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { + } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED")) { // SUBSCRIPTION_APPROVED: Sent by a follower's new leader to complete the subscription process. Includes zero or // more COMMITS that should be immediately applied to the database. if (_state != SQLiteNodeState::SUBSCRIBING) { @@ -1696,24 +1692,19 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { if (_leadPeer != peer) { STHROW("not subscribing to you"); } - if (SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { - // This subscription did not actually succeed, but not because we did anything wrong. The most useful course of action here is to switch to synchronizing and try again. - _changeState(SQLiteNodeState::SYNCHRONIZING); - } else { - SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); - try { - // Done synchronizing - _recvSynchronize(peer, message); - SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() - << "), FOLLOWING"); - _changeState(SQLiteNodeState::FOLLOWING); - } catch (const SException& e) { - // Transaction failed - SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); - _reconnectPeer(_leadPeer); - _changeState(SQLiteNodeState::SEARCHING); - throw e; - } + SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); + try { + // Done synchronizing + _recvSynchronize(peer, message); + SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() + << "), FOLLOWING"); + _changeState(SQLiteNodeState::FOLLOWING); + } catch (const SException& e) { + // Transaction failed + SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); + _reconnectPeer(_leadPeer); + _changeState(SQLiteNodeState::SEARCHING); + throw e; } } else if (SIEquals(message.methodLine, "BEGIN_TRANSACTION") || SIEquals(message.methodLine, "COMMIT_TRANSACTION") || SIEquals(message.methodLine, "ROLLBACK_TRANSACTION")) { if (_replicationThreadsShouldExit) { @@ -2094,7 +2085,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance static int __ATTEMPTS = 0; -bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { +void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, // so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that. @@ -2119,8 +2110,6 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Instead of reconnecting, we tell the peer that we don't match. It's up to the peer to reconnect. response["hashMismatchValue"] = myHash; response["hashMismatchNumber"] = to_string(peerCommitCount); - - return false; } PINFO("Latest commit hash matches our records, beginning synchronization."); } else { @@ -2162,11 +2151,12 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS); if (resultCode) { if (resultCode == SQLITE_INTERRUPT) { - SWARN("Timed out while running synchronization query."); + STHROW("synchronization query timeout"); } else { STHROW("error getting commits"); } } + if ((uint64_t)result.size() != toIndex - fromIndex + 1) { STHROW("mismatched commit count"); } @@ -2184,8 +2174,6 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee response.content += commit.serialize(); } } - - return true; } void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) { diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index b319d8dc5..8212ae18a 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -216,8 +216,7 @@ class SQLiteNode : public STCPManager { // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the // *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker // thread with a different DB object) - which is why this is static. - // Returns true on success, false on failure. - static bool _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); + static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); bool _isNothingBlockingShutdown() const; bool _majoritySubscribed() const;