diff --git a/scylla/src/client/session.rs b/scylla/src/client/session.rs index 39b998706..3d3035e08 100644 --- a/scylla/src/client/session.rs +++ b/scylla/src/client/session.rs @@ -15,8 +15,8 @@ use crate::cluster::node::CloudEndpoint; use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef}; use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState}; use crate::errors::{ - BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, RequestError, - TracingProtocolError, + BadQuery, MetadataError, NewSessionError, ProtocolError, QueryError, RequestAttemptError, + RequestError, TracingProtocolError, }; use crate::frame::response::result; #[cfg(feature = "ssl")] @@ -1717,7 +1717,7 @@ where /// /// Normally this is not needed, /// the driver should automatically detect all metadata changes in the cluster - pub async fn refresh_metadata(&self) -> Result<(), QueryError> { + pub async fn refresh_metadata(&self) -> Result<(), MetadataError> { self.cluster.refresh_metadata().await } diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index db9845acb..2ac5ec0fe 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -19,7 +19,9 @@ use crate::client::pager::{NextPageError, NextRowError, QueryPager}; use crate::cluster::node::resolve_contact_points; use crate::deserialize::DeserializeOwnedRow; -use crate::errors::{DbError, NewSessionError, QueryError, RequestAttemptError}; +use crate::errors::{ + DbError, MetadataFetchError, MetadataFetchErrorKind, NewSessionError, RequestAttemptError, +}; use crate::frame::response::event::Event; use crate::network::{Connection, ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize}; use crate::policies::host_filter::HostFilter; @@ -33,7 +35,6 @@ use futures::stream::{self, StreamExt, TryStreamExt}; use futures::Stream; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; -use scylla_cql::deserialize::TypeCheckError; use scylla_macros::DeserializeRow; use std::borrow::BorrowMut; use std::cell::Cell; @@ -51,8 +52,8 @@ use uuid::Uuid; use crate::cluster::node::{InternalKnownNode, NodeAddr, ResolvedContactPoint}; use crate::errors::{ - KeyspaceStrategyError, KeyspacesMetadataError, MetadataError, PeersMetadataError, - ProtocolError, RequestError, TablesMetadataError, UdtMetadataError, ViewsMetadataError, + KeyspaceStrategyError, KeyspacesMetadataError, MetadataError, PeersMetadataError, RequestError, + TablesMetadataError, UdtMetadataError, }; // Re-export of CQL types. @@ -360,17 +361,6 @@ impl fmt::Display for InvalidCqlType { } } -impl From for QueryError { - fn from(e: InvalidCqlType) -> Self { - ProtocolError::InvalidCqlType { - typ: e.typ, - position: e.position, - reason: e.reason, - } - .into() - } -} - impl Metadata { /// Creates new, dummy metadata from a given list of peers. /// @@ -461,7 +451,7 @@ impl MetadataReader { } /// Fetches current metadata from the cluster - pub(crate) async fn read_metadata(&mut self, initial: bool) -> Result { + pub(crate) async fn read_metadata(&mut self, initial: bool) -> Result { let mut result = self.fetch_metadata(initial).await; let prev_err = match result { Ok(metadata) => { @@ -539,8 +529,8 @@ impl MetadataReader { &mut self, initial: bool, nodes: impl Iterator, - prev_err: QueryError, - ) -> Result { + prev_err: MetadataError, + ) -> Result { let mut result = Err(prev_err); for peer in nodes { let err = match result { @@ -573,7 +563,7 @@ impl MetadataReader { result } - async fn fetch_metadata(&self, initial: bool) -> Result { + async fn fetch_metadata(&self, initial: bool) -> Result { // TODO: Timeouts? self.control_connection.wait_until_initialized().await; let conn = &self.control_connection.random_connection()?; @@ -693,7 +683,7 @@ async fn query_metadata( connect_port: u16, keyspace_to_fetch: &[String], fetch_schema: bool, -) -> Result { +) -> Result { let peers_query = query_peers(conn, connect_port); let keyspaces_query = query_keyspaces(conn, keyspace_to_fetch, fetch_schema); @@ -701,12 +691,12 @@ async fn query_metadata( // There must be at least one peer if peers.is_empty() { - return Err(MetadataError::Peers(PeersMetadataError::EmptyPeers).into()); + return Err(MetadataError::Peers(PeersMetadataError::EmptyPeers)); } // At least one peer has to have some tokens if peers.iter().all(|peer| peer.tokens.is_empty()) { - return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into()); + return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists)); } Ok(Metadata { peers, keyspaces }) @@ -741,7 +731,10 @@ impl NodeInfoSource { const METADATA_QUERY_PAGE_SIZE: i32 = 1024; -async fn query_peers(conn: &Arc, connect_port: u16) -> Result, QueryError> { +async fn query_peers( + conn: &Arc, + connect_port: u16, +) -> Result, MetadataError> { let mut peers_query = Query::new("select host_id, rpc_address, data_center, rack, tokens from system.peers"); peers_query.set_page_size(METADATA_QUERY_PAGE_SIZE); @@ -750,14 +743,17 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result().map_err(|err| { - MetadataError::Peers(PeersMetadataError::SystemPeersInvalidColumnType(err)) - })?; - Ok::<_, QueryError>(rows_stream) + let rows_stream = pager.rows_stream::()?; + Ok::<_, MetadataFetchErrorKind>(rows_stream) }) .into_stream() - .map(|result| result.map(|stream| stream.map_err(QueryError::from))) + .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError))) .try_flatten() + // Add table context to the error. + .map_err(|error| MetadataFetchError { + error, + table: "system.peers", + }) .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result))); let mut local_query = @@ -768,14 +764,17 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result().map_err(|err| { - MetadataError::Peers(PeersMetadataError::SystemLocalInvalidColumnType(err)) - })?; - Ok::<_, QueryError>(rows_stream) + let rows_stream = pager.rows_stream::()?; + Ok::<_, MetadataFetchErrorKind>(rows_stream) }) .into_stream() - .map(|result| result.map(|stream| stream.map_err(QueryError::from))) + .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError))) .try_flatten() + // Add table context to the error. + .map_err(|error| MetadataFetchError { + error, + table: "system.local", + }) .and_then(|row_result| future::ok((NodeInfoSource::Local, row_result))); let untranslated_rows = stream::select(peers_query_stream, local_query_stream); @@ -791,16 +790,16 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result>() - .await?; + .filter_map(std::future::ready) + .collect::>() + .await; Ok(peers) } @@ -808,7 +807,7 @@ async fn create_peer_from_row( source: NodeInfoSource, row: NodeInfoRow, local_address: SocketAddr, -) -> Result, QueryError> { +) -> Option { let NodeInfoRow { host_id, untranslated_ip_addr, @@ -821,7 +820,7 @@ async fn create_peer_from_row( Some(host_id) => host_id, None => { warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack); - return Ok(None); + return None; } }; @@ -861,21 +860,20 @@ async fn create_peer_from_row( } }; - Ok(Some(Peer { + Some(Peer { host_id, address: node_addr, tokens, datacenter, rack, - })) + }) } fn query_filter_keyspace_name<'a, R>( conn: &Arc, query_str: &'a str, keyspaces_to_fetch: &'a [String], - convert_typecheck_error: impl FnOnce(TypeCheckError) -> MetadataError + 'a, -) -> impl Stream> + 'a +) -> impl Stream> + 'a where R: DeserializeOwnedRow + 'static, { @@ -888,12 +886,14 @@ where conn: Arc, query_str: &str, keyspaces_to_fetch: &[String], - ) -> Result { + ) -> Result { if keyspaces_to_fetch.is_empty() { let mut query = Query::new(query_str); query.set_page_size(METADATA_QUERY_PAGE_SIZE); - conn.query_iter(query).await.map_err(QueryError::from) + conn.query_iter(query) + .await + .map_err(MetadataFetchErrorKind::NextRowError) } else { let keyspaces = &[keyspaces_to_fetch] as &[&[String]]; let query_str = format!("{query_str} where keyspace_name in ?"); @@ -901,25 +901,21 @@ where let mut query = Query::new(query_str); query.set_page_size(METADATA_QUERY_PAGE_SIZE); - let prepared = conn - .prepare(&query) - .await - .map_err(RequestAttemptError::into_query_error)?; + let prepared = conn.prepare(&query).await?; let serialized_values = prepared.serialize_values(&keyspaces)?; conn.execute_iter(prepared, serialized_values) .await - .map_err(QueryError::from) + .map_err(MetadataFetchErrorKind::NextRowError) } } let fut = async move { let pager = make_keyspace_filtered_query_pager(conn, query_str, keyspaces_to_fetch).await?; - let stream: crate::client::pager::TypedRowStream = - pager.rows_stream::().map_err(convert_typecheck_error)?; - Ok::<_, QueryError>(stream) + let stream: crate::client::pager::TypedRowStream = pager.rows_stream::()?; + Ok::<_, MetadataFetchErrorKind>(stream) }; fut.into_stream() - .map(|result| result.map(|stream| stream.map_err(QueryError::from))) + .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError))) .try_flatten() } @@ -927,17 +923,16 @@ async fn query_keyspaces( conn: &Arc, keyspaces_to_fetch: &[String], fetch_schema: bool, -) -> Result, QueryError> { +) -> Result, MetadataError> { let rows = query_filter_keyspace_name::<(String, HashMap)>( conn, "select keyspace_name, replication from system_schema.keyspaces", keyspaces_to_fetch, - |err| { - MetadataError::Keyspaces(KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType( - err, - )) - }, - ); + ) + .map_err(|error| MetadataFetchError { + error, + table: "system_schema.keyspaces", + }); let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema { let udts = query_user_defined_types(conn, keyspaces_to_fetch).await?; @@ -962,10 +957,10 @@ async fn query_keyspaces( let (keyspace_name, strategy_map) = row_result?; let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| { - MetadataError::Keyspaces(KeyspacesMetadataError::Strategy { + KeyspacesMetadataError::Strategy { keyspace: keyspace_name.clone(), error, - }) + } })?; let tables = all_tables .remove(&keyspace_name) @@ -978,7 +973,7 @@ async fn query_keyspaces( .unwrap_or_else(|| Ok(HashMap::new())); // As you can notice, in this file we generally operate on two layers of errors: - // - Outer (QueryError) if something went wrong with querying the cluster. + // - Outer (MetadataError) if something went wrong with querying the cluster. // - Inner (currently MissingUserDefinedType, possibly other variants in the future) if the fetched metadata // turned out to not be fully consistent. // If there is an inner error, we want to drop metadata for the whole keyspace. @@ -1047,20 +1042,29 @@ async fn query_user_defined_types( keyspaces_to_fetch: &[String], ) -> Result< PerKeyspaceResult>>, MissingUserDefinedType>, - QueryError, + MetadataError, > { let rows = query_filter_keyspace_name::( conn, "select keyspace_name, type_name, field_names, field_types from system_schema.types", keyspaces_to_fetch, - |err| MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err)), - ); + ) + .map_err(|error| MetadataFetchError { + error, + table: "system_schema.types", + }); let mut udt_rows: Vec = rows .map(|row_result| { - let udt_row = row_result?.try_into()?; + let udt_row = row_result?.try_into().map_err(|err: InvalidCqlType| { + MetadataError::Udts(UdtMetadataError::InvalidCqlType { + typ: err.typ, + position: err.position, + reason: err.reason, + }) + })?; - Ok::<_, QueryError>(udt_row) + Ok::<_, MetadataError>(udt_row) }) .try_collect() .await?; @@ -1118,7 +1122,7 @@ async fn query_user_defined_types( Ok(udts) } -fn topo_sort_udts(udts: &mut Vec) -> Result<(), QueryError> { +fn topo_sort_udts(udts: &mut Vec) -> Result<(), UdtMetadataError> { fn do_with_referenced_udts(what: &mut impl FnMut(&str), pre_cql_type: &PreColumnType) { match pre_cql_type { PreColumnType::Native(_) => (), @@ -1201,7 +1205,7 @@ fn topo_sort_udts(udts: &mut Vec) -> Result<(), Quer if sorted.len() < indegs.len() { // Some UDTs could not become leaves in the graph, which implies cycles. - return Err(MetadataError::Udts(UdtMetadataError::CircularTypeDependency).into()); + return Err(UdtMetadataError::CircularTypeDependency); } let owned_sorted = sorted.into_iter().cloned().collect::>(); @@ -1382,13 +1386,16 @@ async fn query_tables( conn: &Arc, keyspaces_to_fetch: &[String], tables: &mut PerKsTableResult, -) -> Result, MissingUserDefinedType>, QueryError> { +) -> Result, MissingUserDefinedType>, MetadataError> { let rows = query_filter_keyspace_name::<(String, String)>( conn, "SELECT keyspace_name, table_name FROM system_schema.tables", keyspaces_to_fetch, - |err| MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)), - ); + ) + .map_err(|error| MetadataFetchError { + error, + table: "system_schema.tables", + }); let mut result = HashMap::new(); rows.map(|row_result| { @@ -1412,7 +1419,7 @@ async fn query_tables( (Ok(_), Err(e)) => *entry = Err(e), }; - Ok::<_, QueryError>(()) + Ok::<_, MetadataError>(()) }) .try_for_each(|_| future::ok(())) .await?; @@ -1424,13 +1431,16 @@ async fn query_views( conn: &Arc, keyspaces_to_fetch: &[String], tables: &mut PerKsTableResult, -) -> Result, MissingUserDefinedType>, QueryError> { +) -> Result, MissingUserDefinedType>, MetadataError> { let rows = query_filter_keyspace_name::<(String, String, String)>( conn, "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views", keyspaces_to_fetch, - |err| MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err)), - ); + ) + .map_err(|error| MetadataFetchError { + error, + table: "system_schema.views", + }); let mut result = HashMap::new(); @@ -1464,7 +1474,7 @@ async fn query_views( (Ok(_), Err(e)) => *entry = Err(e), }; - Ok::<_, QueryError>(()) + Ok::<_, MetadataError>(()) }) .try_for_each(|_| future::ok(())) .await?; @@ -1476,7 +1486,7 @@ async fn query_tables_schema( conn: &Arc, keyspaces_to_fetch: &[String], udts: &PerKeyspaceResult>>, MissingUserDefinedType>, -) -> Result, QueryError> { +) -> Result, MetadataError> { // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of // type EmptyType for dense tables. This resolves into this CQL type name. // This column shouldn't be exposed to the user but is currently exposed in system tables. @@ -1484,11 +1494,14 @@ async fn query_tables_schema( type RowType = (String, String, String, String, i32, String); - let rows = query_filter_keyspace_name::(conn, - "select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", keyspaces_to_fetch, |err| { - MetadataError::Tables(TablesMetadataError::SchemaColumnsInvalidColumnType(err)) - } - ); + let rows = query_filter_keyspace_name::( + conn, + "select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", + keyspaces_to_fetch + ).map_err(|error| MetadataFetchError { + error, + table: "system_schema.columns", + }); let empty_ok_map = Ok(HashMap::new()); @@ -1498,7 +1511,7 @@ async fn query_tables_schema( let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?; if type_ == THRIFT_EMPTY_TYPE { - return Ok::<_, QueryError>(()); + return Ok::<_, MetadataError>(()); } let keyspace_udts: &PerTable>> = @@ -1527,26 +1540,31 @@ async fn query_tables_schema( // solution 1: but the keyspace won't be present in the result at all, // which is arguably worse. tables_schema.insert((keyspace_name, table_name), Err(e.clone())); - return Ok::<_, QueryError>(()); + return Ok::<_, MetadataError>(()); } }; - let pre_cql_type = map_string_to_cql_type(&type_)?; + let pre_cql_type = map_string_to_cql_type(&type_).map_err(|err: InvalidCqlType| { + TablesMetadataError::InvalidCqlType { + typ: err.typ, + position: err.position, + reason: err.reason, + } + })?; let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) { Ok(t) => t, Err(e) => { tables_schema.insert((keyspace_name, table_name), Err(e)); - return Ok::<_, QueryError>(()); + return Ok::<_, MetadataError>(()); } }; - let kind = ColumnKind::from_str(&kind).map_err(|_| { - MetadataError::Tables(TablesMetadataError::UnknownColumnKind { + let kind = + ColumnKind::from_str(&kind).map_err(|_| TablesMetadataError::UnknownColumnKind { keyspace_name: keyspace_name.clone(), table_name: table_name.clone(), column_name: column_name.clone(), column_kind: kind, - }) - })?; + })?; let Ok(entry) = tables_schema .entry((keyspace_name, table_name)) @@ -1557,7 +1575,7 @@ async fn query_tables_schema( ))) else { // This table was previously marked as broken, no way to insert anything. - return Ok::<_, QueryError>(()); + return Ok::<_, MetadataError>(()); }; if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering { @@ -1577,7 +1595,7 @@ async fn query_tables_schema( }, ); - Ok::<_, QueryError>(()) + Ok::<_, MetadataError>(()) }) .try_for_each(|_| future::ok(())) .await?; @@ -1779,7 +1797,14 @@ fn freeze_type(typ: PreColumnType) -> PreColumnType { async fn query_table_partitioners( conn: &Arc, -) -> Result>, QueryError> { +) -> Result>, MetadataFetchError> { + fn create_err(err: impl Into) -> MetadataFetchError { + MetadataFetchError { + error: err.into(), + table: "system_schema.scylla_tables", + } + } + let mut partitioner_query = Query::new( "select keyspace_name, table_name, partitioner from system_schema.scylla_tables", ); @@ -1789,22 +1814,22 @@ async fn query_table_partitioners( .clone() .query_iter(partitioner_query) .map(|pager_res| { - let pager = pager_res?; + let pager = pager_res.map_err(create_err)?; let stream = pager .rows_stream::<(String, String, Option)>() - .map_err(|err| { - MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)) - })?; - Ok::<_, QueryError>(stream) + // Map the error of Result + .map_err(create_err)? + // Map the error of single stream iteration (NextRowError) + .map_err(create_err); + Ok::<_, MetadataFetchError>(stream) }) .into_stream() - .map(|result| result.map(|stream| stream.map_err(QueryError::from))) .try_flatten(); let result = rows .map(|row_result| { let (keyspace_name, table_name, partitioner) = row_result?; - Ok::<_, QueryError>(((keyspace_name, table_name), partitioner)) + Ok::<_, MetadataFetchError>(((keyspace_name, table_name), partitioner)) }) .try_collect::>() .await; @@ -1814,15 +1839,15 @@ async fn query_table_partitioners( // that we are only interested in the ones resulting from non-existent table // system_schema.scylla_tables. // For more information please refer to https://github.com/scylladb/scylla-rust-driver/pull/349#discussion_r762050262 - // FIXME 2: The specific error we expect here should appear in QueryError::NextRowError. Currently - // leaving match against both variants. This will be fixed, once `MetadataError` is further adjusted - // in a follow-up PR. The goal is to return MetadataError from all functions related to metadata fetch. - Err(QueryError::DbError(DbError::Invalid, _)) - | Err(QueryError::NextRowError(NextRowError::NextPageError( - NextPageError::RequestFailure(RequestError::LastAttemptError( - RequestAttemptError::DbError(DbError::Invalid, _), - )), - ))) => Ok(HashMap::new()), + Err(MetadataFetchError { + error: + MetadataFetchErrorKind::NextRowError(NextRowError::NextPageError( + NextPageError::RequestFailure(RequestError::LastAttemptError( + RequestAttemptError::DbError(DbError::Invalid, _), + )), + )), + .. + }) => Ok(HashMap::new()), result => result, } } diff --git a/scylla/src/cluster/worker.rs b/scylla/src/cluster/worker.rs index 3dab166fc..133b81f44 100644 --- a/scylla/src/cluster/worker.rs +++ b/scylla/src/cluster/worker.rs @@ -1,5 +1,5 @@ use crate::client::session::TABLET_CHANNEL_SIZE; -use crate::errors::{NewSessionError, QueryError}; +use crate::errors::{MetadataError, NewSessionError, QueryError}; use crate::frame::response::event::{Event, StatusChangeEvent}; use crate::network::{PoolConfig, VerifiedKeyspaceName}; use crate::policies::host_filter::HostFilter; @@ -95,7 +95,7 @@ struct ClusterWorker { #[derive(Debug)] struct RefreshRequest { - response_chan: tokio::sync::oneshot::Sender>, + response_chan: tokio::sync::oneshot::Sender>, } #[derive(Debug)] @@ -182,7 +182,7 @@ impl Cluster { self.data.load_full() } - pub(crate) async fn refresh_metadata(&self) -> Result<(), QueryError> { + pub(crate) async fn refresh_metadata(&self) -> Result<(), MetadataError> { let (response_sender, response_receiver) = tokio::sync::oneshot::channel(); self.refresh_channel @@ -401,7 +401,7 @@ impl ClusterWorker { use_keyspace_result(use_keyspace_results.into_iter()) } - async fn perform_refresh(&mut self) -> Result<(), QueryError> { + async fn perform_refresh(&mut self) -> Result<(), MetadataError> { // Read latest Metadata let metadata = self.metadata_reader.read_metadata(false).await?; let cluster_data: Arc = self.cluster_data.load_full(); diff --git a/scylla/src/errors.rs b/scylla/src/errors.rs index b25c07f1c..479682fa5 100644 --- a/scylla/src/errors.rs +++ b/scylla/src/errors.rs @@ -328,14 +328,6 @@ pub enum ProtocolError { #[error("Unpaged query returned a non-empty paging state! This is a driver-side or server-side bug.")] NonfinishedPagingState, - /// Failed to parse CQL type. - #[error("Failed to parse a CQL type '{typ}', at position {position}: {reason}")] - InvalidCqlType { - typ: String, - position: usize, - reason: String, - }, - /// Unable extract a partition key based on prepared statement's metadata. #[error("Unable extract a partition key based on prepared statement's metadata")] PartitionKeyExtraction, @@ -414,17 +406,30 @@ pub enum TracingProtocolError { EmptyResults, } -/// An error that occurred during cluster metadata fetch. +/// An error that occurred during metadata fetch and verification. /// -/// An error can occur during metadata fetch of: -/// - peers +/// The driver performs metadata fetch and verification of the cluster's schema +/// and topology. This includes: /// - keyspaces /// - UDTs /// - tables /// - views +/// - peers (topology) +/// +/// The errors that occur during metadata fetch are contained in [`MetadataFetchError`]. +/// Remaining errors (logical errors) are contained in the variants corresponding to the +/// specific part of the metadata. #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum MetadataError { + /// Control connection pool error. + #[error("Control connection pool error: {0}")] + ConnectionPoolError(#[from] ConnectionPoolError), + + /// Failed to fetch metadata. + #[error("transparent")] + FetchError(#[from] MetadataFetchError), + /// Bad peers metadata. #[error("Bad peers metadata: {0}")] Peers(#[from] PeersMetadataError), @@ -440,24 +445,44 @@ pub enum MetadataError { /// Bad tables metadata. #[error("Bad tables metadata: {0}")] Tables(#[from] TablesMetadataError), +} - /// Bad views metadata. - #[error("Bad views metadata: {0}")] - Views(#[from] ViewsMetadataError), +/// An error occurred during metadata fetch. +#[derive(Error, Debug, Clone)] +#[error("Metadata fetch failed for table \"{table}\": {error}")] +#[non_exhaustive] +pub struct MetadataFetchError { + /// Reason why metadata fetch failed. + pub error: MetadataFetchErrorKind, + /// Table name for which metadata fetch failed. + pub table: &'static str, } -/// An error that occurred during peers metadata fetch. +/// Specific reason why metadata fetch failed. #[derive(Error, Debug, Clone)] #[non_exhaustive] -pub enum PeersMetadataError { - /// system.peers has invalid column type. - #[error("system.peers has invalid column type: {0}")] - SystemPeersInvalidColumnType(TypeCheckError), +pub enum MetadataFetchErrorKind { + /// Queried table has invalid column type. + #[error("The table has invalid column type: {0}")] + InvalidColumnType(#[from] TypeCheckError), + + /// Failed to prepare the statement for metadata fetch. + #[error("Failed to prepare the statement: {0}")] + PrepareError(#[from] RequestAttemptError), - /// system.local has invalid column type. - #[error("system.local has invalid column type: {0}")] - SystemLocalInvalidColumnType(TypeCheckError), + /// Failed to serialize statement parameters. + #[error("Failed to serialize statement parameters: {0}")] + SerializationError(#[from] SerializationError), + /// Failed to obtain next row from response to the metadata fetch query. + #[error("Failed to obtain next row from response to the query: {0}")] + NextRowError(#[from] NextRowError), +} + +/// An error that occurred during peers metadata fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum PeersMetadataError { /// Empty peers list returned during peers metadata fetch. #[error("Peers list is empty")] EmptyPeers, @@ -471,10 +496,6 @@ pub enum PeersMetadataError { #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum KeyspacesMetadataError { - /// system_schema.keyspaces has invalid column type. - #[error("system_schema.keyspaces has invalid column type: {0}")] - SchemaKeyspacesInvalidColumnType(TypeCheckError), - /// Bad keyspace replication strategy. #[error("Bad keyspace <{keyspace}> replication strategy: {error}")] Strategy { @@ -509,9 +530,16 @@ pub enum KeyspaceStrategyError { #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum UdtMetadataError { - /// system_schema.types has invalid column type. - #[error("system_schema.types has invalid column type: {0}")] - SchemaTypesInvalidColumnType(TypeCheckError), + /// Failed to parse CQL type returned from system_schema.types query. + #[error( + "Failed to parse a CQL type returned from system_schema.types query. \ + Type '{typ}', at position {position}: {reason}" + )] + InvalidCqlType { + typ: String, + position: usize, + reason: String, + }, /// Circular UDT dependency detected. #[error("Detected circular dependency between user defined types - toposort is impossible!")] @@ -522,13 +550,16 @@ pub enum UdtMetadataError { #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum TablesMetadataError { - /// system_schema.tables has invalid column type. - #[error("system_schema.tables has invalid column type: {0}")] - SchemaTablesInvalidColumnType(TypeCheckError), - - /// system_schema.columns has invalid column type. - #[error("system_schema.columns has invalid column type: {0}")] - SchemaColumnsInvalidColumnType(TypeCheckError), + /// Failed to parse CQL type returned from system_schema.columns query. + #[error( + "Failed to parse a CQL type returned from system_schema.columns query. \ + Type '{typ}', at position {position}: {reason}" + )] + InvalidCqlType { + typ: String, + position: usize, + reason: String, + }, /// Unknown column kind. #[error("Unknown column kind '{column_kind}' for {keyspace_name}.{table_name}.{column_name}")] @@ -540,15 +571,6 @@ pub enum TablesMetadataError { }, } -/// An error that occurred during views metadata fetch. -#[derive(Error, Debug, Clone)] -#[non_exhaustive] -pub enum ViewsMetadataError { - /// system_schema.views has invalid column type. - #[error("system_schema.views has invalid column type: {0}")] - SchemaViewsInvalidColumnType(TypeCheckError), -} - /// Error caused by caller creating an invalid query #[derive(Error, Debug, Clone)] #[error("Invalid query passed to Session")]