diff --git a/lib/src/graph.rs b/lib/src/graph.rs index e324683..d0aef96 100644 --- a/lib/src/graph.rs +++ b/lib/src/graph.rs @@ -134,7 +134,19 @@ impl Graph { /// /// Transactions will not be automatically retried on any failure. pub async fn start_txn(&self) -> Result { - self.impl_start_txn_on(self.config.db.clone()).await + self.impl_start_txn_on(self.config.db.clone(), Operation::Write) + .await + } + + /// Starts a READ ONLY new transaction on the configured database. + /// All queries that needs to be run/executed within the transaction + /// should be executed using either [`Txn::run`] or [`Txn::execute`] + /// + /// Transactions will not be automatically retried on any failure. + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + pub async fn start_read_txn(&self) -> Result { + self.impl_start_txn_on(self.config.db.clone(), Operation::Read) + .await } /// Starts a new transaction on the provided database. @@ -143,11 +155,12 @@ impl Graph { /// /// Transactions will not be automatically retried on any failure. pub async fn start_txn_on(&self, db: impl Into) -> Result { - self.impl_start_txn_on(Some(db.into())).await + self.impl_start_txn_on(Some(db.into()), Operation::Write) + .await } - async fn impl_start_txn_on(&self, db: Option) -> Result { - let connection = self.pool.get(Some(Operation::Write)).await?; + async fn impl_start_txn_on(&self, db: Option, operation: Operation) -> Result { + let connection = self.pool.get(Some(operation)).await?; Txn::new(db, self.config.fetch_size, connection).await } @@ -163,6 +176,23 @@ impl Graph { /// /// use [`Graph::execute`] when you are interested in the result stream pub async fn run(&self, q: Query) -> Result<()> { + self.impl_run_on(self.config.db.clone(), q, Operation::Write) + .await + } + + /// Runs a READ ONLY query on the configured database using a connection from the connection pool, + /// It doesn't return any [`DetachedRowStream`] as the `run` abstraction discards any stream. + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + /// + /// Use [`Graph::run`] for cases where you just want a write operation + /// + /// use [`Graph::execute`] when you are interested in the result stream + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + pub async fn run_read(&self, q: Query) -> Result<()> { self.impl_run_on(self.config.db.clone(), q, Operation::Read) .await } @@ -178,8 +208,13 @@ impl Graph { /// Use [`Graph::run`] for cases where you just want a write operation /// /// use [`Graph::execute`] when you are interested in the result stream - pub async fn run_on(&self, db: impl Into, q: Query) -> Result<()> { - self.impl_run_on(Some(db.into()), q, Operation::Read).await + pub async fn run_on( + &self, + db: impl Into, + q: Query, + operation: Operation, + ) -> Result<()> { + self.impl_run_on(Some(db.into()), q, operation).await } async fn impl_run_on( @@ -205,7 +240,7 @@ impl Graph { .await } - /// Executes a query on the configured database and returns a [`DetachedRowStream`] + /// Executes a READ/WRITE query on the configured database and returns a [`DetachedRowStream`] /// /// This operation retires the query on certain failures. /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. @@ -216,17 +251,32 @@ impl Graph { .await } - /// Executes a query on the provided database and returns a [`DetachedRowStream`] + /// Executes a query READ on the configured database and returns a [`DetachedRowStream`] /// /// This operation retires the query on certain failures. /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. - pub async fn execute_on(&self, db: impl Into, q: Query) -> Result { - self.impl_execute_on(Some(db.into()), q, Operation::Write) + pub async fn execute_read(&self, q: Query) -> Result { + self.impl_execute_on(self.config.db.clone(), q, Operation::Read) .await } + /// Executes a query on the provided database and returns a [`DetachedRowStream`] + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + pub async fn execute_on( + &self, + db: impl Into, + q: Query, + operation: Operation, + ) -> Result { + self.impl_execute_on(Some(db.into()), q, operation).await + } + async fn impl_execute_on( &self, db: Option, diff --git a/lib/src/routing/routed_connection_manager.rs b/lib/src/routing/routed_connection_manager.rs index b157174..6e6d4a6 100644 --- a/lib/src/routing/routed_connection_manager.rs +++ b/lib/src/routing/routed_connection_manager.rs @@ -117,6 +117,7 @@ impl RoutedConnectionManager { .load_balancing_strategy .select_reader(available_servers.as_slice()), } { + debug!("requesting connection for server: {:?}", server); if let Some(pool) = self.registry.get_pool(&server) { match pool.get().await { Ok(connection) => return Ok(connection),