Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to cr-sqlite 0.16.0 + a few performance improvements #75

Merged
merged 12 commits into from
Sep 29, 2023
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Upgraded to cr-sqlite 0.16.0 (unreleased) ([#75](../../pull/75))
- Rewrite compaction logic to be more correct and efficient ([#74](../../pull/74))
- `corrosion consul sync` will now bundle services and checks in a single transaction (changeset) ([#73](../../pull/73))
- (**BREAKING**) Persist subscriptions across reboots, including many reliability improvements ([#69](../../pull/69))
- Support existing tables being added to the schema ([#64](../../pull/64))
Expand Down
207 changes: 170 additions & 37 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300);
const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300);

pub struct AgentOptions {
actor_id: ActorId,
gossip_server_endpoint: quinn::Endpoint,
transport: Transport,
api_listener: TcpListener,
rx_bcast: Receiver<BroadcastInput>,
rx_apply: Receiver<(ActorId, i64)>,
rtt_rx: Receiver<(SocketAddr, Duration)>,
tripwire: Tripwire,
pub actor_id: ActorId,
pub gossip_server_endpoint: quinn::Endpoint,
pub transport: Transport,
pub api_listener: TcpListener,
pub rx_bcast: Receiver<BroadcastInput>,
pub rx_apply: Receiver<(ActorId, i64)>,
pub rx_empty: Receiver<(ActorId, RangeInclusive<i64>)>,
pub rtt_rx: Receiver<(SocketAddr, Duration)>,
pub tripwire: Tripwire,
}

pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, AgentOptions)> {
Expand Down Expand Up @@ -261,13 +262,16 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let (tx_bcast, rx_bcast) = channel(10240);

let (tx_empty, rx_empty) = channel(10240);

let opts = AgentOptions {
actor_id,
gossip_server_endpoint,
transport,
api_listener,
rx_bcast,
rx_apply,
rx_empty,
rtt_rx,
tripwire: tripwire.clone(),
};
Expand All @@ -283,6 +287,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
bookie,
tx_bcast,
tx_apply,
tx_empty,
schema: RwLock::new(schema),
tripwire,
});
Expand All @@ -307,6 +312,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
mut tripwire,
rx_bcast,
rx_apply,
rx_empty,
rtt_rx,
} = opts;

Expand Down Expand Up @@ -893,8 +899,14 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
);

spawn_counted(
sync_loop(agent.clone(), transport.clone(), rx_apply, tripwire.clone())
.inspect(|_| info!("corrosion agent sync loop is done")),
sync_loop(
agent.clone(),
transport.clone(),
rx_apply,
rx_empty,
tripwire.clone(),
)
.inspect(|_| info!("corrosion agent sync loop is done")),
);

let mut db_cleanup_interval = tokio::time::interval(Duration::from_secs(60 * 15));
Expand Down Expand Up @@ -987,7 +999,15 @@ async fn clear_overwritten_versions(agent: Agent) {
{
let booked = bookie.read().await;
for (actor_id, booked) in booked.iter() {
let versions = booked.read().await.current_versions();
let versions = {
match timeout(Duration::from_secs(1), booked.read()).await {
Ok(booked) => booked.current_versions(),
Err(_) => {
info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now");
continue;
}
}
};
if versions.is_empty() {
continue;
}
Expand Down Expand Up @@ -1081,7 +1101,7 @@ async fn clear_overwritten_versions(agent: Agent) {
db_version = NULL,
last_seq = NULL,
ts = NULL
WHERE end_version != excluded.end_version
WHERE end_version < excluded.end_version
",
)?
.execute(params![actor_id, range.start(), range.end()])?;
Expand Down Expand Up @@ -1271,12 +1291,16 @@ fn find_cleared_db_versions(tx: &Transaction) -> rusqlite::Result<BTreeSet<i64>>
.query_map([], |row| row.get(0))?
.collect::<Result<BTreeSet<String>, _>>()?;

if tables.is_empty() {
return Ok(BTreeSet::new());
}

let to_clear_query = format!(
"SELECT DISTINCT(db_version) FROM __corro_bookkeeping WHERE db_version IS NOT NULL
EXCEPT SELECT db_version FROM ({});",
tables
.iter()
.map(|table| format!("SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table}"))
.map(|table| format!("SELECT DISTINCT(db_version) FROM {table}"))
.collect::<Vec<_>>()
.join(" UNION ")
);
Expand Down Expand Up @@ -1585,7 +1609,12 @@ fn store_empty_changeset(
"
INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version, db_version, ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (actor_id, start_version) DO NOTHING;
ON CONFLICT (actor_id, start_version) DO UPDATE SET
end_version = excluded.end_version,
db_version = NULL,
last_seq = NULL,
ts = NULL
WHERE end_version < excluded.end_version;
",
)?
.execute(params![
Expand Down Expand Up @@ -1830,17 +1859,36 @@ pub async fn process_multiple_changes(
continue;
}

let tx = conn.transaction()?;

let (known, changeset) = match process_single_version(&tx, change) {
Ok(res) => res,
Err(e) => {
error!(%actor_id, ?versions, "could not process single change: {e}");
continue;
// optimizing this, insert later!
let (known, changeset) = if change.is_complete() && change.is_empty() {
if let Err(e) = agent
.tx_empty()
.blocking_send((actor_id, change.versions()))
{
error!("could not send empty changed versions into channel: {e}");
}
};
// insert into in-memory bookkeeping right away
booked_write.insert_many(change.versions(), KnownDbVersion::Cleared);
(
KnownDbVersion::Cleared,
Changeset::Empty {
versions: change.versions(),
},
)
} else {
let tx = conn.transaction()?;

tx.commit()?;
let (known, changeset) = match process_single_version(&tx, change) {
Ok(res) => res,
Err(e) => {
error!(%actor_id, ?versions, "could not process single change: {e}");
continue;
}
};

tx.commit()?;
(known, changeset)
};

seen.insert(versions.clone(), known.clone());

Expand Down Expand Up @@ -1976,23 +2024,15 @@ fn process_complete_version(
tx: &Transaction,
actor_id: ActorId,
versions: RangeInclusive<i64>,
parts: Option<ChangesetParts>,
parts: ChangesetParts,
) -> rusqlite::Result<(KnownDbVersion, Changeset)> {
let ChangesetParts {
version,
changes,
seqs,
last_seq,
ts,
} = match parts {
None => {
store_empty_changeset(tx, actor_id, versions.clone())?;
info!(%actor_id, ?versions, "cleared empty versions range");
// booked_write.insert_many(versions.clone(), KnownDbVersion::Cleared);
return Ok((KnownDbVersion::Cleared, Changeset::Empty { versions }));
}
Some(parts) => parts,
};
} = parts;

info!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}");

Expand Down Expand Up @@ -2099,7 +2139,14 @@ fn process_single_version(
let versions = changeset.versions();

let (known, changeset) = if changeset.is_complete() {
process_complete_version(tx, actor_id, versions, changeset.into_parts())?
process_complete_version(
tx,
actor_id,
versions,
changeset
.into_parts()
.expect("no changeset parts, this shouldn't be happening!"),
)?
} else {
let parts = changeset.into_parts().unwrap();
let known = process_incomplete_version(tx, actor_id, &parts)?;
Expand Down Expand Up @@ -2285,10 +2332,13 @@ async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncCli
Ok(())
}

const CHECK_EMPTIES_TO_INSERT_AFTER: Duration = Duration::from_secs(120);

async fn sync_loop(
agent: Agent,
transport: Transport,
mut rx_apply: Receiver<(ActorId, i64)>,
mut rx_empty: Receiver<(ActorId, RangeInclusive<i64>)>,
mut tripwire: Tripwire,
) {
let mut sync_backoff = backoff::Backoff::new(0)
Expand All @@ -2297,6 +2347,62 @@ async fn sync_loop(
let next_sync_at = tokio::time::sleep(sync_backoff.next().unwrap());
tokio::pin!(next_sync_at);

spawn_counted({
let mut tripwire = tripwire.clone();
let agent = agent.clone();
async move {
let mut inserted_empties = 0;
let mut empties: BTreeMap<ActorId, Vec<RangeInclusive<i64>>> = BTreeMap::new();

let next_empties_check = tokio::time::sleep(CHECK_EMPTIES_TO_INSERT_AFTER);
tokio::pin!(next_empties_check);

loop {
tokio::select! {
maybe_empty = rx_empty.recv() => match maybe_empty {
Some((actor_id, versions)) => {
empties.entry(actor_id).or_default().push(versions);
inserted_empties += 1;

if inserted_empties < 1000 {
continue;
}
},
None => {
debug!("empties queue is done");
break;
}
},
_ = &mut next_empties_check => {
next_empties_check.as_mut().reset(tokio::time::Instant::now() + CHECK_EMPTIES_TO_INSERT_AFTER);
if empties.is_empty() {
continue;
}
},
_ = &mut tripwire => break
}

inserted_empties = 0;

if let Err(e) = process_completed_empties(&agent, &mut empties).await {
error!("could not process empties: {e}");
}
}
info!("Draining empty versions to process...");
// drain empties channel
while let Ok((actor_id, versions)) = rx_empty.try_recv() {
empties.entry(actor_id).or_default().push(versions);
}

if !empties.is_empty() {
info!("inserting last unprocessed empties before shut down");
if let Err(e) = process_completed_empties(&agent, &mut empties).await {
error!("could not process empties: {e}");
}
}
}
});

loop {
enum Branch {
Tick,
Expand Down Expand Up @@ -2362,6 +2468,33 @@ async fn sync_loop(
}
}

async fn process_completed_empties(
agent: &Agent,
empties: &mut BTreeMap<ActorId, Vec<RangeInclusive<i64>>>,
) -> eyre::Result<()> {
let mut conn = agent.pool().write_normal().await?;

block_in_place(|| {
let tx = conn.transaction()?;
while let Some((actor_id, empties)) = empties.pop_first() {
let booked = agent.bookie().for_actor_blocking(actor_id);
let bookedw = booked.blocking_write();

for (range, _) in empties
.iter()
.filter_map(|range| bookedw.get_key_value(range.start()))
.dedup()
{
store_empty_changeset(&tx, actor_id, range.clone())?;
}
}

tx.commit()?;

Ok(())
})
}

pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
let migrations: Vec<Box<dyn Migration>> = vec![
Box::new(init_migration as fn(&Transaction) -> rusqlite::Result<()>),
Expand Down Expand Up @@ -2916,10 +3049,10 @@ pub mod tests {

conn.execute_batch(
"
CREATE TABLE foo (a INTEGER PRIMARY KEY, b INTEGER);
CREATE TABLE foo (a INTEGER NOT NULL PRIMARY KEY, b INTEGER);
SELECT crsql_as_crr('foo');

CREATE TABLE foo2 (a INTEGER PRIMARY KEY, b INTEGER);
CREATE TABLE foo2 (a INTEGER NOT NULL PRIMARY KEY, b INTEGER);
SELECT crsql_as_crr('foo2');
",
)?;
Expand Down Expand Up @@ -2950,7 +3083,7 @@ pub mod tests {
}

{
let mut prepped = conn.prepare("SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo2__crsql_clock UNION SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo__crsql_clock;")?;
let mut prepped = conn.prepare("SELECT DISTINCT(db_version) FROM foo2__crsql_clock UNION SELECT DISTINCT(db_version) FROM foo__crsql_clock;")?;
let mut rows = prepped.query([])?;

while let Ok(Some(row)) = rows.next() {
Expand Down
Loading