Skip to content

Commit

Permalink
Merge branch 'scylladb:main' into 330-make-metrics-collection-optiona…
Browse files Browse the repository at this point in the history
…l/faster
  • Loading branch information
NikodemGapski authored Jan 16, 2025
2 parents b36ef1f + f9f0940 commit 0bcde8e
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 51 deletions.
4 changes: 2 additions & 2 deletions scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ where
let retry_decision = self.retry_session.decide_should_retry(query_info);
trace!(
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
retry_decision = ?retry_decision
);

last_error = request_error.into_query_error();
Expand Down Expand Up @@ -866,7 +866,7 @@ impl QueryPager {
serialized_values_size,
);
if let Some(replicas) = replicas.as_ref() {
span.record_replicas(replicas);
span.record_replicas(replicas.iter().map(|(node, shard)| (node, *shard)));
}
span
};
Expand Down
8 changes: 3 additions & 5 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,10 +1455,8 @@ where
if !span.span().is_disabled() {
if let (Some(table_spec), Some(token)) = (statement_info.table, token) {
let cluster_data = self.get_cluster_data();
let replicas: smallvec::SmallVec<[_; 8]> = cluster_data
.get_token_endpoints_iter(table_spec, token)
.collect();
span.record_replicas(&replicas)
let replicas = cluster_data.get_token_endpoints_iter(table_spec, token);
span.record_replicas(replicas)
}
}

Expand Down Expand Up @@ -2133,7 +2131,7 @@ where
let retry_decision = context.retry_session.decide_should_retry(query_info);
trace!(
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
retry_decision = ?retry_decision
);

last_error = Some(request_error.into_query_error());
Expand Down
31 changes: 12 additions & 19 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::policies::host_filter::HostFilter;
use crate::routing::Token;
use crate::statement::query::Query;
use crate::utils::parse::{ParseErrorCause, ParseResult, ParserState};
use crate::utils::pretty::{CommaSeparatedDisplayer, DisplayUsingDebug};

