diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 8228d29ee..75da036f0 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1660,7 +1660,11 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } PINFO("Received SUBSCRIBE, accepting new follower"); SData response("SUBSCRIPTION_APPROVED"); - _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing + bool syncSucess = _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing + if (!syncSucess) { + SWARN("Failed generating sync response to SUBSCRIBE"); + response.methodLine = "SUBSCRIPTION_PENDING"; + } _sendToPeer(peer, response); SASSERTWARN(!peer->subscribed); peer->subscribed = true; @@ -1683,7 +1687,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { transaction.content = _db.getUncommittedQuery(); _sendToPeer(peer, transaction); } - } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED")) { + } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED") || SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { // SUBSCRIPTION_APPROVED: Sent by a follower's new leader to complete the subscription process. Includes zero or // more COMMITS that should be immediately applied to the database. if (_state != SQLiteNodeState::SUBSCRIBING) { @@ -1692,19 +1696,24 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { if (_leadPeer != peer) { STHROW("not subscribing to you"); } - SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); - try { - // Done synchronizing - _recvSynchronize(peer, message); - SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() - << "), FOLLOWING"); - _changeState(SQLiteNodeState::FOLLOWING); - } catch (const SException& e) { - // Transaction failed - SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); - _reconnectPeer(_leadPeer); - _changeState(SQLiteNodeState::SEARCHING); - throw e; + if (SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { + // This subscription did not actually succeed, but not because we did anything wrong. The most useful course of action here is to switch to synchronizing and try again. + _changeState(SQLiteNodeState::SYNCHRONIZING); + } else { + SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); + try { + // Done synchronizing + _recvSynchronize(peer, message); + SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() + << "), FOLLOWING"); + _changeState(SQLiteNodeState::FOLLOWING); + } catch (const SException& e) { + // Transaction failed + SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); + _reconnectPeer(_leadPeer); + _changeState(SQLiteNodeState::SEARCHING); + throw e; + } } } else if (SIEquals(message.methodLine, "BEGIN_TRANSACTION") || SIEquals(message.methodLine, "COMMIT_TRANSACTION") || SIEquals(message.methodLine, "ROLLBACK_TRANSACTION")) { if (_replicationThreadsShouldExit) { @@ -2085,7 +2094,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance static int __ATTEMPTS = 0; -void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { +bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, // so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that. @@ -2111,7 +2120,7 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee response["hashMismatchValue"] = myHash; response["hashMismatchNumber"] = to_string(peerCommitCount); - return; + return false; } PINFO("Latest commit hash matches our records, beginning synchronization."); } else { @@ -2175,6 +2184,8 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee response.content += commit.serialize(); } } + + return true; } void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) { diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 8212ae18a..b319d8dc5 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -216,7 +216,8 @@ class SQLiteNode : public STCPManager { // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the // *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker // thread with a different DB object) - which is why this is static. - static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); + // Returns true on success, false on failure. + static bool _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); bool _isNothingBlockingShutdown() const; bool _majoritySubscribed() const;