Skip to content

Commit

Permalink
Fix bug in fivetran streaming export from empty deployment (#28496)
Browse files Browse the repository at this point in the history
Needed to grab the snapshot from the list_snapshot result even
if the first page is empty.

Added test and confirmed that it covers this case.

GitOrigin-RevId: ed66bdc95a695df784940b88b4bbbb54e7a78975
  • Loading branch information
nipunn1313 authored and Convex, Inc. committed Jul 30, 2024
1 parent ac3b44b commit fc1ddd9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
12 changes: 5 additions & 7 deletions crates/fivetran_source/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ async fn initial_sync(
log(&log_msg);
yield UpdateMessage::Log(LogLevel::Info, log_msg);

let mut has_more = true;

while has_more {
let snapshot = loop {
let snapshot = checkpoint.as_ref().map(|c| c.0);
let cursor = checkpoint.as_ref().map(|c| c.1.clone());
let res = source.list_snapshot(snapshot, cursor.clone(), None).await?;
Expand All @@ -213,8 +211,7 @@ async fn initial_sync(
};
}

has_more = res.has_more;
if has_more {
if res.has_more {
let cursor = ListSnapshotCursor::from(
res.cursor.context("Missing cursor when has_more was set")?,
);
Expand All @@ -226,10 +223,11 @@ async fn initial_sync(
tables_seen.clone(),
));
checkpoint = Some((res.snapshot, cursor));
} else {
break res.snapshot;
}
}
};

let (snapshot, _) = checkpoint.context("list_snapshot lacking a snapshot for checkpoint")?;
let cursor = DocumentDeltasCursor::from(snapshot);
yield UpdateMessage::Checkpoint(State::create(
Checkpoint::DeltaUpdates { cursor },
Expand Down
28 changes: 27 additions & 1 deletion crates/fivetran_source/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::{
vec,
};

use anyhow::Ok;
use anyhow::{
Context,
Ok,
};
use async_trait::async_trait;
use convex_fivetran_common::fivetran_sdk::{
value_type,
Expand Down Expand Up @@ -39,6 +42,7 @@ use crate::{
},
sync::{
sync,
Checkpoint,
State,
UpdateMessage,
},
Expand Down Expand Up @@ -375,6 +379,28 @@ async fn initial_sync_copies_documents_from_source_to_destination() -> anyhow::R
Ok(())
}

#[tokio::test]
async fn initial_sync_empty_source_works() -> anyhow::Result<()> {
let source = FakeSource::default();
let mut destination = FakeDestination::default();

assert_eq!(source.tables.len(), 0);
destination
.receive(sync(source.clone(), destination.latest_state()))
.await?;
assert!(destination.has_log("Initial sync successful"));
assert_eq!(destination.checkpointed_data.tables.len(), 0);
let state = destination.latest_state().context("missing state")?;
assert!(matches!(
state.checkpoint,
Checkpoint::DeltaUpdates {
cursor: DocumentDeltasCursor(0)
}
));

Ok(())
}

/// Verifies that the source and the destination are in sync by starting a new
/// initial sync and verifying that the destinations match.
async fn assert_in_sync(source: impl Source + 'static, destination: &FakeDestination) {
Expand Down

0 comments on commit fc1ddd9

Please sign in to comment.