use futures::future::{self, FutureExt};
use futures::stream::{self, StreamExt, TryStreamExt};
Expand All @@ -37,8 +38,7 @@ use scylla_macros::DeserializeRow;
use std::borrow::BorrowMut;
use std::cell::Cell;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;
use std::fmt::{self, Formatter};
use std::net::{IpAddr, SocketAddr};
use std::num::NonZeroUsize;
use std::str::FromStr;
Expand Down Expand Up @@ -562,11 +562,7 @@ impl MetadataReader {
self.known_peers.shuffle(&mut thread_rng());
debug!(
"Known peers: {}",
self.known_peers
.iter()
.map(|endpoint| format!("{:?}", endpoint))
.collect::<Vec<String>>()
.join(", ")
CommaSeparatedDisplayer(self.known_peers.iter().map(DisplayUsingDebug))
);

let address_of_failed_control_connection = self.control_connection_endpoint.address();
Expand Down Expand Up @@ -633,11 +629,9 @@ impl MetadataReader {
};

warn!(
control_connection_address = self
control_connection_address = tracing::field::display(self
.control_connection_endpoint
.address()
.to_string()
.as_str(),
.address()),
error = %err,
"Failed to fetch metadata using current control connection"
);
Expand Down Expand Up @@ -701,11 +695,9 @@ impl MetadataReader {
// and print an error message about this fact
if !metadata.peers.is_empty() && self.known_peers.is_empty() {
error!(
node_ips = ?metadata
.peers
.iter()
.map(|peer| peer.address)
.collect::<Vec<_>>(),
node_ips = tracing::field::display(CommaSeparatedDisplayer(
metadata.peers.iter().map(|peer| peer.address)
)),
"The host filter rejected all nodes in the cluster, \
no connections that can serve user queries have been \
established. The session cannot serve any queries!"
Expand All @@ -721,12 +713,12 @@ impl MetadataReader {
if let Some(peer) = control_connection_peer {
if !self.host_filter.as_ref().map_or(true, |f| f.accept(peer)) {
warn!(
filtered_node_ips = ?metadata
filtered_node_ips = tracing::field::display(CommaSeparatedDisplayer(metadata
.peers
.iter()
.filter(|peer| self.host_filter.as_ref().map_or(true, |p| p.accept(peer)))
.map(|peer| peer.address)
.collect::<Vec<_>>(),
)),
control_connection_address = ?self.control_connection_endpoint.address(),
"The node that the control connection is established to \
is not accepted by the host filter. Please verify that \
Expand Down Expand Up @@ -889,9 +881,10 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe

let peers = translated_peers_futures
.buffer_unordered(256)
.try_filter_map(|x| std::future::ready(Ok(x)))
.try_collect::<Vec<_>>()
.await?;
Ok(peers.into_iter().flatten().collect())
Ok(peers)
}

async fn create_peer_from_row(
Expand Down
2 changes: 1 addition & 1 deletion scylla/src/cluster/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl ClusterState {
&self,
table_spec: &TableSpec,
token: Token,
) -> impl Iterator<Item = (NodeRef<'_>, Shard)> {
) -> impl Iterator<Item = (NodeRef<'_>, Shard)> + Clone {
let keyspace = self.keyspaces.get(table_spec.ks_name());
let strategy = keyspace
.map(|k| &k.strategy)
Expand Down
12 changes: 5 additions & 7 deletions scylla/src/network/connection_pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(feature = "cloud")]
use crate::cloud::set_ssl_config_for_scylla_cloud_host;
use crate::utils::pretty::CommaSeparatedDisplayer;

use super::connection::{
open_connection, open_connection_to_shard_aware_port, Connection, ConnectionConfig,
Expand Down Expand Up @@ -369,12 +370,9 @@ impl NodeConnectionPool {

fn choose_random_connection_from_slice(v: &[Arc<Connection>]) -> Option<Arc<Connection>> {
trace!(
connections = v
.iter()
.map(|conn| conn.get_connect_address().to_string())
.collect::<Vec<String>>()
.join(",")
.as_str(),
connections = tracing::field::display(CommaSeparatedDisplayer(
v.iter().map(|conn| conn.get_connect_address())
)),
"Available"
);
if v.is_empty() {
Expand Down Expand Up @@ -598,7 +596,7 @@ impl PoolRefiller {
}
}
trace!(
pool_state = format!("{:?}", ShardedConnectionVectorWrapper(&self.conns)).as_str()
pool_state = ?ShardedConnectionVectorWrapper(&self.conns)
);

// Schedule refilling here
Expand Down
28 changes: 14 additions & 14 deletions scylla/src/observability/driver_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,26 @@ impl RequestSpan {
}
}

pub(crate) fn record_replicas<'a>(&'a self, replicas: &'a [(impl Borrow<Arc<Node>>, Shard)]) {
struct ReplicaIps<'a, N>(&'a [(N, Shard)]);
impl<N> Display for ReplicaIps<'_, N>
pub(crate) fn record_replicas<'a>(
&'a self,
replicas: impl Iterator<Item = (impl Borrow<Arc<Node>> + 'a, Shard)> + Clone,
) {
struct Replica<N>(N, Shard);
impl<N> Display for Replica<N>
where
N: Borrow<Arc<Node>>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut nodes_with_shards = self.0.iter();
if let Some((node, shard)) = nodes_with_shards.next() {
write!(f, "{}-shard{}", node.borrow().address.ip(), shard)?;

for (node, shard) in nodes_with_shards {
write!(f, ",{}-shard{}", node.borrow().address.ip(), shard)?;
}
}
Ok(())
let Self(node, shard) = self;
write!(f, "{}-shard{}", node.borrow().address.ip(), shard)
}
}
self.span
.record("replicas", tracing::field::display(&ReplicaIps(replicas)));
self.span.record(
"replicas",
tracing::field::display(CommaSeparatedDisplayer(
replicas.map(|(node, shard)| Replica(node, shard)),
)),
);
}

pub(crate) fn record_request_size(&self, size: usize) {
Expand Down
2 changes: 2 additions & 0 deletions scylla/src/routing/locator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ impl<'a> IntoIterator for ReplicaSet<'a> {
}
}

#[derive(Clone)]
enum ReplicaSetIteratorInner<'a> {
/// Token ring with SimpleStrategy, any datacenter
Plain {
Expand Down Expand Up @@ -502,6 +503,7 @@ enum ReplicaSetIteratorInner<'a> {
}

/// Iterator that returns replicas from some replica set.
#[derive(Clone)]
pub struct ReplicaSetIterator<'a> {
inner: ReplicaSetIteratorInner<'a>,
token: Token,
Expand Down
2 changes: 1 addition & 1 deletion scylla/src/routing/locator/replicas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{ops::Index, sync::Arc};
///
/// This type is very similar to `Cow`, but unlike `Cow`,
/// it holds references in an `Owned` variant `Vec`.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum ReplicasArray<'a> {
Borrowed(&'a [Arc<Node>]),
Owned(Vec<NodeRef<'a>>),
Expand Down
5 changes: 3 additions & 2 deletions scylla/src/routing/locator/replication_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ impl ReplicationInfo {

let unique_nodes_in_global_ring = global_ring
.iter()
.map(|(_t, n)| n.clone())
.map(|(_t, n)| n)
.unique()
.cloned()
.collect();

let mut datacenter_nodes: HashMap<&str, Vec<(Token, Arc<Node>)>> = HashMap::new();
Expand All @@ -82,7 +83,7 @@ impl ReplicationInfo {
for (datacenter_name, this_datacenter_nodes) in datacenter_nodes {
let dc_ring = TokenRing::new(this_datacenter_nodes.into_iter());
let unique_nodes_in_dc_ring =
dc_ring.iter().map(|(_t, n)| n.clone()).unique().collect();
dc_ring.iter().map(|(_t, n)| n).unique().cloned().collect();
// When counting racks consider None as a separate rack
let rack_count: usize = dc_ring
.iter()
Expand Down
7 changes: 7 additions & 0 deletions scylla/src/utils/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ where
}
}

pub(crate) struct DisplayUsingDebug<T>(pub(crate) T);
impl<T: std::fmt::Debug> std::fmt::Display for DisplayUsingDebug<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
<T as std::fmt::Debug>::fmt(&self.0, f)
}
}

#[cfg(test)]
mod tests {
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
Expand Down

0 comments on commit 0bcde8e

Please sign in to comment.