errors: narrow error types accepted by HistoryListener #1159

Open · wants to merge 9 commits into base: main
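For orientation before the diff: this PR renames the request-level history hooks from log_query_* to log_request_* and narrows the error types they accept, so request-level callbacks receive a RequestError and per-attempt callbacks receive a RequestAttemptError instead of the broad QueryError. The sketch below reconstructs the resulting HistoryListener shape from the call sites in the diff; the stand-in types and exact signatures are assumptions for illustration, not the verbatim driver API.

```rust
use std::net::SocketAddr;

// Stand-in types so this sketch compiles on its own; the real definitions live
// in the driver's errors and history modules (paths and fields are assumptions).
pub struct RequestError;        // request-level error (empty plan, timeout, last attempt error, ...)
pub struct RequestAttemptError; // error of a single attempt against a single node
pub struct RetryDecision;       // decision produced by the retry policy

#[derive(Clone, Copy)]
pub struct RequestId(pub usize);
#[derive(Clone, Copy)]
pub struct SpeculativeId(pub usize);
#[derive(Clone, Copy)]
pub struct AttemptId(pub usize);

// Listener shape after this PR, inferred from the call sites below.
pub trait HistoryListener: Send + Sync {
    // Whole-request lifecycle: errors narrowed to RequestError.
    fn log_request_start(&self) -> RequestId;
    fn log_request_success(&self, request_id: RequestId);
    fn log_request_error(&self, request_id: RequestId, error: &RequestError);

    // Speculative execution fibers are registered against the request id.
    fn log_new_speculative_fiber(&self, request_id: RequestId) -> SpeculativeId;

    // Per-attempt lifecycle: errors narrowed to RequestAttemptError.
    fn log_attempt_start(
        &self,
        request_id: RequestId,
        speculative_id: Option<SpeculativeId>,
        node_addr: SocketAddr,
    ) -> AttemptId;
    fn log_attempt_success(&self, attempt_id: AttemptId);
    fn log_attempt_error(
        &self,
        attempt_id: AttemptId,
        error: &RequestAttemptError,
        retry_decision: &RetryDecision,
    );
}
```

The benefit implied by the diff is that attempt-level listeners no longer have to handle request-level variants such as timeouts or an empty load-balancing plan, and request-level listeners no longer see per-attempt details.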
48 changes: 24 additions & 24 deletions scylla/src/client/pager.rs
@@ -29,7 +29,7 @@ use crate::cluster::{ClusterState, NodeRef};
#[allow(deprecated)]
use crate::cql_to_rust::{FromRow, FromRowError};
use crate::deserialize::DeserializeOwnedRow;
use crate::errors::ProtocolError;
use crate::errors::{ProtocolError, RequestError};
use crate::errors::{QueryError, RequestAttemptError};
use crate::frame::response::result;
use crate::network::Connection;
@@ -148,7 +148,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
paging_state: PagingState,

history_listener: Option<Arc<dyn HistoryListener>>,
current_query_id: Option<history::QueryId>,
current_request_id: Option<history::RequestId>,
current_attempt_id: Option<history::AttemptId>,

parent_span: tracing::Span,
@@ -168,10 +168,10 @@ where
let query_plan =
load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data);

let mut last_error: QueryError = QueryError::EmptyPlan;
let mut last_error: RequestError = RequestError::EmptyPlan;
let mut current_consistency: Consistency = self.query_consistency;

self.log_query_start();
self.log_request_start();

'nodes_in_plan: for (node, shard) in query_plan {
let span =
@@ -235,8 +235,9 @@
retry_decision = ?retry_decision
);

last_error = request_error.into_query_error();
self.log_attempt_error(&last_error, &retry_decision);
self.log_attempt_error(&request_error, &retry_decision);

last_error = request_error.into();

match retry_decision {
RetryDecision::RetrySameNode(cl) => {
@@ -265,9 +266,8 @@
}
}

// Send last_error to QueryPager - query failed fully
self.log_query_error(&last_error);
let (proof, _) = self.sender.send(Err(last_error)).await;
self.log_request_error(&last_error);
let (proof, _) = self.sender.send(Err(last_error.into_query_error())).await;
proof
}

@@ -329,7 +329,7 @@
}) => {
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_query_success();
self.log_request_success();
self.execution_profile
.load_balancing_policy
.on_request_success(&self.statement_info, elapsed, node);
@@ -357,7 +357,7 @@

// Query succeeded, reset retry policy for future retries
self.retry_session.reset();
self.log_query_start();
self.log_request_start();

Ok(ControlFlow::Continue(()))
}
@@ -392,41 +392,41 @@
}
}

fn log_query_start(&mut self) {
fn log_request_start(&mut self) {
let history_listener: &dyn HistoryListener = match &self.history_listener {
Some(hl) => &**hl,
None => return,
};

self.current_query_id = Some(history_listener.log_query_start());
self.current_request_id = Some(history_listener.log_request_start());
}

fn log_query_success(&mut self) {
fn log_request_success(&mut self) {
let history_listener: &dyn HistoryListener = match &self.history_listener {
Some(hl) => &**hl,
None => return,
};

let query_id: history::QueryId = match &self.current_query_id {
let request_id: history::RequestId = match &self.current_request_id {
Some(id) => *id,
None => return,
};

history_listener.log_query_success(query_id);
history_listener.log_request_success(request_id);
}

fn log_query_error(&mut self, error: &QueryError) {
fn log_request_error(&mut self, error: &RequestError) {
let history_listener: &dyn HistoryListener = match &self.history_listener {
Some(hl) => &**hl,
None => return,
};

let query_id: history::QueryId = match &self.current_query_id {
let request_id: history::RequestId = match &self.current_request_id {
Some(id) => *id,
None => return,
};

history_listener.log_query_error(query_id, error);
history_listener.log_request_error(request_id, error);
}

fn log_attempt_start(&mut self, node_addr: SocketAddr) {
@@ -435,13 +435,13 @@
None => return,
};

let query_id: history::QueryId = match &self.current_query_id {
let request_id: history::RequestId = match &self.current_request_id {
Some(id) => *id,
None => return,
};

self.current_attempt_id =
Some(history_listener.log_attempt_start(query_id, None, node_addr));
Some(history_listener.log_attempt_start(request_id, None, node_addr));
}

fn log_attempt_success(&mut self) {
@@ -458,7 +458,7 @@
history_listener.log_attempt_success(attempt_id);
}

fn log_attempt_error(&mut self, error: &QueryError, retry_decision: &RetryDecision) {
fn log_attempt_error(&mut self, error: &RequestAttemptError, retry_decision: &RetryDecision) {
let history_listener: &dyn HistoryListener = match &self.history_listener {
Some(hl) => &**hl,
None => return,
@@ -754,7 +754,7 @@ impl QueryPager {
metrics,
paging_state: PagingState::start(),
history_listener: query.config.history_listener.clone(),
current_query_id: None,
current_request_id: None,
current_attempt_id: None,
parent_span,
span_creator,
@@ -872,7 +872,7 @@ impl QueryPager {
metrics: config.metrics,
paging_state: PagingState::start(),
history_listener: config.prepared.config.history_listener.clone(),
current_query_id: None,
current_request_id: None,
current_attempt_id: None,
parent_span,
span_creator,
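The pager changes above, and the session changes below, follow one conversion chain: each attempt produces a RequestAttemptError, the retry loop folds it into a RequestError (which also models the empty-plan and timeout cases), and only the public boundary converts that into a QueryError via into_query_error. A minimal sketch of that chain, using hypothetical stand-in enums rather than the driver's real error types:

```rust
use std::time::Duration;

// Hypothetical stand-ins for the driver's error types, for illustration only.
#[derive(Debug)]
pub enum RequestAttemptError {
    DbError(String),
}

#[derive(Debug)]
pub enum RequestError {
    EmptyPlan,
    RequestTimeout(Duration),
    LastAttemptError(RequestAttemptError),
}

#[derive(Debug)]
pub enum QueryError {
    Request(String),
}

// Attempt error -> request error, mirroring `last_error = request_error.into();`.
impl From<RequestAttemptError> for RequestError {
    fn from(e: RequestAttemptError) -> Self {
        RequestError::LastAttemptError(e)
    }
}

impl RequestError {
    // Request error -> public error, mirroring `last_error.into_query_error()`.
    pub fn into_query_error(self) -> QueryError {
        QueryError::Request(format!("{:?}", self))
    }
}

fn main() {
    // The retry loop keeps the narrow types; only the user-facing result widens.
    let attempt_err = RequestAttemptError::DbError("node overloaded".to_owned());
    let request_err: RequestError = attempt_err.into();
    let query_err: QueryError = request_err.into_query_error();
    println!("{:?}", query_err);
}
```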
70 changes: 32 additions & 38 deletions scylla/src/client/session.rs
@@ -15,7 +15,8 @@ use crate::cluster::node::CloudEndpoint;
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
use crate::errors::{
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, TracingProtocolError,
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, RequestError,
TracingProtocolError,
};
use crate::frame::response::result;
#[cfg(feature = "ssl")]
@@ -1849,11 +1850,11 @@ where
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::RequestId)> =
statement_config
.history_listener
.as_ref()
.map(|hl| (&**hl, hl.log_query_start()));
.map(|hl| (&**hl, hl.log_request_start()));

let load_balancer = &execution_profile.load_balancing_policy;

@@ -1899,16 +1900,18 @@ where
let request_runner_generator = |is_speculative: bool| {
let history_data: Option<HistoryData> = history_listener_and_id
.as_ref()
.map(|(history_listener, query_id)| {
.map(|(history_listener, request_id)| {
let speculative_id: Option<history::SpeculativeId> =
if is_speculative {
Some(history_listener.log_new_speculative_fiber(*query_id))
Some(
history_listener.log_new_speculative_fiber(*request_id),
)
} else {
None
};
HistoryData {
listener: *history_listener,
query_id: *query_id,
request_id: *request_id,
speculative_id,
}
});
@@ -1947,9 +1950,9 @@ where
let history_data: Option<HistoryData> =
history_listener_and_id
.as_ref()
.map(|(history_listener, query_id)| HistoryData {
.map(|(history_listener, request_id)| HistoryData {
listener: *history_listener,
query_id: *query_id,
request_id: *request_id,
speculative_id: None,
});
self.run_request_speculative_fiber(
@@ -1966,7 +1969,6 @@
},
)
.await
.unwrap_or(Err(QueryError::EmptyPlan))
}
}
};
@@ -1977,24 +1979,19 @@
let result = match effective_timeout {
Some(timeout) => tokio::time::timeout(timeout, runner)
.await
.unwrap_or_else(|e| {
Err(QueryError::RequestTimeout(format!(
"Request took longer than {}ms: {}",
timeout.as_millis(),
e
)))
}),
None => runner.await,
.map(|res| res.map_err(RequestError::from))
.unwrap_or_else(|_| Err(RequestError::RequestTimeout(timeout))),
None => runner.await.map_err(RequestError::from),
};

if let Some((history_listener, query_id)) = history_listener_and_id {
if let Some((history_listener, request_id)) = history_listener_and_id {
match &result {
Ok(_) => history_listener.log_query_success(query_id),
Err(e) => history_listener.log_query_error(query_id, e),
Ok(_) => history_listener.log_request_success(request_id),
Err(e) => history_listener.log_request_error(request_id, e),
}
}

result
result.map_err(RequestError::into_query_error)
}

/// Executes the closure `run_request_once`, provided the load balancing plan and some information
@@ -2008,12 +2005,12 @@
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
execution_profile: &ExecutionProfileInner,
mut context: ExecuteRequestContext<'a>,
) -> Option<Result<RunRequestResult<ResT>, QueryError>>
) -> Result<RunRequestResult<ResT>, RequestError>
where
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
let mut last_error: Option<QueryError> = None;
let mut last_error: RequestError = RequestError::EmptyPlan;
let mut current_consistency: Consistency = context
.consistency_set_on_statement
.unwrap_or(execution_profile.consistency);
@@ -2030,7 +2027,7 @@
error = %e,
"Choosing connection failed"
);
last_error = Some(e.into());
last_error = e.into();
// Broken connection doesn't count as a failed request, don't log in metrics
continue 'nodes_in_plan;
}
@@ -2063,7 +2060,7 @@
elapsed,
node,
);
return Some(Ok(RunRequestResult::Completed(response)));
return Ok(RunRequestResult::Completed(response));
}
Err(e) => {
trace!(
@@ -2097,12 +2094,9 @@
retry_decision = ?retry_decision
);

last_error = Some(request_error.into_query_error());
context.log_attempt_error(
&attempt_id,
last_error.as_ref().unwrap(),
&retry_decision,
);
context.log_attempt_error(&attempt_id, &request_error, &retry_decision);

last_error = request_error.into();

match retry_decision {
RetryDecision::RetrySameNode(new_cl) => {
@@ -2118,13 +2112,13 @@
RetryDecision::DontRetry => break 'nodes_in_plan,

RetryDecision::IgnoreWriteError => {
return Some(Ok(RunRequestResult::IgnoredWriteError))
return Ok(RunRequestResult::IgnoredWriteError)
}
};
}
}

last_error.map(Result::Err)
Err(last_error)
}

async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, QueryError> {
@@ -2142,8 +2136,8 @@
self.await_schema_agreement_indefinitely(),
)
.await
.unwrap_or(Err(QueryError::RequestTimeout(
"schema agreement not reached in time".to_owned(),
.unwrap_or(Err(QueryError::SchemaAgreementTimeout(
self.schema_agreement_timeout,
)))
}

@@ -2192,15 +2186,15 @@ struct ExecuteRequestContext<'a> {

struct HistoryData<'a> {
listener: &'a dyn HistoryListener,
query_id: history::QueryId,
request_id: history::RequestId,
speculative_id: Option<history::SpeculativeId>,
}

impl ExecuteRequestContext<'_> {
fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
self.history_data.as_ref().map(|hd| {
hd.listener
.log_attempt_start(hd.query_id, hd.speculative_id, node_addr)
.log_attempt_start(hd.request_id, hd.speculative_id, node_addr)
})
}

@@ -2221,7 +2215,7 @@
fn log_attempt_error(
&self,
attempt_id_opt: &Option<history::AttemptId>,
error: &QueryError,
error: &RequestAttemptError,
retry_decision: &RetryDecision,
) {
let attempt_id: &history::AttemptId = match attempt_id_opt {