diff --git a/.github/workflows/authenticate_test.yml b/.github/workflows/authenticate_test.yml index b6a8251661..7c9643f66b 100644 --- a/.github/workflows/authenticate_test.yml +++ b/.github/workflows/authenticate_test.yml @@ -21,7 +21,8 @@ jobs: runs-on: ubuntu-latest services: scylladb: - image: scylladb/scylla-passauth + image: scylladb/scylla + command: --authenticator PasswordAuthenticator ports: - 9042:9042 options: --health-cmd "cqlsh --username cassandra --password cassandra --debug" --health-interval 5s --health-retries 30 diff --git a/.github/workflows/tls.yml b/.github/workflows/tls.yml index 65e0721b59..5691900bd1 100644 --- a/.github/workflows/tls.yml +++ b/.github/workflows/tls.yml @@ -20,7 +20,8 @@ jobs: timeout-minutes: 60 services: scylladb: - image: scylladb/scylla-tls + image: scylla-tls + build: ./test/tls ports: - 9042:9042 - 9142:9142 diff --git a/scylla-cql/src/types/serialize/batch.rs b/scylla-cql/src/types/serialize/batch.rs index aff43b990e..9fbfa73e66 100644 --- a/scylla-cql/src/types/serialize/batch.rs +++ b/scylla-cql/src/types/serialize/batch.rs @@ -384,7 +384,7 @@ where fn is_empty_next(&mut self) -> Option { self.0 .next_serialized() - .map(|sv| sv.map_or(false, |sv| sv.len() == 0)) + .map(|sv| sv.is_ok_and(|sv| sv.len() == 0)) } #[inline] diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index 309fe58157..d5411577b6 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -150,11 +150,13 @@ struct UseKeyspaceRequest { } impl Cluster { + #[allow(clippy::too_many_arguments)] pub(crate) async fn new( known_nodes: Vec, pool_config: PoolConfig, keyspaces_to_fetch: Vec, fetch_schema_metadata: bool, + metadata_request_serverside_timeout: Option, host_filter: Option>, cluster_metadata_refresh_interval: Duration, tablet_receiver: tokio::sync::mpsc::Receiver<(TableSpec<'static>, RawTablet)>, @@ -170,6 +172,7 @@ impl Cluster { control_connection_repair_sender, pool_config.connection_config.clone(), pool_config.keepalive_interval, + metadata_request_serverside_timeout, server_events_sender, keyspaces_to_fetch, fetch_schema_metadata, diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 3a550ae6ae..be7a9dd789 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -2377,7 +2377,7 @@ impl VerifiedKeyspaceName { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use assert_matches::assert_matches; use scylla_cql::frame::protocol_features::{ LWT_OPTIMIZATION_META_BIT_MASK_KEY, SCYLLA_LWT_ADD_METADATA_MARK_EXTENSION, @@ -2406,7 +2406,7 @@ mod tests { use std::time::Duration; // Just like resolve_hostname in session.rs - async fn resolve_hostname(hostname: &str) -> SocketAddr { + pub(crate) async fn resolve_hostname(hostname: &str) -> SocketAddr { match tokio::net::lookup_host(hostname).await { Ok(mut addrs) => addrs.next().unwrap(), Err(_) => { diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 90bdfead42..90a3a6b5e6 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -277,6 +277,9 @@ pub struct SessionConfig { /// If true, full schema is fetched with every metadata refresh. pub fetch_schema_metadata: bool, + /// Custom timeout for requests that query metadata. + pub metadata_request_serverside_timeout: Option, + /// Interval of sending keepalive requests. /// If `None`, keepalives are never sent, so `Self::keepalive_timeout` has no effect. 
pub keepalive_interval: Option, @@ -385,6 +388,7 @@ impl SessionConfig { disallow_shard_aware_port: false, keyspaces_to_fetch: Vec::new(), fetch_schema_metadata: true, + metadata_request_serverside_timeout: Some(Duration::from_secs(2)), keepalive_interval: Some(Duration::from_secs(30)), keepalive_timeout: Some(Duration::from_secs(30)), schema_agreement_timeout: Duration::from_secs(60), @@ -1103,6 +1107,7 @@ where pool_config, config.keyspaces_to_fetch, config.fetch_schema_metadata, + config.metadata_request_serverside_timeout, config.host_filter, config.cluster_metadata_refresh_interval, tablet_receiver, diff --git a/scylla/src/transport/session_builder.rs b/scylla/src/transport/session_builder.rs index 404e277335..e888a5280c 100644 --- a/scylla/src/transport/session_builder.rs +++ b/scylla/src/transport/session_builder.rs @@ -594,7 +594,7 @@ impl GenericSessionBuilder { /// # Ok(()) /// # } /// ``` - pub fn connection_timeout(mut self, duration: std::time::Duration) -> Self { + pub fn connection_timeout(mut self, duration: Duration) -> Self { self.config.connect_timeout = duration; self } @@ -706,6 +706,30 @@ impl GenericSessionBuilder { self } + /// Set the server-side timeout for metadata queries. + /// The default is `Some(Duration::from_secs(2))`, which means that + /// all metadata queries are run with a 2-second server-side timeout, + /// regardless of the cluster-wide default timeout. + /// This prevents schema queries from timing out when the schema is large + /// and the default timeout is configured too tightly. + /// + /// # Example + /// ``` + /// # use scylla::{Session, SessionBuilder}; + /// # async fn example() -> Result<(), Box> { + /// let session: Session = SessionBuilder::new() + /// .known_node("127.0.0.1:9042") + /// .metadata_request_serverside_timeout(std::time::Duration::from_secs(5)) + /// .build() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn metadata_request_serverside_timeout(mut self, timeout: Duration) -> Self { + self.config.metadata_request_serverside_timeout = Some(timeout); + self + } + /// Set the keepalive interval. /// The default is `Some(Duration::from_secs(30))`, which corresponds /// to keepalive CQL messages being sent every 30 seconds. diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index 2b83b7ade2..471c16e847 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -41,6 +41,7 @@ use super::node::{InternalKnownNode, NodeAddr, ResolvedContactPoint}; pub(crate) struct MetadataReader { connection_config: ConnectionConfig, keepalive_interval: Option, + request_serverside_timeout: Option, control_connection_endpoint: UntranslatedEndpoint, control_connection: NodeConnectionPool, @@ -457,6 +458,7 @@ impl MetadataReader { control_connection_repair_requester: broadcast::Sender<()>, mut connection_config: ConnectionConfig, keepalive_interval: Option, + request_serverside_timeout: Option, server_event_sender: mpsc::Sender, keyspaces_to_fetch: Vec, fetch_schema: bool, @@ -493,6 +495,7 @@ impl MetadataReader { Ok(MetadataReader { control_connection_endpoint, control_connection, + request_serverside_timeout, keepalive_interval, connection_config, known_peers: initial_peers @@ -629,15 +632,16 @@ impl MetadataReader { async fn fetch_metadata(&self, initial: bool) -> Result { // TODO: Timeouts? 
self.control_connection.wait_until_initialized().await; - let conn = &self.control_connection.random_connection()?; - - let res = query_metadata( - conn, - self.control_connection_endpoint.address().port(), - &self.keyspaces_to_fetch, - self.fetch_schema, - ) - .await; + let conn = ControlConnection::new(self.control_connection.random_connection()?) + .override_timeout(self.request_serverside_timeout); + + let res = conn + .query_metadata( + self.control_connection_endpoint.address().port(), + &self.keyspaces_to_fetch, + self.fetch_schema, + ) + .await; if initial { if let Err(err) = res { @@ -743,28 +747,30 @@ impl MetadataReader { } } -async fn query_metadata( - conn: &Arc, - connect_port: u16, - keyspace_to_fetch: &[String], - fetch_schema: bool, -) -> Result { - let peers_query = query_peers(conn, connect_port); - let keyspaces_query = query_keyspaces(conn, keyspace_to_fetch, fetch_schema); +impl ControlConnection { + async fn query_metadata( + self, + connect_port: u16, + keyspace_to_fetch: &[String], + fetch_schema: bool, + ) -> Result { + let peers_query = self.clone().query_peers(connect_port); + let keyspaces_query = self.query_keyspaces(keyspace_to_fetch, fetch_schema); - let (peers, keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?; + let (peers, keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?; - // There must be at least one peer - if peers.is_empty() { - return Err(MetadataError::Peers(PeersMetadataError::EmptyPeers).into()); - } + // There must be at least one peer + if peers.is_empty() { + return Err(MetadataError::Peers(PeersMetadataError::EmptyPeers).into()); + } - // At least one peer has to have some tokens - if peers.iter().all(|peer| peer.tokens.is_empty()) { - return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into()); - } + // At least one peer has to have some tokens + if peers.iter().all(|peer| peer.tokens.is_empty()) { + return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into()); + } - Ok(Metadata { peers, keyspaces }) + Ok(Metadata { peers, keyspaces }) + } } #[derive(DeserializeRow)] @@ -794,235 +800,488 @@ impl NodeInfoSource { } } -const METADATA_QUERY_PAGE_SIZE: i32 = 1024; +mod control_connection { + use fmt::Write as _; + use scylla_cql::types::serialize::row::SerializedValues; -async fn query_peers(conn: &Arc, connect_port: u16) -> Result, QueryError> { - let mut peers_query = - Query::new("select host_id, rpc_address, data_center, rack, tokens from system.peers"); - peers_query.set_page_size(METADATA_QUERY_PAGE_SIZE); - let peers_query_stream = conn - .clone() - .query_iter(peers_query) - .map(|pager_res| { - let pager = pager_res?; - let rows_stream = pager.rows_stream::().map_err(|err| { - MetadataError::Peers(PeersMetadataError::SystemPeersInvalidColumnType(err)) - })?; - Ok::<_, QueryError>(rows_stream) - }) - .into_stream() - .try_flatten() - .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result))); - - let mut local_query = - Query::new("select host_id, rpc_address, data_center, rack, tokens from system.local"); - local_query.set_page_size(METADATA_QUERY_PAGE_SIZE); - let local_query_stream = conn - .clone() - .query_iter(local_query) - .map(|pager_res| { - let pager = pager_res?; - let rows_stream = pager.rows_stream::().map_err(|err| { - MetadataError::Peers(PeersMetadataError::SystemLocalInvalidColumnType(err)) - })?; - Ok::<_, QueryError>(rows_stream) - }) - .into_stream() - .try_flatten() - .and_then(|row_result| future::ok((NodeInfoSource::Local, row_result))); + use 
crate::prepared_statement::PreparedStatement; + use crate::transport::errors::UserRequestError; - let untranslated_rows = stream::select(peers_query_stream, local_query_stream); + use super::*; - let local_ip: IpAddr = conn.get_connect_address().ip(); - let local_address = SocketAddr::new(local_ip, connect_port); + #[derive(Clone)] + pub(super) struct ControlConnection { + conn: Arc, + overridden_timeout: Option, + } - let translated_peers_futures = untranslated_rows.map(|row_result| async { - match row_result { - Ok((source, row)) => create_peer_from_row(source, row, local_address).await, - Err(err) => { - warn!( - "system.peers or system.local has an invalid row, skipping it: {}", - err - ); - Ok(None) + impl ControlConnection { + pub(super) fn new(conn: Arc) -> Self { + Self { + conn, + overridden_timeout: None, } } - }); - let peers = translated_peers_futures - .buffer_unordered(256) - .try_collect::>() - .await?; - Ok(peers.into_iter().flatten().collect()) -} + pub(super) fn override_timeout(self, overridden_timeout: Option) -> Self { + Self { + overridden_timeout, + ..self + } + } -async fn create_peer_from_row( - source: NodeInfoSource, - row: NodeInfoRow, - local_address: SocketAddr, -) -> Result, QueryError> { - let NodeInfoRow { - host_id, - untranslated_ip_addr, - datacenter, - rack, - tokens, - } = row; - - let host_id = match host_id { - Some(host_id) => host_id, - None => { - warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack); - return Ok(None); + pub(super) fn get_connect_address(&self) -> SocketAddr { + self.conn.get_connect_address() } - }; - let connect_port = local_address.port(); - let untranslated_address = SocketAddr::new(untranslated_ip_addr, connect_port); + /// Returns true iff the target node is a ScyllaDB node (and not, e.g., a Cassandra node). + pub(super) fn is_to_scylladb(&self) -> bool { + self.conn.get_shard_info().is_some() + } - let node_addr = match source { - NodeInfoSource::Local => { - // For the local node we should use connection's address instead of rpc_address. - // (The reason is that rpc_address in system.local can be wrong.) - // Thus, we replace address in local_rows with connection's address. - // We need to replace rpc_address with control connection address. - NodeAddr::Untranslatable(local_address) + fn maybe_append_timeout_override(&self, query: &mut Query) { + if let Some(timeout) = self.overridden_timeout { + if self.is_to_scylladb() { + // SAFETY: std::fmt::Write impl for String is infallible. + write!(query.contents, " USING TIMEOUT {}ms", timeout.as_millis()).unwrap() + } + } } - NodeInfoSource::Peer => { - // The usual case - no translation. - NodeAddr::Translatable(untranslated_address) + + /// Executes a query and fetches its results over multiple pages, using + /// the asynchronous iterator interface. 
+ pub(super) async fn query_iter(self, mut query: Query) -> Result { + self.maybe_append_timeout_override(&mut query); + self.conn.query_iter(query).await } - }; - let tokens_str: Vec = tokens.unwrap_or_default(); + pub(crate) async fn prepare( + &self, + mut query: Query, + ) -> Result { + self.maybe_append_timeout_override(&mut query); + self.conn.prepare(&query).await + } - // Parse string representation of tokens as integer values - let tokens: Vec = match tokens_str - .iter() - .map(|s| Token::from_str(s)) - .collect::, _>>() - { - Ok(parsed) => parsed, - Err(e) => { - // FIXME: we could allow the users to provide custom partitioning information - // in order for it to work with non-standard token sizes. - // Also, we could implement support for Cassandra's other standard partitioners - // like RandomPartitioner or ByteOrderedPartitioner. - trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e); - vec![Token::new(rand::thread_rng().gen::())] + /// Executes a prepared statements and fetches its results over multiple pages, using + /// the asynchronous iterator interface. + pub(crate) async fn execute_iter( + self, + prepared_statement: PreparedStatement, + values: SerializedValues, + ) -> Result { + self.conn.execute_iter(prepared_statement, values).await } - }; + } - Ok(Some(Peer { - host_id, - address: node_addr, - tokens, - datacenter, - rack, - })) -} + #[cfg(test)] + mod tests { + use std::net::SocketAddr; + use std::sync::Arc; + use std::time::Duration; -fn query_filter_keyspace_name<'a, R>( - conn: &Arc, - query_str: &'a str, - keyspaces_to_fetch: &'a [String], - convert_typecheck_error: impl FnOnce(TypeCheckError) -> MetadataError + 'a, -) -> impl Stream> + 'a -where - R: DeserializeOwnedRow + 'static, -{ - let conn = conn.clone(); - - // This function is extracted to reduce monomorphisation penalty: - // query_filter_keyspace_name() is going to be monomorphised into 5 distinct functions, - // so it's better to extract the common part. - async fn make_keyspace_filtered_query_pager( - conn: Arc, - query_str: &str, - keyspaces_to_fetch: &[String], - ) -> Result { - if keyspaces_to_fetch.is_empty() { - let mut query = Query::new(query_str); - query.set_page_size(METADATA_QUERY_PAGE_SIZE); + use scylla_proxy::{ + Condition, Node, Proxy, Reaction as _, RequestFrame, RequestOpcode, RequestReaction, + RequestRule, ShardAwareness, + }; + use tokio::sync::mpsc; + + use crate::test_utils::setup_tracing; + use crate::transport::connection::open_connection; + use crate::transport::connection::tests::resolve_hostname; + use crate::transport::node::ResolvedContactPoint; + use crate::transport::topology::{ConnectionConfig, UntranslatedEndpoint}; + + use super::ControlConnection; + + /// Tests that ControlConnection enforces the provided custom timeout + /// iff ScyllaDB is the target node (else ignores the custom timeout). 
+ #[cfg(not(scylla_cloud_tests))] + #[tokio::test] + #[ntest::timeout(2000)] + async fn test_custom_timeouts() { + setup_tracing(); + + let proxy_addr = SocketAddr::new(scylla_proxy::get_exclusive_local_address(), 9042); + let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); + let node_addr: SocketAddr = resolve_hostname(&uri).await; + + let (feedback_tx, mut feedback_rx) = mpsc::unbounded_channel(); + + let metadata_query_feedback_rule = RequestRule( + Condition::or( + Condition::RequestOpcode(RequestOpcode::Query), + Condition::RequestOpcode(RequestOpcode::Prepare), + ), + RequestReaction::noop().with_feedback_when_performed(feedback_tx), + ); - conn.query_iter(query).await - } else { - let keyspaces = &[keyspaces_to_fetch] as &[&[String]]; - let query_str = format!("{query_str} where keyspace_name in ?"); + let proxy = Proxy::builder() + .with_node( + Node::builder() + .proxy_address(proxy_addr) + .real_address(node_addr) + .shard_awareness(ShardAwareness::QueryNode) + .request_rules(vec![metadata_query_feedback_rule]) + .build(), + ) + .build() + .run() + .await + .unwrap(); + + let (conn, _error_receiver) = open_connection( + UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { + address: proxy_addr, + datacenter: None, + }), + None, + &ConnectionConfig::default(), + ) + .await + .unwrap(); + + const QUERY_STR: &str = "SELECT host_id FROM system.local"; + + fn expected_query_body(dur: Duration) -> String { + format!("{} USING TIMEOUT {}ms", QUERY_STR, dur.as_millis()) + } + + fn contains_subslice(slice: &[u8], subslice: &[u8]) -> bool { + slice + .windows(subslice.len()) + .any(|window| window == subslice) + } - let mut query = Query::new(query_str); - query.set_page_size(METADATA_QUERY_PAGE_SIZE); + async fn assert_no_custom_timeout( + feedback_rx: &mut mpsc::UnboundedReceiver<(RequestFrame, Option)>, + ) { + let (frame, _) = feedback_rx.recv().await.unwrap(); + let clause = "USING TIMEOUT"; + assert!( + !contains_subslice(&frame.body, clause.as_bytes()), + "slice {:?} does contain subslice {:?}", + &frame.body, + clause, + ); + } - let prepared = conn.prepare(&query).await?; - let serialized_values = prepared.serialize_values(&keyspaces)?; - conn.execute_iter(prepared, serialized_values).await + async fn assert_custom_timeout( + feedback_rx: &mut mpsc::UnboundedReceiver<(RequestFrame, Option)>, + dur: Duration, + ) { + let (frame, _) = feedback_rx.recv().await.unwrap(); + let expected = expected_query_body(dur); + assert!( + contains_subslice(&frame.body, expected.as_bytes()), + "slice {:?} does not contain subslice {:?}", + &frame.body, + expected, + ); + } + + async fn assert_custom_timeout_iff_scylladb( + feedback_rx: &mut mpsc::UnboundedReceiver<(RequestFrame, Option)>, + dur: Duration, + connected_to_scylladb: bool, + ) { + if connected_to_scylladb { + assert_custom_timeout(feedback_rx, dur).await; + } else { + assert_no_custom_timeout(feedback_rx).await; + } + } + + let connected_to_scylladb = conn.get_shard_info().is_some(); + let conn_with_default_timeout = ControlConnection::new(Arc::new(conn)); + + // No custom timeout set. + { + conn_with_default_timeout + .clone() + .query_iter(QUERY_STR.into()) + .await + .unwrap(); + + assert_no_custom_timeout(&mut feedback_rx).await; + + conn_with_default_timeout + .prepare(QUERY_STR.into()) + .await + .unwrap(); + + assert_no_custom_timeout(&mut feedback_rx).await; + } + + // Custom timeout set, so it should be set in query strings iff the target node is ScyllaDB. 
+ { + let custom_timeout = Duration::from_millis(2137); + let conn_with_custom_timeout = + conn_with_default_timeout.override_timeout(Some(custom_timeout)); + + conn_with_custom_timeout + .clone() + .query_iter(QUERY_STR.into()) + .await + .unwrap(); + + assert_custom_timeout_iff_scylladb( + &mut feedback_rx, + custom_timeout, + connected_to_scylladb, + ) + .await; + + conn_with_custom_timeout + .prepare(QUERY_STR.into()) + .await + .unwrap(); + + assert_custom_timeout_iff_scylladb( + &mut feedback_rx, + custom_timeout, + connected_to_scylladb, + ) + .await; + } + + let _ = proxy.finish().await; } } - - let fut = async move { - let pager = make_keyspace_filtered_query_pager(conn, query_str, keyspaces_to_fetch).await?; - let stream: super::iterator::TypedRowStream = - pager.rows_stream::().map_err(convert_typecheck_error)?; - Ok::<_, QueryError>(stream) - }; - fut.into_stream().try_flatten() } +use control_connection::ControlConnection; -async fn query_keyspaces( - conn: &Arc, - keyspaces_to_fetch: &[String], - fetch_schema: bool, -) -> Result, QueryError> { - let rows = query_filter_keyspace_name::<(String, HashMap)>( - conn, - "select keyspace_name, replication from system_schema.keyspaces", - keyspaces_to_fetch, - |err| { - MetadataError::Keyspaces(KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType( - err, - )) - }, - ); +const METADATA_QUERY_PAGE_SIZE: i32 = 1024; +impl ControlConnection { + async fn query_peers(self, connect_port: u16) -> Result, QueryError> { + let mut peers_query = + Query::new("select host_id, rpc_address, data_center, rack, tokens from system.peers"); + peers_query.set_page_size(METADATA_QUERY_PAGE_SIZE); + let peers_query_stream = self + .clone() + .query_iter(peers_query) + .map(|pager_res| { + let pager = pager_res?; + let rows_stream = pager.rows_stream::().map_err(|err| { + MetadataError::Peers(PeersMetadataError::SystemPeersInvalidColumnType(err)) + })?; + Ok::<_, QueryError>(rows_stream) + }) + .into_stream() + .try_flatten() + .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result))); + + let local_ip: IpAddr = self.get_connect_address().ip(); + let local_address = SocketAddr::new(local_ip, connect_port); + + let mut local_query = + Query::new("select host_id, rpc_address, data_center, rack, tokens from system.local"); + local_query.set_page_size(METADATA_QUERY_PAGE_SIZE); + let local_query_stream = self + .query_iter(local_query) + .map(|pager_res| { + let pager = pager_res?; + let rows_stream = pager.rows_stream::().map_err(|err| { + MetadataError::Peers(PeersMetadataError::SystemLocalInvalidColumnType(err)) + })?; + Ok::<_, QueryError>(rows_stream) + }) + .into_stream() + .try_flatten() + .and_then(|row_result| future::ok((NodeInfoSource::Local, row_result))); + + let untranslated_rows = stream::select(peers_query_stream, local_query_stream); + + let translated_peers_futures = untranslated_rows.map(|row_result| async { + match row_result { + Ok((source, row)) => Self::create_peer_from_row(source, row, local_address).await, + Err(err) => { + warn!( + "system.peers or system.local has an invalid row, skipping it: {}", + err + ); + Ok(None) + } + } + }); - let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema { - let udts = query_user_defined_types(conn, keyspaces_to_fetch).await?; - ( - query_tables(conn, keyspaces_to_fetch, &udts).await?, - query_views(conn, keyspaces_to_fetch, &udts).await?, - udts, - ) - } else { - (HashMap::new(), HashMap::new(), HashMap::new()) - }; + let peers = translated_peers_futures 
+ .buffer_unordered(256) + .try_collect::>() + .await?; + Ok(peers.into_iter().flatten().collect()) + } - rows.map(|row_result| { - let (keyspace_name, strategy_map) = row_result?; + async fn create_peer_from_row( + source: NodeInfoSource, + row: NodeInfoRow, + local_address: SocketAddr, + ) -> Result, QueryError> { + let NodeInfoRow { + host_id, + untranslated_ip_addr, + datacenter, + rack, + tokens, + } = row; + + let host_id = match host_id { + Some(host_id) => host_id, + None => { + warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack); + return Ok(None); + } + }; - let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| { - MetadataError::Keyspaces(KeyspacesMetadataError::Strategy { - keyspace: keyspace_name.clone(), - error, - }) - })?; - let tables = all_tables.remove(&keyspace_name).unwrap_or_default(); - let views = all_views.remove(&keyspace_name).unwrap_or_default(); - let user_defined_types = all_user_defined_types - .remove(&keyspace_name) - .unwrap_or_default(); - - let keyspace = Keyspace { - strategy, - tables, - views, - user_defined_types, + let connect_port = local_address.port(); + let untranslated_address = SocketAddr::new(untranslated_ip_addr, connect_port); + + let node_addr = match source { + NodeInfoSource::Local => { + // For the local node we should use connection's address instead of rpc_address. + // (The reason is that rpc_address in system.local can be wrong.) + // Thus, we replace address in local_rows with connection's address. + // We need to replace rpc_address with control connection address. + NodeAddr::Untranslatable(local_address) + } + NodeInfoSource::Peer => { + // The usual case - no translation. + NodeAddr::Translatable(untranslated_address) + } }; - Ok((keyspace_name, keyspace)) - }) - .try_collect() - .await -} + let tokens_str: Vec = tokens.unwrap_or_default(); + + // Parse string representation of tokens as integer values + let tokens: Vec = match tokens_str + .iter() + .map(|s| Token::from_str(s)) + .collect::, _>>() + { + Ok(parsed) => parsed, + Err(e) => { + // FIXME: we could allow the users to provide custom partitioning information + // in order for it to work with non-standard token sizes. + // Also, we could implement support for Cassandra's other standard partitioners + // like RandomPartitioner or ByteOrderedPartitioner. + trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e); + vec![Token::new(rand::thread_rng().gen::())] + } + }; + + Ok(Some(Peer { + host_id, + address: node_addr, + tokens, + datacenter, + rack, + })) + } + + fn query_filter_keyspace_name<'a, R>( + self, + query_str: &'a str, + keyspaces_to_fetch: &'a [String], + convert_typecheck_error: impl FnOnce(TypeCheckError) -> MetadataError + 'a, + ) -> impl Stream> + 'a + where + R: DeserializeOwnedRow + 'static, + { + // This function is extracted to reduce monomorphisation penalty: + // query_filter_keyspace_name() is going to be monomorphised into 5 distinct functions, + // so it's better to extract the common part. 
+ async fn make_keyspace_filtered_query_pager( + conn: ControlConnection, + query_str: &str, + keyspaces_to_fetch: &[String], + ) -> Result { + if keyspaces_to_fetch.is_empty() { + let mut query = Query::new(query_str); + query.set_page_size(METADATA_QUERY_PAGE_SIZE); + + conn.query_iter(query).await + } else { + let keyspaces = &[keyspaces_to_fetch] as &[&[String]]; + let query_str = format!("{query_str} where keyspace_name in ?"); + + let mut query = Query::new(query_str); + query.set_page_size(METADATA_QUERY_PAGE_SIZE); + + let prepared = conn.prepare(query).await?; + let serialized_values = prepared.serialize_values(&keyspaces)?; + conn.execute_iter(prepared, serialized_values).await + } + } + let fut = async move { + let pager = + make_keyspace_filtered_query_pager(self, query_str, keyspaces_to_fetch).await?; + let stream: super::iterator::TypedRowStream = + pager.rows_stream::().map_err(convert_typecheck_error)?; + Ok::<_, QueryError>(stream) + }; + fut.into_stream().try_flatten() + } + + async fn query_keyspaces( + self, + keyspaces_to_fetch: &[String], + fetch_schema: bool, + ) -> Result, QueryError> { + let rows = self + .clone() + .query_filter_keyspace_name::<(String, HashMap)>( + "select keyspace_name, replication from system_schema.keyspaces", + keyspaces_to_fetch, + |err| { + MetadataError::Keyspaces( + KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType(err), + ) + }, + ); + + let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema { + let udts = self + .clone() + .query_user_defined_types(keyspaces_to_fetch) + .await?; + ( + self.clone().query_tables(keyspaces_to_fetch, &udts).await?, + self.query_views(keyspaces_to_fetch, &udts).await?, + udts, + ) + } else { + (HashMap::new(), HashMap::new(), HashMap::new()) + }; + + rows.map(|row_result| { + let (keyspace_name, strategy_map) = row_result?; + + let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| { + MetadataError::Keyspaces(KeyspacesMetadataError::Strategy { + keyspace: keyspace_name.clone(), + error, + }) + })?; + let tables = all_tables.remove(&keyspace_name).unwrap_or_default(); + let views = all_views.remove(&keyspace_name).unwrap_or_default(); + let user_defined_types = all_user_defined_types + .remove(&keyspace_name) + .unwrap_or_default(); + + let keyspace = Keyspace { + strategy, + tables, + views, + user_defined_types, + }; + + Ok((keyspace_name, keyspace)) + }) + .try_collect() + .await + } +} #[derive(DeserializeRow, Debug)] #[scylla(crate = "crate")] struct UdtRow { @@ -1062,65 +1321,65 @@ impl TryFrom for UdtRowWithParsedFieldTypes { } } -async fn query_user_defined_types( - conn: &Arc, - keyspaces_to_fetch: &[String], -) -> Result>>, QueryError> { - let rows = query_filter_keyspace_name::( - conn, - "select keyspace_name, type_name, field_names, field_types from system_schema.types", - keyspaces_to_fetch, - |err| MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err)), - ); +impl ControlConnection { + async fn query_user_defined_types( + self, + keyspaces_to_fetch: &[String], + ) -> Result>>, QueryError> { + let rows = self.query_filter_keyspace_name::( + "select keyspace_name, type_name, field_names, field_types from system_schema.types", + keyspaces_to_fetch, + |err| MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err)), + ); - let mut udt_rows: Vec = rows - .map(|row_result| { - let udt_row = row_result?.try_into()?; + let mut udt_rows: Vec = rows + .map(|row_result| { + let udt_row = row_result?.try_into()?; - 
Ok::<_, QueryError>(udt_row) - }) - .try_collect() - .await?; + Ok::<_, QueryError>(udt_row) + }) + .try_collect() + .await?; - let instant_before_toposort = Instant::now(); - topo_sort_udts(&mut udt_rows)?; - let toposort_elapsed = instant_before_toposort.elapsed(); - debug!( - "Toposort of UDT definitions took {:.2} ms (udts len: {})", - toposort_elapsed.as_secs_f64() * 1000., - udt_rows.len(), - ); + let instant_before_toposort = Instant::now(); + topo_sort_udts(&mut udt_rows)?; + let toposort_elapsed = instant_before_toposort.elapsed(); + debug!( + "Toposort of UDT definitions took {:.2} ms (udts len: {})", + toposort_elapsed.as_secs_f64() * 1000., + udt_rows.len(), + ); - let mut udts = HashMap::new(); - for udt_row in udt_rows { - let UdtRowWithParsedFieldTypes { - keyspace_name, - type_name, - field_names, - field_types, - } = udt_row; + let mut udts = HashMap::new(); + for udt_row in udt_rows { + let UdtRowWithParsedFieldTypes { + keyspace_name, + type_name, + field_names, + field_types, + } = udt_row; - let mut fields = Vec::with_capacity(field_names.len()); + let mut fields = Vec::with_capacity(field_names.len()); - for (field_name, field_type) in field_names.into_iter().zip(field_types.into_iter()) { - let cql_type = field_type.into_cql_type(&keyspace_name, &udts); - fields.push((field_name, cql_type)); - } + for (field_name, field_type) in field_names.into_iter().zip(field_types.into_iter()) { + let cql_type = field_type.into_cql_type(&keyspace_name, &udts); + fields.push((field_name, cql_type)); + } - let udt = Arc::new(UserDefinedType { - name: type_name.clone(), - keyspace: keyspace_name.clone(), - field_types: fields, - }); + let udt = Arc::new(UserDefinedType { + name: type_name.clone(), + keyspace: keyspace_name.clone(), + field_types: fields, + }); - udts.entry(keyspace_name) - .or_insert_with(HashMap::new) - .insert(type_name, udt); - } + udts.entry(keyspace_name) + .or_insert_with(HashMap::new) + .insert(type_name, udt); + } - Ok(udts) + Ok(udts) + } } - fn topo_sort_udts(udts: &mut Vec) -> Result<(), QueryError> { fn do_with_referenced_udts(what: &mut impl FnMut(&str), pre_cql_type: &PreCqlType) { match pre_cql_type { @@ -1380,188 +1639,192 @@ mod toposort_tests { } } -async fn query_tables( - conn: &Arc, - keyspaces_to_fetch: &[String], - udts: &HashMap>>, -) -> Result>, QueryError> { - let rows = query_filter_keyspace_name::<(String, String)>( - conn, - "SELECT keyspace_name, table_name FROM system_schema.tables", - keyspaces_to_fetch, - |err| MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)), - ); - let mut result = HashMap::new(); - let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?; +impl ControlConnection { + async fn query_tables( + self, + keyspaces_to_fetch: &[String], + udts: &HashMap>>, + ) -> Result>, QueryError> { + let rows = self.clone().query_filter_keyspace_name::<(String, String)>( + "SELECT keyspace_name, table_name FROM system_schema.tables", + keyspaces_to_fetch, + |err| MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)), + ); + let mut result = HashMap::new(); + let mut tables = self.query_tables_schema(keyspaces_to_fetch, udts).await?; - rows.map(|row_result| { - let keyspace_and_table_name = row_result?; + rows.map(|row_result| { + let keyspace_and_table_name = row_result?; - let table = tables.remove(&keyspace_and_table_name).unwrap_or(Table { - columns: HashMap::new(), - partition_key: vec![], - clustering_key: vec![], - partitioner: None, - }); + let table = 
tables.remove(&keyspace_and_table_name).unwrap_or(Table { + columns: HashMap::new(), + partition_key: vec![], + clustering_key: vec![], + partitioner: None, + }); - result - .entry(keyspace_and_table_name.0) - .or_insert_with(HashMap::new) - .insert(keyspace_and_table_name.1, table); + result + .entry(keyspace_and_table_name.0) + .or_insert_with(HashMap::new) + .insert(keyspace_and_table_name.1, table); - Ok::<_, QueryError>(()) - }) - .try_for_each(|_| future::ok(())) - .await?; + Ok::<_, QueryError>(()) + }) + .try_for_each(|_| future::ok(())) + .await?; - Ok(result) -} + Ok(result) + } -async fn query_views( - conn: &Arc, - keyspaces_to_fetch: &[String], - udts: &HashMap>>, -) -> Result>, QueryError> { - let rows = query_filter_keyspace_name::<(String, String, String)>( - conn, - "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views", - keyspaces_to_fetch, - |err| MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err)), - ); + async fn query_views( + self, + keyspaces_to_fetch: &[String], + udts: &HashMap>>, + ) -> Result>, QueryError> { + let rows = self + .clone() + .query_filter_keyspace_name::<(String, String, String)>( + "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views", + keyspaces_to_fetch, + |err| MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err)), + ); - let mut result = HashMap::new(); - let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?; + let mut result = HashMap::new(); + let mut tables = self.query_tables_schema(keyspaces_to_fetch, udts).await?; - rows.map(|row_result| { - let (keyspace_name, view_name, base_table_name) = row_result?; + rows.map(|row_result| { + let (keyspace_name, view_name, base_table_name) = row_result?; - let keyspace_and_view_name = (keyspace_name, view_name); + let keyspace_and_view_name = (keyspace_name, view_name); - let table = tables.remove(&keyspace_and_view_name).unwrap_or(Table { - columns: HashMap::new(), - partition_key: vec![], - clustering_key: vec![], - partitioner: None, - }); - let materialized_view = MaterializedView { - view_metadata: table, - base_table_name, - }; + let table = tables.remove(&keyspace_and_view_name).unwrap_or(Table { + columns: HashMap::new(), + partition_key: vec![], + clustering_key: vec![], + partitioner: None, + }); + let materialized_view = MaterializedView { + view_metadata: table, + base_table_name, + }; - result - .entry(keyspace_and_view_name.0) - .or_insert_with(HashMap::new) - .insert(keyspace_and_view_name.1, materialized_view); + result + .entry(keyspace_and_view_name.0) + .or_insert_with(HashMap::new) + .insert(keyspace_and_view_name.1, materialized_view); - Ok::<_, QueryError>(()) - }) - .try_for_each(|_| future::ok(())) - .await?; + Ok::<_, QueryError>(()) + }) + .try_for_each(|_| future::ok(())) + .await?; - Ok(result) -} + Ok(result) + } -async fn query_tables_schema( - conn: &Arc, - keyspaces_to_fetch: &[String], - udts: &HashMap>>, -) -> Result, QueryError> { - // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of - // type EmptyType for dense tables. This resolves into this CQL type name. - // This column shouldn't be exposed to the user but is currently exposed in system tables. 
- const THRIFT_EMPTY_TYPE: &str = "empty"; + async fn query_tables_schema( + self, + keyspaces_to_fetch: &[String], + udts: &HashMap>>, + ) -> Result, QueryError> { + // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of + // type EmptyType for dense tables. This resolves into this CQL type name. + // This column shouldn't be exposed to the user but is currently exposed in system tables. + const THRIFT_EMPTY_TYPE: &str = "empty"; - type RowType = (String, String, String, String, i32, String); + type RowType = (String, String, String, String, i32, String); - let rows = query_filter_keyspace_name::(conn, + let rows = self.clone().query_filter_keyspace_name::( "select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", keyspaces_to_fetch, |err| { MetadataError::Tables(TablesMetadataError::SchemaColumnsInvalidColumnType(err)) } ); - let mut tables_schema = HashMap::new(); + let mut tables_schema = HashMap::new(); - rows.map(|row_result| { - let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?; + rows.map(|row_result| { + let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?; - if type_ == THRIFT_EMPTY_TYPE { - return Ok::<_, QueryError>(()); - } + if type_ == THRIFT_EMPTY_TYPE { + return Ok::<_, QueryError>(()); + } - let pre_cql_type = map_string_to_cql_type(&type_)?; - let cql_type = pre_cql_type.into_cql_type(&keyspace_name, udts); + let pre_cql_type = map_string_to_cql_type(&type_)?; + let cql_type = pre_cql_type.into_cql_type(&keyspace_name, udts); - let kind = ColumnKind::from_str(&kind).map_err(|_| { - MetadataError::Tables(TablesMetadataError::UnknownColumnKind { - keyspace_name: keyspace_name.clone(), - table_name: table_name.clone(), - column_name: column_name.clone(), - column_kind: kind, - }) - })?; + let kind = ColumnKind::from_str(&kind).map_err(|_| { + MetadataError::Tables(TablesMetadataError::UnknownColumnKind { + keyspace_name: keyspace_name.clone(), + table_name: table_name.clone(), + column_name: column_name.clone(), + column_kind: kind, + }) + })?; - let entry = tables_schema.entry((keyspace_name, table_name)).or_insert(( - HashMap::new(), // columns - HashMap::new(), // partition key - HashMap::new(), // clustering key - )); + let entry = tables_schema.entry((keyspace_name, table_name)).or_insert(( + HashMap::new(), // columns + HashMap::new(), // partition key + HashMap::new(), // clustering key + )); - if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering { - let key_map = if kind == ColumnKind::PartitionKey { - entry.1.borrow_mut() - } else { - entry.2.borrow_mut() - }; - key_map.insert(position, column_name.clone()); - } + if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering { + let key_map = if kind == ColumnKind::PartitionKey { + entry.1.borrow_mut() + } else { + entry.2.borrow_mut() + }; + key_map.insert(position, column_name.clone()); + } - entry.0.insert( - column_name, - Column { - type_: cql_type, - kind, - }, - ); + entry.0.insert( + column_name, + Column { + type_: cql_type, + kind, + }, + ); - Ok::<_, QueryError>(()) - }) - .try_for_each(|_| future::ok(())) - .await?; + Ok::<_, QueryError>(()) + }) + .try_for_each(|_| future::ok(())) + .await?; - let mut all_partitioners = query_table_partitioners(conn).await?; - let mut result = HashMap::new(); + let mut all_partitioners = self.query_table_partitioners().await?; + let mut result = HashMap::new(); + + for ( + (keyspace_name, table_name), + 
(columns, partition_key_columns, clustering_key_columns), + ) in tables_schema + { + let mut partition_key = vec!["".to_string(); partition_key_columns.len()]; + for (position, column_name) in partition_key_columns { + partition_key[position as usize] = column_name; + } - for ((keyspace_name, table_name), (columns, partition_key_columns, clustering_key_columns)) in - tables_schema - { - let mut partition_key = vec!["".to_string(); partition_key_columns.len()]; - for (position, column_name) in partition_key_columns { - partition_key[position as usize] = column_name; - } + let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()]; + for (position, column_name) in clustering_key_columns { + clustering_key[position as usize] = column_name; + } - let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()]; - for (position, column_name) in clustering_key_columns { - clustering_key[position as usize] = column_name; - } + let keyspace_and_table_name = (keyspace_name, table_name); - let keyspace_and_table_name = (keyspace_name, table_name); + let partitioner = all_partitioners + .remove(&keyspace_and_table_name) + .unwrap_or_default(); - let partitioner = all_partitioners - .remove(&keyspace_and_table_name) - .unwrap_or_default(); + result.insert( + keyspace_and_table_name, + Table { + columns, + partition_key, + clustering_key, + partitioner, + }, + ); + } - result.insert( - keyspace_and_table_name, - Table { - columns, - partition_key, - clustering_key, - partitioner, - }, - ); + Ok(result) } - - Ok(result) } fn map_string_to_cql_type(type_: &str) -> Result { @@ -1681,44 +1944,47 @@ fn freeze_type(type_: PreCqlType) -> PreCqlType { } } -async fn query_table_partitioners( - conn: &Arc, -) -> Result>, QueryError> { - let mut partitioner_query = Query::new( - "select keyspace_name, table_name, partitioner from system_schema.scylla_tables", - ); - partitioner_query.set_page_size(METADATA_QUERY_PAGE_SIZE); - - let rows = conn - .clone() - .query_iter(partitioner_query) - .map(|pager_res| { - let pager = pager_res?; - let stream = pager - .rows_stream::<(String, String, Option)>() - .map_err(|err| { - MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)) - })?; - Ok::<_, QueryError>(stream) - }) - .into_stream() - .try_flatten(); +impl ControlConnection { + async fn query_table_partitioners( + self, + ) -> Result>, QueryError> { + let mut partitioner_query = Query::new( + "select keyspace_name, table_name, partitioner from system_schema.scylla_tables", + ); + partitioner_query.set_page_size(METADATA_QUERY_PAGE_SIZE); + + let rows = self + .query_iter(partitioner_query) + .map(|pager_res| { + let pager = pager_res?; + let stream = pager + .rows_stream::<(String, String, Option)>() + .map_err(|err| { + MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType( + err, + )) + })?; + Ok::<_, QueryError>(stream) + }) + .into_stream() + .try_flatten(); - let result = rows - .map(|row_result| { - let (keyspace_name, table_name, partitioner) = row_result?; - Ok::<_, QueryError>(((keyspace_name, table_name), partitioner)) - }) - .try_collect::>() - .await; - - match result { - // FIXME: This match catches all database errors with this error code despite the fact - // that we are only interested in the ones resulting from non-existent table - // system_schema.scylla_tables. 
- // For more information please refer to https://github.com/scylladb/scylla-rust-driver/pull/349#discussion_r762050262 - Err(QueryError::DbError(DbError::Invalid, _)) => Ok(HashMap::new()), - result => result, + let result = rows + .map(|row_result| { + let (keyspace_name, table_name, partitioner) = row_result?; + Ok::<_, QueryError>(((keyspace_name, table_name), partitioner)) + }) + .try_collect::>() + .await; + + match result { + // FIXME: This match catches all database errors with this error code despite the fact + // that we are only interested in the ones resulting from non-existent table + // system_schema.scylla_tables. + // For more information please refer to https://github.com/scylladb/scylla-rust-driver/pull/349#discussion_r762050262 + Err(QueryError::DbError(DbError::Invalid, _)) => Ok(HashMap::new()), + result => result, + } } } diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index 86b529dc08..02ad09eee5 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -10,6 +10,7 @@ mod history; mod hygiene; mod large_batch_statements; mod lwt_optimisation; +mod metadata_custom_timeouts; mod new_session; mod retries; mod self_identity; diff --git a/scylla/tests/integration/metadata_custom_timeouts.rs b/scylla/tests/integration/metadata_custom_timeouts.rs new file mode 100644 index 0000000000..2db08dee8b --- /dev/null +++ b/scylla/tests/integration/metadata_custom_timeouts.rs @@ -0,0 +1,212 @@ +//! Tests that the driver enforces the provided custom metadata fetching timeout +//! iff ScyllaDB is the target node (else ignores the custom timeout). + +use crate::utils::{setup_tracing, test_with_3_node_cluster}; +use scylla::{Session, SessionBuilder}; +use scylla_proxy::{ + Condition, Reaction as _, RequestFrame, RequestOpcode, RequestReaction, RequestRule, + ShardAwareness, +}; +use scylla_proxy::{ProxyError, RunningProxy, WorkerError}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::mpsc::UnboundedReceiver; +use tracing::info; + +// By default, custom metadata request timeout is set to 2 seconds. 
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2); + +#[cfg(not(scylla_cloud_tests))] +#[tokio::test] +#[ntest::timeout(20000)] +async fn test_custom_metadata_timeouts() { + setup_tracing(); + + fn expected_clause(dur: Duration) -> String { + format!("USING TIMEOUT {}ms", dur.as_millis()) + } + + fn contains_subslice(slice: &[u8], subslice: &[u8]) -> bool { + slice + .windows(subslice.len()) + .any(|window| window == subslice) + } + + fn check_if_connected_to_scylladb(session: &Session) -> bool { + session + .get_cluster_data() + .get_nodes_info() + .first() + .and_then(|node| node.sharder()) + .is_some() + } + + fn assert_no_custom_timeout(frame: RequestFrame) { + let clause = "USING TIMEOUT"; + assert!( + !contains_subslice(&frame.body, clause.as_bytes()), + "slice {:?} does contain subslice {:?}", + &frame.body, + clause, + ); + } + + fn assert_custom_timeout(frame: RequestFrame, dur: Duration) { + let expected = expected_clause(dur); + assert!( + contains_subslice(&frame.body, expected.as_bytes()), + "slice {:?} does not contain subslice {:?}", + &frame.body, + expected, + ); + } + + fn assert_custom_timeout_iff_scylladb( + frame: RequestFrame, + dur: Duration, + connected_to_scylladb: bool, + ) { + if connected_to_scylladb { + info!( + "Connected to ScyllaDB, so expecting custom timeout to be set to {}ms", + dur.as_millis() + ); + assert_custom_timeout(frame, dur); + } else { + info!("Connected to NOT ScyllaDB, so expecting custom timeout to not be set"); + assert_no_custom_timeout(frame); + } + } + + let res = test_with_3_node_cluster( + ShardAwareness::QueryNode, + |proxy_uris, translation_map, mut running_proxy| async move { + let (feedback_tx, mut feedback_rx) = mpsc::unbounded_channel(); + + // This rule feeds back all QUERY and PREPARE requests that are executed + // on a control connection. + let metadata_query_feedback_rule = RequestRule( + Condition::and( + Condition::ConnectionRegisteredAnyEvent, + Condition::or( + Condition::RequestOpcode(RequestOpcode::Query), + Condition::RequestOpcode(RequestOpcode::Prepare), + ), + ), + RequestReaction::noop().with_feedback_when_performed(feedback_tx), + ); + + async fn check_fedback_messages_with_session( + proxy_uris: [String; 3], + translation_map: HashMap, + rule: RequestRule, + running_proxy: &mut RunningProxy, + feedback_rx: &mut UnboundedReceiver<(RequestFrame, Option)>, + builder_modifier: impl Fn(SessionBuilder) -> SessionBuilder, + check: impl Fn(RequestFrame, bool), + ) { + for node in running_proxy.running_nodes.iter_mut() { + node.change_request_rules(Some(vec![rule.clone()])); + } + + let builder = scylla::SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)); + let builder = builder_modifier(builder); + + let session = builder.build().await.unwrap(); + + let connected_to_scylladb = check_if_connected_to_scylladb(&session); + + // Turn off rules, so that no races are possible about some messages fed + // to the feedback channel after we have already cleared it. 
+ running_proxy.turn_off_rules(); + + fn map_fedback_message<'a, T>( + rx: &'a mut UnboundedReceiver<(RequestFrame, Option)>, + f: impl Fn(RequestFrame) -> T + 'a, + ) -> impl Iterator + 'a { + std::iter::from_fn(move || match rx.try_recv() { + Ok((frame, _)) => Some(f(frame)), + Err(TryRecvError::Disconnected) => { + panic!("feedback tx disconnected unexpectedly") + } + Err(TryRecvError::Empty) => None, + }) + } + + let n_fedback = + map_fedback_message(feedback_rx, |frame| check(frame, connected_to_scylladb)) + .count(); + + info!("Checked {} fedback messages", n_fedback); + } + + // First check - explicitly disabled custom metadata request timeouts. + { + info!("Test case 1: checking for no custom timeout when explicitly disabled"); + check_fedback_messages_with_session( + proxy_uris.clone(), + translation_map.clone(), + metadata_query_feedback_rule.clone(), + &mut running_proxy, + &mut feedback_rx, + |mut builder| { + builder.config.metadata_request_serverside_timeout = None; + builder + }, + |frame, _is_scylladb| assert_no_custom_timeout(frame), + ) + .await; + } + + // Second check - explicitly set custom metadata request timeout. + { + let custom_timeout = Duration::from_millis(2137); + info!("Test case 2: custom timeout explicitly set"); + check_fedback_messages_with_session( + proxy_uris.clone(), + translation_map.clone(), + metadata_query_feedback_rule.clone(), + &mut running_proxy, + &mut feedback_rx, + |builder| builder.metadata_request_serverside_timeout(custom_timeout), + |frame, is_scylladb| { + assert_custom_timeout_iff_scylladb(frame, custom_timeout, is_scylladb) + }, + ) + .await; + } + + // Third check - by default, a custom metadata request timeout is set to some number. + { + info!("Test case 3: custom timeout is set by default"); + check_fedback_messages_with_session( + proxy_uris, + translation_map, + metadata_query_feedback_rule.clone(), + &mut running_proxy, + &mut feedback_rx, + |builder| builder, + |frame, is_scylladb| { + assert_custom_timeout_iff_scylladb(frame, DEFAULT_TIMEOUT, is_scylladb) + }, + ) + .await; + } + + running_proxy + }, + ) + .await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +} diff --git a/test/cluster/docker-compose.yml b/test/cluster/docker-compose.yml index 210cc0b738..fbcf4ad45c 100644 --- a/test/cluster/docker-compose.yml +++ b/test/cluster/docker-compose.yml @@ -10,7 +10,7 @@ networks: - subnet: 172.42.0.0/16 services: scylla1: - image: scylladb/scylla + image: scylladb/scylla:6.2 networks: public: ipv4_address: 172.42.0.2 @@ -28,7 +28,7 @@ services: timeout: 5s retries: 60 scylla2: - image: scylladb/scylla + image: scylladb/scylla:6.2 networks: public: ipv4_address: 172.42.0.3 @@ -49,7 +49,7 @@ services: scylla1: condition: service_healthy scylla3: - image: scylladb/scylla + image: scylladb/scylla:6.2 networks: public: ipv4_address: 172.42.0.4 diff --git a/test/tls/scylla.yaml b/test/tls/scylla.yaml index cd36533c4d..2fed5732fe 100644 --- a/test/tls/scylla.yaml +++ b/test/tls/scylla.yaml @@ -20,9 +20,6 @@ # The more tokens, relative to other nodes, the larger the proportion of data # that this node will store. You probably want all nodes to have the same number # of tokens assuming they have equal hardware capability. 
-# -# If you already have a cluster with 1 token per node, and wish to migrate to -# multiple tokens per node, see http://cassandra.apache.org/doc/latest/operating num_tokens: 256 # Directory where Scylla should store all its files, which are commitlog, @@ -39,6 +36,12 @@ num_tokens: 256 # separate spindle than the data directories. # commitlog_directory: /var/lib/scylla/commitlog +# schema commit log. A special commitlog instance +# used for schema and system tables. +# When running on magnetic HDD, this should be a +# separate spindle than the data directories. +# schema_commitlog_directory: /var/lib/scylla/commitlog/schema + # commitlog_sync may be either "periodic" or "batch." # # When in batch mode, Scylla won't ack writes until the commit log @@ -68,23 +71,35 @@ commitlog_sync_period_in_ms: 10000 # is reasonable. commitlog_segment_size_in_mb: 32 +# The size of the individual schema commitlog file segments. +# +# The default size is 128, which is 4 times larger than the default +# size of the data commitlog. It's because the segment size puts +# a limit on the mutation size that can be written at once, and some +# schema mutation writes are much larger than average. +schema_commitlog_segment_size_in_mb: 128 + # seed_provider class_name is saved for future use. -# seeds address(es) are mandatory! +# A seed address is mandatory. seed_provider: - # Addresses of hosts that are deemed contact points. - # Scylla nodes use this list of hosts to find each other and learn - # the topology of the ring. You must change this if you are running - # multiple nodes! + # The addresses of hosts that will serve as contact points for the joining node. + # It allows the node to discover the cluster ring topology on startup (when + # joining the cluster). + # Once the node has joined the cluster, the seed list has no function. - class_name: org.apache.cassandra.locator.SimpleSeedProvider parameters: - # seeds is actually a comma-delimited list of addresses. - # Ex: ",," + # In a new cluster, provide the address of the first node. + # In an existing cluster, specify the address of at least one existing node. + # If you specify addresses of more than one node, use a comma to separate them. + # For example: ",," - seeds: "127.0.0.1" -# Address or interface to bind to and tell other Scylla nodes to connect to. +# Address to bind to and tell other Scylla nodes to connect to. # You _must_ change this if you want multiple nodes to be able to communicate! # -# Setting listen_address to 0.0.0.0 is always wrong. +# If you leave broadcast_address (below) empty, then setting listen_address +# to 0.0.0.0 is wrong as other nodes will not know how to reach this node. +# If you set broadcast_address, then you can set listen_address to 0.0.0.0. listen_address: localhost # Address to broadcast to other Scylla nodes @@ -99,8 +114,8 @@ listen_address: localhost # listen_on_broadcast_address: false # port for the CQL native transport to listen for clients on -# For security reasons, you should not expose this port to the internet. Firewall it if needed. -# To disable the CQL native transport, set this option to 0. +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# To disable the CQL native transport, remove this option and configure native_transport_port_ssl. native_transport_port: 9042 # Like native_transport_port, but clients are forwarded to specific shards, based on the @@ -114,11 +129,11 @@ native_shard_aware_transport_port: 19042 # for native_transport_port. 
Setting native_transport_port_ssl to a different value # from native_transport_port will use encryption for native_transport_port_ssl while # keeping native_transport_port unencrypted. -native_transport_port_ssl: 9142 +#native_transport_port_ssl: 9142 # Like native_transport_port_ssl, but clients are forwarded to specific shards, based on the # client-side port numbers. -native_shard_aware_transport_port_ssl: 19142 +#native_shard_aware_transport_port_ssl: 19142 # How long the coordinator should wait for read operations to complete read_request_timeout_in_ms: 5000 @@ -184,8 +199,7 @@ cas_contention_timeout_in_ms: 1000 # of the snitch, which will be assumed to be on your classpath. endpoint_snitch: SimpleSnitch -# The address or interface to bind the Thrift RPC service and native transport -# server to. +# The address or interface to bind the native transport server to. # # Set rpc_address OR rpc_interface, not both. Interfaces must correspond # to a single address, IP aliasing is not supported. @@ -206,21 +220,18 @@ rpc_address: localhost # rpc_interface: eth1 # rpc_interface_prefer_ipv6: false -# port for Thrift to listen for clients on -rpc_port: 9160 - # port for REST API server api_port: 10000 # IP for the REST API server api_address: 127.0.0.1 -# Log WARN on any batch size exceeding this value. 5kb per batch by default. +# Log WARN on any batch size exceeding this value. 128 kiB per batch by default. # Caution should be taken on increasing the size of this threshold as it can lead to node instability. -batch_size_warn_threshold_in_kb: 5 +batch_size_warn_threshold_in_kb: 128 -# Fail any multiple-partition batch exceeding this value. 50kb (10x warn threshold) by default. -batch_size_fail_threshold_in_kb: 50 +# Fail any multiple-partition batch exceeding this value. 1 MiB (8x warn threshold) by default. +batch_size_fail_threshold_in_kb: 1024 # Authentication backend, identifying users # Out of the box, Scylla provides org.apache.cassandra.auth.{AllowAllAuthenticator, @@ -230,6 +241,9 @@ batch_size_fail_threshold_in_kb: 50 # - PasswordAuthenticator relies on username/password pairs to authenticate # users. It keeps usernames and hashed passwords in system_auth.credentials table. # Please increase system_auth keyspace replication factor if you use this authenticator. +# - com.scylladb.auth.TransitionalAuthenticator requires username/password pair +# to authenticate in the same manner as PasswordAuthenticator, but improper credentials +# result in being logged in as an anonymous user. Use for upgrading clusters' auth. # authenticator: AllowAllAuthenticator # Authorization backend, implementing IAuthorizer; used to limit access/provide permissions @@ -239,11 +253,14 @@ batch_size_fail_threshold_in_kb: 50 # - AllowAllAuthorizer allows any action to any user - set it to disable authorization. # - CassandraAuthorizer stores permissions in system_auth.permissions table. Please # increase system_auth keyspace replication factor if you use this authorizer. +# - com.scylladb.auth.TransitionalAuthorizer wraps around the CassandraAuthorizer, using it for +# authorizing permission management. Otherwise, it allows all. Use for upgrading +# clusters' auth. # authorizer: AllowAllAuthorizer # initial_token allows you to specify tokens manually. 
 # initial_token allows you to specify tokens manually. While you can use # it with
-# vnodes (num_tokens > 1, above) -- in which case you should provide a
-# comma-separated list -- it's primarily used when adding nodes # to legacy clusters
+# vnodes (num_tokens > 1, above) -- in which case you should provide a
+# comma-separated list -- it's primarily used when adding nodes # to legacy clusters
+# that do not have vnodes enabled.
 # initial_token:
@@ -255,13 +272,14 @@ batch_size_fail_threshold_in_kb: 50
 # Uncomment to enable experimental features
 # experimental_features:
-#    - cdc
-#    - lwt
 #    - udf
+#    - alternator-streams
+#    - broadcast-tables
+#    - keyspace-storage-options
 # The directory where hints files are stored if hinted handoff is enabled.
 # hints_directory: /var/lib/scylla/hints
-
+
 # The directory where hints files are stored for materialized-view updates
 # view_hints_directory: /var/lib/scylla/view_hints
@@ -333,9 +351,6 @@ commitlog_total_space_in_mb: -1
 # be rejected as invalid. The default is 256MB.
 # native_transport_max_frame_size_in_mb: 256
-# Whether to start the thrift rpc server.
-# start_rpc: true
-
 # enable or disable keepalive on rpc/native connections
 # rpc_keepalive: true
@@ -352,7 +367,7 @@ commitlog_total_space_in_mb: -1
 # snapshot_before_compaction: false
 # Whether or not a snapshot is taken of the data before keyspace truncation
-# or dropping of column families. The STRONGLY advised default of true
+# or dropping of column families. The STRONGLY advised default of true
 # should be used to provide data safety. If you set this flag to false, you will
 # lose data on truncation or drop.
 # auto_snapshot: true
@@ -380,6 +395,15 @@ commitlog_total_space_in_mb: -1
 # you can cache more hot rows
 # column_index_size_in_kb: 64
+# Auto-scaling of the promoted index prevents running out of memory
+# when the promoted index grows too large (due to partitions with many rows
+# vs. too small column_index_size_in_kb). When the serialized representation
+# of the promoted index grows by this threshold, the desired block size
+# for this partition (initialized to column_index_size_in_kb)
+# is doubled, to decrease the sampling resolution by half.
+#
+# To disable promoted index auto-scaling, set the threshold to 0.
+# column_index_auto_scale_threshold_in_kb: 10240
 # Log a warning when writing partitions larger than this value
 # compaction_large_partition_warning_threshold_mb: 1000
@@ -393,6 +417,9 @@ commitlog_total_space_in_mb: -1
 # Log a warning when row number is larger than this value
 # compaction_rows_count_warning_threshold: 100000
+# Log a warning when writing a collection containing more elements than this value
+# compaction_collection_elements_count_warning_threshold: 10000
+
 # How long the coordinator should wait for seq or index scans to complete
 # range_request_timeout_in_ms: 10000
 # How long the coordinator should wait for writes to complete
@@ -407,39 +434,41 @@ commitlog_total_space_in_mb: -1
 # The default timeout for other, miscellaneous operations
 # request_timeout_in_ms: 10000
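These values are coordinator-side defaults; on Scylla a single statement can also override them with the `USING TIMEOUT` CQL extension. A rough sketch through the driver, assuming an already-built session and a hypothetical `ks.t` table:

```rust
use scylla::Session;

async fn read_with_serverside_timeout(
    session: &Session,
) -> Result<(), Box<dyn std::error::Error>> {
    // Cap this one read at 2 seconds on the server side, regardless of
    // the cluster-wide read_request_timeout_in_ms value.
    session
        .query("SELECT * FROM ks.t USING TIMEOUT 2s", &[])
        .await?;
    Ok(())
}
```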
-# Enable or disable inter-node encryption.
-# You must also generate keys and provide the appropriate key and trust store locations and passwords.
+# Enable or disable inter-node encryption.
+# You must also generate keys and provide the appropriate key and trust store locations and passwords.
 #
 # The available internode options are : all, none, dc, rack
 # If set to dc scylla will encrypt the traffic between the DCs
 # If set to rack scylla will encrypt the traffic between the racks
 #
-# SSL/TLS algorithm and ciphers used can be controlled by
+# SSL/TLS algorithm and ciphers used can be controlled by
 # the priority_string parameter. Info on priority string
 # syntax and values is available at:
 # https://gnutls.org/manual/html_node/Priority-Strings.html
 #
-# The require_client_auth parameter allows you to
-# restrict access to service based on certificate
-# validation. Client must provide a certificate
+# The require_client_auth parameter allows you to
+# restrict access to service based on certificate
+# validation. Client must provide a certificate
 # accepted by the used trust store to connect.
-#
+#
 # server_encryption_options:
 #    internode_encryption: none
 #    certificate: conf/scylla.crt
 #    keyfile: conf/scylla.key
-#    truststore:
+#    truststore:
+#    certficate_revocation_list:
 #    require_client_auth: False
-#    priority_string:
+#    priority_string:
 # enable or disable client/server encryption.
-client_encryption_options:
-    enabled: true
-    certificate: /etc/scylla/db.crt
-    keyfile: /etc/scylla/db.key
-    # truststore: /etc/scylla/cadb.pem
+# client_encryption_options:
+#    enabled: false
+#    certificate: conf/scylla.crt
+#    keyfile: conf/scylla.key
+#    truststore:
+#    certficate_revocation_list:
 #    require_client_auth: False
-#    priority_string:
+#    priority_string:
 # internode_compression controls whether traffic between nodes is
 # compressed.
@@ -482,9 +511,13 @@ client_encryption_options:
 # prometheus_port: 9180
 #
 # prometheus address
-# By default, Scylla binds all interfaces to the prometheus API
-# It is possible to restrict the listening address to a specific one
-# prometheus_address: 0.0.0.0
+# Leaving this blank will set it to the same value as listen_address.
+# This means that by default, Scylla listens to the prometheus API on the same
+# listening address (and therefore network interface) used to listen for
+# internal communication. If the monitoring node is not in this internal
+# network, you can override prometheus_address explicitly - e.g., setting
+# it to 0.0.0.0 to listen on all interfaces.
+# prometheus_address: 1.2.3.4
 # Distribution of data among cores (shards) within a node
 #
@@ -505,5 +538,89 @@ client_encryption_options:
 #
 # Keep at 12 for new clusters.
 murmur3_partitioner_ignore_msb_bits: 12
+
+# Use a new, parallel algorithm for performing aggregate queries.
+# Set to `false` to fall back to the old algorithm.
+# enable_parallelized_aggregation: true
+
+# Time for which task manager task is kept in memory after it completes.
+# task_ttl_in_seconds: 0
+
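The parallelized-aggregation switch above only matters for whole-table aggregates issued by clients; for illustration, a hedged sketch of such a query through the driver, again assuming a live session and a hypothetical `ks.t` table:

```rust
use scylla::Session;

async fn count_rows(session: &Session) -> Result<(), Box<dyn std::error::Error>> {
    // A full-scan aggregate like COUNT(*) is the kind of query that
    // enable_parallelized_aggregation lets the server split across shards.
    let result = session.query("SELECT COUNT(*) FROM ks.t", &[]).await?;
    let (count,): (i64,) = result.single_row_typed()?;
    println!("rows: {count}");
    Ok(())
}
```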
+# In materialized views, restrictions are allowed only on the view's primary key columns.
+# In old versions Scylla mistakenly allowed IS NOT NULL restrictions on columns which were not part
+# of the view's primary key. These invalid restrictions were ignored.
+# This option controls the behavior when someone tries to create a view with such invalid IS NOT NULL restrictions.
+#
+# Can be true, false, or warn.
+# * `true`: IS NOT NULL is allowed only on the view's primary key columns,
+#   trying to use it on other columns will cause an error, as it should.
+# * `false`: Scylla accepts IS NOT NULL restrictions on regular columns, but they're silently ignored.
+#   It's useful for backwards compatibility.
+# * `warn`: The same as false, but there's a warning about invalid view restrictions.
+#
+# To preserve backwards compatibility on old clusters, Scylla's default setting is `warn`.
+# New clusters have this option set to `true` by scylla.yaml (which overrides the default `warn`)
+# to make sure that trying to create an invalid view causes an error.
+strict_is_not_null_in_views: true
+
+# The Unix Domain Socket the node uses for maintenance socket.
+# The possible options are:
+# * ignore: the node will not open the maintenance socket,
+# * workdir: the node will open the maintenance socket on the path <workdir>/cql.m,
+#   where <workdir> is a path defined by the workdir configuration option,
+# * <path>: the node will open the maintenance socket on the path <path>.
+maintenance_socket: ignore
+
+# If set to true, configuration parameters defined with LiveUpdate option can be updated in runtime with CQL
+# by updating system.config virtual table. If we don't want any configuration parameter to be changed in runtime
+# via CQL, this option should be set to false. This parameter doesn't impose any limits on other mechanisms updating
+# configuration parameters in runtime, e.g. sending SIGHUP or using API. This option should be set to false
+# e.g. for cloud users, for whom scylla's configuration should be changed only by support engineers.
+# live_updatable_config_params_changeable_via_cql: true
+
+# ****************
+# *  GUARDRAILS  *
+# ****************
+
+# Guardrails to warn or fail when Replication Factor is smaller/greater than the threshold.
+# Please note that the value of 0 is always allowed,
+# which means that having no replication at all, i.e. RF = 0, is always valid.
+# A guardrail value smaller than 0, e.g. -1, means that the guardrail is disabled.
+# Commenting out a guardrail also means it is disabled.
+# minimum_replication_factor_fail_threshold: -1
+# minimum_replication_factor_warn_threshold: 3
+# maximum_replication_factor_warn_threshold: -1
+# maximum_replication_factor_fail_threshold: -1
+
+# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
+# Each of these 2 settings is a list storing replication strategies considered harmful.
+# The replication strategies to choose from are:
+# 1) SimpleStrategy,
+# 2) NetworkTopologyStrategy,
+# 3) LocalStrategy,
+# 4) EverywhereStrategy
+#
+# replication_strategy_warn_list:
+#   - SimpleStrategy
+# replication_strategy_fail_list:
+
+# Enables the tablets feature.
+# When enabled, newly created keyspaces will have tablets enabled by default.
+# That can be explicitly disabled in the CREATE KEYSPACE query
+# by using the `tablets = {'enabled': false}` replication option.
+#
+# When the tablets feature is disabled, there is no way to enable tablets
+# per keyspace.
+#
+# Note that creating keyspaces with tablets enabled is irreversible.
+# Disabling the tablets feature may impact existing keyspaces that were created with tablets.
+# For example, the tablets map would remain "frozen" and will not respond to topology changes
+# like adding, removing, or replacing nodes, or to replication factor changes.
+enable_tablets: true
 api_ui_dir: /opt/scylladb/swagger-ui/dist/
 api_doc_dir: /opt/scylladb/api/api-doc/
+
+client_encryption_options:
+    enabled: true
+    certificate: /etc/scylla/db.crt
+    keyfile: /etc/scylla/db.key
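Finally, since this file enables `client_encryption_options` for the test image while leaving `native_transport_port_ssl` commented out, the regular native transport port serves TLS (as the comments earlier in this file describe). A driver-side counterpart might look like the sketch below; it assumes the crate's `ssl` feature and a CA file path, both of which are placeholders here rather than part of this change:

```rust
use openssl::ssl::{SslContextBuilder, SslMethod, SslVerifyMode};
use scylla::{Session, SessionBuilder};

async fn connect_over_tls() -> Result<(), Box<dyn std::error::Error>> {
    let mut ctx = SslContextBuilder::new(SslMethod::tls())?;
    // Hypothetical CA path; it must be the CA that signed /etc/scylla/db.crt.
    ctx.set_ca_file("ca.crt")?;
    ctx.set_verify(SslVerifyMode::PEER);

    let _session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .ssl_context(Some(ctx.build()))
        .build()
        .await?;
    Ok(())
}
```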