Skip to content

Commit

Permalink
Merge branch 'main' into tyler-typename
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jul 4, 2024
2 parents 254924f + e455458 commit 89de92f
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 23 deletions.
6 changes: 3 additions & 3 deletions libstuff/SRandom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ uint64_t SRandom::rand64() {
return _distribution64(_generator);
}

string SRandom::randStr(unsigned& length) {
string SRandom::randStr(unsigned length) {
string str = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
string newstr;
int pos;
while(newstr.size() != length) {
while (newstr.size() != length) {
pos = (rand64() % (str.size() - 1));
newstr += str.substr(pos,1);
newstr += str.substr(pos, 1);
}
return newstr;
}
2 changes: 1 addition & 1 deletion libstuff/SRandom.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class SRandom {
public:
static uint64_t rand64();
static uint64_t limitedRand64(uint64_t min, uint64_t max);
static string randStr(unsigned& length);
static string randStr(unsigned length);

private:
static mt19937_64 _generator;
Expand Down
4 changes: 2 additions & 2 deletions libstuff/libstuff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2806,11 +2806,11 @@ bool SIsValidSQLiteDateModifier(const string& modifier) {
}

bool SREMatch(const string& regExp, const string& s) {
return pcrecpp::RE(regExp).FullMatch(s);
return pcrecpp::RE(regExp, pcrecpp::RE_Options().set_match_limit_recursion(1000)).FullMatch(s);
}

bool SREMatch(const string& regExp, const string& s, string& match) {
return pcrecpp::RE(regExp).FullMatch(s, &match);
return pcrecpp::RE(regExp, pcrecpp::RE_Options().set_match_limit_recursion(1000)).FullMatch(s, &match);
}

void SRedactSensitiveValues(string& s) {
Expand Down
52 changes: 46 additions & 6 deletions libstuff/sqlite3.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
** separate file. This file contains only code for the core SQLite library.
**
** The content in this amalgamation comes from Fossil check-in
** 66c885814e1c533f0f45ee01b6b25cd5ec08.
** 0bb306eb70ef1df7734326d30359da7a1539.
*/
#define SQLITE_CORE 1
#define SQLITE_AMALGAMATION 1
Expand Down Expand Up @@ -462,7 +462,7 @@ extern "C" {
*/
#define SQLITE_VERSION "3.45.2"
#define SQLITE_VERSION_NUMBER 3045002
#define SQLITE_SOURCE_ID "2024-06-04 16:24:12 66c885814e1c533f0f45ee01b6b25cd5ec08770d699e441b1af4f79b23e9f0a7"
#define SQLITE_SOURCE_ID "2024-07-03 20:30:31 0bb306eb70ef1df7734326d30359da7a15397171d3e25ab644633ef3ee1428ec"

/*
** CAPI3REF: Run-Time Library Version Numbers
Expand Down Expand Up @@ -19711,7 +19711,11 @@ struct Select {
** SRT_Set The result must be a single column. Store each
** row of result as the key in table pDest->iSDParm.
** Apply the affinity pDest->affSdst before storing
** results. Used to implement "IN (SELECT ...)".
** results. if pDest->iSDParm2 is positive, then it is
** a regsiter holding a Bloom filter for the IN operator
** that should be populated in addition to the
** pDest->iSDParm table. This SRT is used to
** implement "IN (SELECT ...)".
**
** SRT_EphemTab Create an temporary table pDest->iSDParm and store
** the result there. The cursor is left open after
Expand Down Expand Up @@ -100521,6 +100525,7 @@ case OP_Found: { /* jump, in3, ncycle */
r.pKeyInfo = pC->pKeyInfo;
r.default_rc = 0;
#ifdef SQLITE_DEBUG
(void)sqlite3FaultSim(50); /* For use by --counter in TH3 */
for(ii=0; ii<r.nField; ii++){
assert( memIsValid(&r.aMem[ii]) );
assert( (r.aMem[ii].flags & MEM_Zero)==0 || r.aMem[ii].n==0 );
Expand Down Expand Up @@ -107959,10 +107964,10 @@ static int bytecodevtabColumn(

#ifdef SQLITE_ENABLE_STMT_SCANSTATUS
case 9: /* nexec */
sqlite3_result_int(ctx, pOp->nExec);
sqlite3_result_int64(ctx, pOp->nExec);
break;
case 10: /* ncycle */
sqlite3_result_int(ctx, pOp->nCycle);
sqlite3_result_int64(ctx, pOp->nCycle);
break;
#else
case 9: /* nexec */
Expand Down Expand Up @@ -114461,15 +114466,30 @@ SQLITE_PRIVATE void sqlite3CodeRhsOfIN(
SelectDest dest;
int i;
int rc;
int addrBloom = 0;
sqlite3SelectDestInit(&dest, SRT_Set, iTab);
dest.zAffSdst = exprINAffinity(pParse, pExpr);
pSelect->iLimit = 0;
if( addrOnce && OptimizationEnabled(pParse->db, SQLITE_BloomFilter) ){
int regBloom = ++pParse->nMem;
addrBloom = sqlite3VdbeAddOp2(v, OP_Blob, 10000, regBloom);
VdbeComment((v, "Bloom filter"));
dest.iSDParm2 = regBloom;
}
testcase( pSelect->selFlags & SF_Distinct );
testcase( pKeyInfo==0 ); /* Caused by OOM in sqlite3KeyInfoAlloc() */
pCopy = sqlite3SelectDup(pParse->db, pSelect, 0);
rc = pParse->db->mallocFailed ? 1 :sqlite3Select(pParse, pCopy, &dest);
sqlite3SelectDelete(pParse->db, pCopy);
sqlite3DbFree(pParse->db, dest.zAffSdst);
if( addrBloom ){
sqlite3VdbeGetOp(v, addrOnce)->p3 = dest.iSDParm2;
if( dest.iSDParm2==0 ){
sqlite3VdbeChangeToNoop(v, addrBloom);
}else{
sqlite3VdbeGetOp(v, addrOnce)->p3 = dest.iSDParm2;
}
}
if( rc ){
sqlite3KeyInfoUnref(pKeyInfo);
return;
Expand Down Expand Up @@ -114912,6 +114932,15 @@ static void sqlite3ExprCodeIN(
sqlite3VdbeAddOp4(v, OP_Affinity, rLhs, nVector, 0, zAff, nVector);
if( destIfFalse==destIfNull ){
/* Combine Step 3 and Step 5 into a single opcode */
if( ExprHasProperty(pExpr, EP_Subrtn) ){
const VdbeOp *pOp = sqlite3VdbeGetOp(v, pExpr->y.sub.iAddr);
assert( pOp->opcode==OP_Once || pParse->nErr );
if( pOp->opcode==OP_Once && pOp->p3>0 ){
assert( OptimizationEnabled(pParse->db, SQLITE_BloomFilter) );
sqlite3VdbeAddOp4Int(v, OP_Filter, pOp->p3, destIfFalse,
rLhs, nVector); VdbeCoverage(v);
}
}
sqlite3VdbeAddOp4Int(v, OP_NotFound, iTab, destIfFalse,
rLhs, nVector); VdbeCoverage(v);
goto sqlite3ExprCodeIN_finished;
Expand Down Expand Up @@ -145859,12 +145888,18 @@ static void selectInnerLoop(
** case the order does matter */
pushOntoSorter(
pParse, pSort, p, regResult, regOrig, nResultCol, nPrefixReg);
pDest->iSDParm2 = 0; /* Signal that any Bloom filter is unpopulated */
}else{
int r1 = sqlite3GetTempReg(pParse);
assert( sqlite3Strlen30(pDest->zAffSdst)==nResultCol );
sqlite3VdbeAddOp4(v, OP_MakeRecord, regResult, nResultCol,
r1, pDest->zAffSdst, nResultCol);
sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iParm, r1, regResult, nResultCol);
if( pDest->iSDParm2 ){
sqlite3VdbeAddOp4Int(v, OP_FilterAdd, pDest->iSDParm2, 0,
regResult, nResultCol);
ExplainQueryPlan((pParse, 0, "CREATE BLOOM FILTER"));
}
sqlite3ReleaseTempReg(pParse, r1);
}
break;
Expand Down Expand Up @@ -147792,6 +147827,11 @@ static int generateOutputSubroutine(
r1, pDest->zAffSdst, pIn->nSdst);
sqlite3VdbeAddOp4Int(v, OP_IdxInsert, pDest->iSDParm, r1,
pIn->iSdst, pIn->nSdst);
if( pDest->iSDParm2>0 ){
sqlite3VdbeAddOp4Int(v, OP_FilterAdd, pDest->iSDParm2, 0,
pIn->iSdst, pIn->nSdst);
ExplainQueryPlan((pParse, 0, "CREATE BLOOM FILTER"));
}
sqlite3ReleaseTempReg(pParse, r1);
break;
}
Expand Down Expand Up @@ -253445,7 +253485,7 @@ static void fts5SourceIdFunc(
){
assert( nArg==0 );
UNUSED_PARAM2(nArg, apUnused);
sqlite3_result_text(pCtx, "fts5: 2024-06-04 16:24:12 66c885814e1c533f0f45ee01b6b25cd5ec08770d699e441b1af4f79b23e9f0a7", -1, SQLITE_TRANSIENT);
sqlite3_result_text(pCtx, "fts5: 2024-06-24 15:05:28 d826236e22234bd0ab7888d26f2f2eeb8f109099c8936b62dedf6597df386e45", -1, SQLITE_TRANSIENT);
}

/*
Expand Down
2 changes: 1 addition & 1 deletion libstuff/sqlite3.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ extern "C" {
*/
#define SQLITE_VERSION "3.45.2"
#define SQLITE_VERSION_NUMBER 3045002
#define SQLITE_SOURCE_ID "2024-06-04 16:24:12 66c885814e1c533f0f45ee01b6b25cd5ec08770d699e441b1af4f79b23e9f0a7"
#define SQLITE_SOURCE_ID "2024-07-03 20:30:31 0bb306eb70ef1df7734326d30359da7a15397171d3e25ab644633ef3ee1428ec"

/*
** CAPI3REF: Run-Time Library Version Numbers
Expand Down
6 changes: 6 additions & 0 deletions libstuff/sqlite3ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ struct sqlite3_api_routines {
int (*is_interrupted)(sqlite3*);
/* Version 3.43.0 and later */
int (*stmt_explain)(sqlite3_stmt*,int);
/* Version 3.44.0 and later */
void *(*get_clientdata)(sqlite3*,const char*);
int (*set_clientdata)(sqlite3*, const char*, void*, void(*)(void*));
};

/*
Expand Down Expand Up @@ -693,6 +696,9 @@ typedef int (*sqlite3_loadext_entry)(
#define sqlite3_is_interrupted sqlite3_api->is_interrupted
/* Version 3.43.0 and later */
#define sqlite3_stmt_explain sqlite3_api->stmt_explain
/* Version 3.44.0 and later */
#define sqlite3_get_clientdata sqlite3_api->get_clientdata
#define sqlite3_set_clientdata sqlite3_api->set_clientdata
#endif /* !defined(SQLITE_CORE) && !defined(SQLITE_OMIT_LOAD_EXTENSION) */

#if !defined(SQLITE_CORE) && !defined(SQLITE_OMIT_LOAD_EXTENSION)
Expand Down
12 changes: 10 additions & 2 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,9 @@ bool SQLite::prepare(uint64_t* transactionID, string* transactionhash) {

// These are the values we're currently operating on, until we either commit or rollback.
_sharedData.prepareTransactionInfo(commitCount + 1, _uncommittedQuery, _uncommittedHash, _dbCountAtStart);
if (_uncommittedQuery.empty()) {
SINFO("Will commmit blank query");
}

int result = SQuery(_db, "updating journal", query);
_prepareElapsed += STimeNow() - before;
Expand Down Expand Up @@ -874,14 +877,19 @@ string SQLite::getCommittedHash() {
return _sharedData.lastCommittedHash.load();
}

bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) {
int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS) {
// Look up all the queries within that range
SASSERTWARN(SWITHIN(1, fromIndex, toIndex));
string query = _getJournalQuery({"SELECT id, hash, query FROM", "WHERE id >= " + SQ(fromIndex) +
(toIndex ? " AND id <= " + SQ(toIndex) : "")});
SDEBUG("Getting commits #" << fromIndex << "-" << toIndex);
query = "SELECT hash, query FROM (" + query + ") ORDER BY id";
return !SQuery(_db, "getting commits", query, result);
if (timeoutLimitUS) {
setTimeout(timeoutLimitUS);
}
int queryResult = SQuery(_db, "getting commits", query, result);
clearTimeout();
return queryResult;
}

int64_t SQLite::getLastInsertRowID() {
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class SQLite {
static bool getCommit(sqlite3* db, const vector<string> journalNames, uint64_t index, string& query, string& hash);

// Looks up a range of commits.
bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result);
int getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS = 0);

// Set a time limit for this transaction, in US from the current time.
void setTimeout(uint64_t timeLimitUS);
Expand Down
26 changes: 20 additions & 6 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
PINFO("Received SUBSCRIBE, accepting new follower");
SData response("SUBSCRIPTION_APPROVED");
_queueSynchronize(this, peer, _db, response, true); // Send everything it's missing

// We send every remaining commit that the node doesn't have, but we set a timeout on the query that gathers these to half the
// maximum time limit that will cause this node to be disconnected from the cluster.
_queueSynchronize(this, peer, _db, response, true, RECV_TIMEOUT / 2);
_sendToPeer(peer, response);
SASSERTWARN(!peer->subscribed);
peer->subscribed = true;
Expand Down Expand Up @@ -2083,7 +2086,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
}
}

void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS) {
// We need this to check the state of the node, and we also need `name` to make the logging macros work in a static
// function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that,
// so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that.
Expand Down Expand Up @@ -2132,12 +2135,23 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
// Figure out how much to send it
uint64_t fromIndex = peerCommitCount + 1;
uint64_t toIndex = targetCommit;
if (!sendAll)
if (sendAll) {
SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex);
} else {
toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time
if (!db.getCommits(fromIndex, toIndex, result))
STHROW("error getting commits");
if ((uint64_t)result.size() != toIndex - fromIndex + 1)
}
int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutAfterUS);
if (resultCode) {
if (resultCode == SQLITE_INTERRUPT) {
STHROW("synchronization query timeout");
} else {
STHROW("error getting commits");
}
}

if ((uint64_t)result.size() != toIndex - fromIndex + 1) {
STHROW("mismatched commit count");
}

// Wrap everything into one huge message
PINFO("Synchronizing commits from " << peerCommitCount + 1 << "-" << targetCommit);
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class SQLiteNode : public STCPManager {
// Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the
// *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker
// thread with a different DB object) - which is why this is static.
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS = 0);

bool _isNothingBlockingShutdown() const;
bool _majoritySubscribed() const;
Expand Down

0 comments on commit 89de92f

Please sign in to comment.