Skip to content

Commit

Permalink
Modify the graph interface to make possible select the type of operat…
Browse files Browse the repository at this point in the history
…ion to perform (read or write)
  • Loading branch information
madchicken committed Dec 19, 2024
1 parent 79722b5 commit 5eacd78
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 10 deletions.
70 changes: 60 additions & 10 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,19 @@ impl Graph {
///
/// Transactions will not be automatically retried on any failure.
pub async fn start_txn(&self) -> Result<Txn> {
self.impl_start_txn_on(self.config.db.clone()).await
self.impl_start_txn_on(self.config.db.clone(), Operation::Write)
.await
}

/// Starts a READ ONLY new transaction on the configured database.
/// All queries that needs to be run/executed within the transaction
/// should be executed using either [`Txn::run`] or [`Txn::execute`]
///
/// Transactions will not be automatically retried on any failure.
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
pub async fn start_read_txn(&self) -> Result<Txn> {
self.impl_start_txn_on(self.config.db.clone(), Operation::Read)
.await
}

/// Starts a new transaction on the provided database.
Expand All @@ -143,11 +155,12 @@ impl Graph {
///
/// Transactions will not be automatically retried on any failure.
pub async fn start_txn_on(&self, db: impl Into<Database>) -> Result<Txn> {
self.impl_start_txn_on(Some(db.into())).await
self.impl_start_txn_on(Some(db.into()), Operation::Write)
.await
}

async fn impl_start_txn_on(&self, db: Option<Database>) -> Result<Txn> {
let connection = self.pool.get(Some(Operation::Write)).await?;
async fn impl_start_txn_on(&self, db: Option<Database>, operation: Operation) -> Result<Txn> {
let connection = self.pool.get(Some(operation)).await?;
Txn::new(db, self.config.fetch_size, connection).await
}

Expand All @@ -163,6 +176,23 @@ impl Graph {
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run(&self, q: Query) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q, Operation::Write)
.await
}

/// Runs a READ ONLY query on the configured database using a connection from the connection pool,
/// It doesn't return any [`DetachedRowStream`] as the `run` abstraction discards any stream.
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
///
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
pub async fn run_read(&self, q: Query) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q, Operation::Read)
.await
}
Expand All @@ -178,8 +208,13 @@ impl Graph {
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run_on(&self, db: impl Into<Database>, q: Query) -> Result<()> {
self.impl_run_on(Some(db.into()), q, Operation::Read).await
pub async fn run_on(
&self,
db: impl Into<Database>,
q: Query,
operation: Operation,
) -> Result<()> {
self.impl_run_on(Some(db.into()), q, operation).await
}

async fn impl_run_on(
Expand All @@ -205,7 +240,7 @@ impl Graph {
.await
}

/// Executes a query on the configured database and returns a [`DetachedRowStream`]
/// Executes a READ/WRITE query on the configured database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
Expand All @@ -216,17 +251,32 @@ impl Graph {
.await
}

/// Executes a query on the provided database and returns a [`DetachedRowStream`]
/// Executes a query READ on the configured database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute_on(&self, db: impl Into<Database>, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, Operation::Write)
pub async fn execute_read(&self, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q, Operation::Read)
.await
}

/// Executes a query on the provided database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute_on(
&self,
db: impl Into<Database>,
q: Query,
operation: Operation,
) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, operation).await
}

async fn impl_execute_on(
&self,
db: Option<Database>,
Expand Down
1 change: 1 addition & 0 deletions lib/src/routing/routed_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl RoutedConnectionManager {
.load_balancing_strategy
.select_reader(available_servers.as_slice()),
} {
debug!("requesting connection for server: {:?}", server);
if let Some(pool) = self.registry.get_pool(&server) {
match pool.get().await {
Ok(connection) => return Ok(connection),
Expand Down

0 comments on commit 5eacd78

Please sign in to comment.