Skip to content

Commit

Permalink
Merge pull request #1617 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
rafecolton authored Nov 14, 2023
2 parents 84ccced + 1facf1a commit d14c2e8
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 28 deletions.
59 changes: 36 additions & 23 deletions plugins/Jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ void BedrockJobsCommand::process(SQLite& db) {
SWARN("Repeat is set in CreateJob, but is set to the empty string. Job Name: "
<< job["name"] << ", removing attribute.");
job.erase("repeat");
} else if (!_validateRepeat(job["repeat"])) {
} else if (!_validateRepeat(db, job["repeat"])) {
STHROW("402 Malformed repeat");
}
}
Expand Down Expand Up @@ -840,7 +840,7 @@ void BedrockJobsCommand::process(SQLite& db) {
}
string currentTime = SUNQUOTED_CURRENT_TIMESTAMP();
string retryAfterDateTime = "DATETIME(" + SQ(currentTime) + ", " + SQ(job["retryAfter"]) + ")";
string repeatDateTime = _constructNextRunDATETIME(job["nextRun"], currentTime, job["repeat"]);
string repeatDateTime = _constructNextRunDATETIME(db, job["nextRun"], currentTime, job["repeat"]);
string nextRunDateTime = repeatDateTime != "" ? "MIN(" + retryAfterDateTime + ", " + repeatDateTime + ")" : retryAfterDateTime;
bool isRepeatBasedOnScheduledTime = SToUpper(job["repeat"]).find("SCHEDULED") != string::npos;
string dataUpdateQuery = " ";
Expand Down Expand Up @@ -908,7 +908,7 @@ void BedrockJobsCommand::process(SQLite& db) {
if (request["repeat"].empty()) {
SWARN("Repeat is set in UpdateJob, but is set to the empty string. jobID: "
<< request["jobID"] << ".");
} else if (!_validateRepeat(request["repeat"])) {
} else if (!_validateRepeat(db, request["repeat"])) {
STHROW("402 Malformed repeat");
}
}
Expand Down Expand Up @@ -946,7 +946,7 @@ void BedrockJobsCommand::process(SQLite& db) {
// Passed next run takes priority over the one computed via the repeat feature
string newNextRun;
if (request["nextRun"].empty()) {
newNextRun = request["repeat"].size() ? _constructNextRunDATETIME(nextRun, lastRun, request["repeat"]) : "";
newNextRun = request["repeat"].size() ? _constructNextRunDATETIME(db, nextRun, lastRun, request["repeat"]) : "";
} else {
newNextRun = SQ(request["nextRun"]);
}
Expand Down Expand Up @@ -1134,7 +1134,7 @@ void BedrockJobsCommand::process(SQLite& db) {
if (!retryAfter.empty() && SToUpper(repeat).find("SCHEDULED") != string::npos) {
lastScheduled = originalDataNextRun;
}
safeNewNextRun = _constructNextRunDATETIME(lastScheduled, lastRun, repeat);
safeNewNextRun = _constructNextRunDATETIME(db, lastScheduled, lastRun, repeat);
} else if (SIEquals(requestVerb, "RetryJob")) {
const string& newNextRun = request["nextRun"];

Expand All @@ -1145,7 +1145,7 @@ void BedrockJobsCommand::process(SQLite& db) {
STHROW("402 Must specify a non-negative delay when retrying");
}
repeat = "FINISHED, +" + SToStr(delay) + " SECONDS";
safeNewNextRun = _constructNextRunDATETIME(nextRun, lastRun, repeat);
safeNewNextRun = _constructNextRunDATETIME(db, nextRun, lastRun, repeat);
if (safeNewNextRun.empty()) {
STHROW("402 Malformed delay");
}
Expand Down Expand Up @@ -1356,7 +1356,7 @@ void BedrockJobsCommand::process(SQLite& db) {
}
}

string BedrockJobsCommand::_constructNextRunDATETIME(const string& lastScheduled, const string& lastRun, const string& repeat) {
string BedrockJobsCommand::_constructNextRunDATETIME(SQLite& db, const string& lastScheduled, const string& lastRun, const string& repeat) {
if (repeat.empty()) {
return "";
}
Expand All @@ -1377,32 +1377,45 @@ string BedrockJobsCommand::_constructNextRunDATETIME(const string& lastScheduled
}

// Make sure the first part indicates the base (eg, what we are modifying)
list<string> safeParts;
string base = parts.front();
string nextRun = parts.front();
parts.pop_front();
if (base == "SCHEDULED") {
safeParts.push_back(SQ(lastScheduled));
} else if (base == "STARTED") {
safeParts.push_back(SQ(lastRun));
} else if (base == "FINISHED") {
safeParts.push_back(SCURRENT_TIMESTAMP());
if (nextRun == "SCHEDULED") {
nextRun = SQ(lastScheduled);
} else if (nextRun == "STARTED") {
nextRun = SQ(lastRun);
} else if (nextRun == "FINISHED") {
nextRun = SCURRENT_TIMESTAMP();
} else {
SWARN("Syntax error, failed parsing repeat '" << repeat << "': missing base (" << base << ")");
SWARN("Syntax error, failed parsing repeat '" << repeat << "': missing base (" << nextRun << ")");
return "";
}

for (const string& part : parts) {
// Validate the sqlite date modifiers
if (!SIsValidSQLiteDateModifier(part)){
SWARN("Syntax error, failed parsing repeat "+part);
// This isn't supported natively by SQLite, so do it manually here instead.
if (SToUpper(part) == "START OF HOUR") {
SQResult result;
if (!db.read("SELECT STRFTIME('%Y-%m-%d %H:00:00', " + nextRun + ");", result) || result.empty()) {
SWARN("Syntax error, failed parsing repeat " + part);
return "";
}

nextRun = SQ(result[0][0]);
} else if (!SIsValidSQLiteDateModifier(part)){
// Validate the sqlite date modifiers
SWARN("Syntax error, failed parsing repeat " + part);
return "";
}
} else {
SQResult result;
if (!db.read("SELECT DATETIME(" + nextRun + ", " + SQ(part) + ");", result) || result.empty()) {
SWARN("Syntax error, failed parsing repeat " + part);
return "";
}

safeParts.push_back(SQ(part));
nextRun = SQ(result[0][0]);
}
}

// Combine the parts together and return the full DATETIME statement
return "DATETIME( " + SComposeList(safeParts) + " )";
return "DATETIME(" + nextRun + ")";
}

// ==========================================================================
Expand Down
4 changes: 2 additions & 2 deletions plugins/Jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class BedrockJobsCommand : public BedrockCommand {

private:
// Helper functions
string _constructNextRunDATETIME(const string& lastScheduled, const string& lastRun, const string& repeat);
bool _validateRepeat(const string& repeat) { return !_constructNextRunDATETIME("", "", repeat).empty(); }
string _constructNextRunDATETIME(SQLite& db, const string& lastScheduled, const string& lastRun, const string& repeat);
bool _validateRepeat(SQLite& db, const string& repeat) { return !_constructNextRunDATETIME(db, "", "", repeat).empty(); }
bool _hasPendingChildJobs(SQLite& db, int64_t jobID);
void _validatePriority(const int64_t priority);

Expand Down
4 changes: 2 additions & 2 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,13 @@ bool SQLite::beginTransaction(TRANSACTION_TYPE type) {
return _insideTransaction;
}

bool SQLite::verifyTable(const string& tableName, const string& sql, bool& created) {
bool SQLite::verifyTable(const string& tableName, const string& sql, bool& created, const string& type) {
// sqlite trims semicolon, so let's not supply it else we get confused later
SASSERT(!SEndsWith(sql, ";"));

// First, see if it's there
SQResult result;
SASSERT(read("SELECT sql FROM sqlite_master WHERE type='table' AND tbl_name=" + SQ(tableName) + ";", result));
SASSERT(read("SELECT sql FROM sqlite_master WHERE type=" + SQ(type) + " AND tbl_name=" + SQ(tableName) + ";", result));
const string& collapsedSQL = SCollapse(sql);
if (result.empty()) {
// Table doesn't already exist, create it
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SQLite {
// Verifies a table exists and has a particular definition. If the database is left with the right schema, it
// returns true. If it had to create a new table (ie, the table was missing), it also sets created to true. If the
// table is already there with the wrong schema, it returns false.
bool verifyTable(const string& name, const string& sql, bool& created);
bool verifyTable(const string& name, const string& sql, bool& created, const string& type = "table");

// Verifies an index exists on the given table with the given definition. Optionally create it if it doesn't exist.
// Be careful, creating an index can be expensive on large tables!
Expand Down
64 changes: 64 additions & 0 deletions test/tests/jobs/RetryJobTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ struct RetryJobTest : tpunit::TestFixture {
TEST(RetryJobTest::positiveDelay),
TEST(RetryJobTest::delayError),
TEST(RetryJobTest::hasRepeat),
TEST(RetryJobTest::hasRepeatStartOfHour),
TEST(RetryJobTest::hasRepeatStartOfHourNotLast),
TEST(RetryJobTest::inRunqueuedState),
TEST(RetryJobTest::simplyRetryWithNextRun),
TEST(RetryJobTest::changeNameAndPriority),
Expand Down Expand Up @@ -334,6 +336,68 @@ struct RetryJobTest : tpunit::TestFixture {
ASSERT_EQUAL(difftime(nextRunTime, createdTime), 3600);
}

// Retry a job with a repeat that uses the custom "START OF HOUR" modifier
void hasRepeatStartOfHour() {
uint64_t now = STimeNow();

// Create the job
SData command("CreateJob");
command["name"] = "job";
command["repeat"] = "SCHEDULED, START OF DAY, +5 MINUTES, START OF HOUR";
command["firstRun"] = SComposeTime("%Y-%m-%d %H:%M:%S", now);
STable response = tester->executeWaitVerifyContentTable(command);
string jobID = response["jobID"];

// Get the job
command.clear();
command.methodLine = "GetJob";
command["name"] = "job";
tester->executeWaitVerifyContent(command);

// Retry it
command.clear();
command.methodLine = "RetryJob";
command["jobID"] = jobID;
tester->executeWaitVerifyContent(command);

// Confirm nextRun is at the beginning of the day and not 5 minutes after
SQResult result;
tester->readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
ASSERT_EQUAL(result.size(), 1);
ASSERT_EQUAL(result[0][0], SComposeTime("%Y-%m-%d 00:00:00", now));
}

// Same as hasRepeatStartOfHour but "START OF HOUR" is not last, to confirm it works when not last.
void hasRepeatStartOfHourNotLast() {
uint64_t now = STimeNow();

// Create the job
SData command("CreateJob");
command["name"] = "job";
command["repeat"] = "SCHEDULED, START OF DAY, +30 MINUTES, START OF HOUR, +5 MINUTES";
command["firstRun"] = SComposeTime("%Y-%m-%d %H:%M:%S", now);
STable response = tester->executeWaitVerifyContentTable(command);
string jobID = response["jobID"];

// Get the job
command.clear();
command.methodLine = "GetJob";
command["name"] = "job";
tester->executeWaitVerifyContent(command);

// Retry it
command.clear();
command.methodLine = "RetryJob";
command["jobID"] = jobID;
tester->executeWaitVerifyContent(command);

// Confirm nextRun is at the beginning of the day and not 5 minutes after
SQResult result;
tester->readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
ASSERT_EQUAL(result.size(), 1);
ASSERT_EQUAL(result[0][0], SComposeTime("%Y-%m-%d 00:05:00", now));
}

// Retry job in RUNQUEUED state
void inRunqueuedState() {
// Create a job
Expand Down

0 comments on commit d14c2e8

Please sign in to comment.