Skip to content

Commit

Permalink
Simplify solution
Browse files: browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jun 21, 2024
1 parent 8663412 commit e146093
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 32 deletions.
48 changes: 18 additions & 30 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,11 +1660,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
PINFO("Received SUBSCRIBE, accepting new follower");
SData response("SUBSCRIPTION_APPROVED");
bool syncSucess = _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing
if (!syncSucess) {
SWARN("Failed generating sync response to SUBSCRIBE");
response.methodLine = "SUBSCRIPTION_PENDING";
}
_queueSynchronize(this, peer, _db, response, true); // Send everything it's missing
_sendToPeer(peer, response);
SASSERTWARN(!peer->subscribed);
peer->subscribed = true;
Expand All @@ -1687,7 +1683,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
transaction.content = _db.getUncommittedQuery();
_sendToPeer(peer, transaction);
}
} else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED") || SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) {
} else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED")) {
// SUBSCRIPTION_APPROVED: Sent by a follower's new leader to complete the subscription process. Includes zero or
// more COMMITS that should be immediately applied to the database.
if (_state != SQLiteNodeState::SUBSCRIBING) {
Expand All @@ -1696,24 +1692,19 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
if (_leadPeer != peer) {
STHROW("not subscribing to you");
}
if (SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) {
// This subscription did not actually succeed, but not because we did anything wrong. The most useful course of action here is to switch to synchronizing and try again.
_changeState(SQLiteNodeState::SYNCHRONIZING);
} else {
SINFO("Received SUBSCRIPTION_APPROVED, final synchronization.");
try {
// Done synchronizing
_recvSynchronize(peer, message);
SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash()
<< "), FOLLOWING");
_changeState(SQLiteNodeState::FOLLOWING);
} catch (const SException& e) {
// Transaction failed
SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING.");
_reconnectPeer(_leadPeer);
_changeState(SQLiteNodeState::SEARCHING);
throw e;
}
SINFO("Received SUBSCRIPTION_APPROVED, final synchronization.");
try {
// Done synchronizing
_recvSynchronize(peer, message);
SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash()
<< "), FOLLOWING");
_changeState(SQLiteNodeState::FOLLOWING);
} catch (const SException& e) {
// Transaction failed
SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING.");
_reconnectPeer(_leadPeer);
_changeState(SQLiteNodeState::SEARCHING);
throw e;
}
} else if (SIEquals(message.methodLine, "BEGIN_TRANSACTION") || SIEquals(message.methodLine, "COMMIT_TRANSACTION") || SIEquals(message.methodLine, "ROLLBACK_TRANSACTION")) {
if (_replicationThreadsShouldExit) {
Expand Down Expand Up @@ -2094,7 +2085,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance

static int __ATTEMPTS = 0;

bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
// We need this to check the state of the node, and we also need `name` to make the logging macros work in a static
// function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that,
// so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that.
Expand All @@ -2119,8 +2110,6 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
// Instead of reconnecting, we tell the peer that we don't match. It's up to the peer to reconnect.
response["hashMismatchValue"] = myHash;
response["hashMismatchNumber"] = to_string(peerCommitCount);

return false;
}
PINFO("Latest commit hash matches our records, beginning synchronization.");
} else {
Expand Down Expand Up @@ -2162,11 +2151,12 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS);
if (resultCode) {
if (resultCode == SQLITE_INTERRUPT) {
SWARN("Timed out while running synchronization query.");
STHROW("synchronization query timeout");
} else {
STHROW("error getting commits");
}
}

if ((uint64_t)result.size() != toIndex - fromIndex + 1) {
STHROW("mismatched commit count");
}
Expand All @@ -2184,8 +2174,6 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
response.content += commit.serialize();
}
}

return true;
}

void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) {
Expand Down
3 changes: 1 addition & 2 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ class SQLiteNode : public STCPManager {
// Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the
// *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker
// thread with a different DB object) - which is why this is static.
// Returns true on success, false on failure.
static bool _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);

bool _isNothingBlockingShutdown() const;
bool _majoritySubscribed() const;
Expand Down

0 comments on commit e146093

Please sign in to comment.