Skip to content

Commit

Permalink
Merge pull request #1952 from tursodatabase/sqld-metrics
Browse files Browse the repository at this point in the history
add few simple metrics to sqld
  • Loading branch information
sivukhin authored Feb 12, 2025
2 parents 224b57e + c921508 commit 701a8cf
Show file tree
Hide file tree
Showing 14 changed files with 93 additions and 23 deletions.
4 changes: 4 additions & 0 deletions libsql-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ pub struct DbConfig {
pub snapshot_at_shutdown: bool,
pub encryption_config: Option<EncryptionConfig>,
pub max_concurrent_requests: u64,
pub disable_intelligent_throttling: bool,
pub connection_creation_timeout: Option<Duration>,
}

impl Default for DbConfig {
Expand All @@ -119,6 +121,8 @@ impl Default for DbConfig {
snapshot_at_shutdown: false,
encryption_config: None,
max_concurrent_requests: 128,
disable_intelligent_throttling: false,
connection_creation_timeout: None,
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ use crate::error::Error;
use crate::http::user::timing::sample_time;
use crate::metrics::{
CONCURRENT_CONNECTIONS_COUNT, CONNECTION_ALIVE_DURATION, CONNECTION_CREATE_TIME,
TOTAL_RESPONSE_SIZE_HIST,
};
use crate::namespace::meta_store::MetaStore;
use crate::namespace::NamespaceName;
use crate::query::{Params, Query};
use crate::query_analysis::Statement;
use crate::query_result_builder::{IgnoreResult, QueryResultBuilder};
use crate::query_result_builder::{IgnoreResult, QueryResultBuilder, TOTAL_RESPONSE_SIZE};
use crate::replication::FrameNo;
use crate::Result;

Expand Down Expand Up @@ -205,6 +206,7 @@ pub trait MakeConnection: Send + Sync + 'static {
timeout: Option<Duration>,
max_total_response_size: u64,
max_concurrent_requests: u64,
disable_intelligent_throttling: bool,
) -> MakeThrottledConnection<Self>
where
Self: Sized,
Expand All @@ -215,6 +217,7 @@ pub trait MakeConnection: Send + Sync + 'static {
timeout,
max_total_response_size,
max_concurrent_requests,
disable_intelligent_throttling,
)
}

Expand Down Expand Up @@ -280,6 +283,7 @@ pub struct MakeThrottledConnection<F> {
max_total_response_size: u64,
waiters: AtomicUsize,
max_concurrent_requests: u64,
disable_intelligent_throttling: bool,
}

impl<F> MakeThrottledConnection<F> {
Expand All @@ -289,6 +293,7 @@ impl<F> MakeThrottledConnection<F> {
timeout: Option<Duration>,
max_total_response_size: u64,
max_concurrent_requests: u64,
disable_intelligent_throttling: bool,
) -> Self {
Self {
semaphore,
Expand All @@ -297,12 +302,16 @@ impl<F> MakeThrottledConnection<F> {
max_total_response_size,
waiters: AtomicUsize::new(0),
max_concurrent_requests,
disable_intelligent_throttling,
}
}

// How many units should be acquired from the semaphore,
// depending on current memory pressure.
fn units_to_take(&self) -> u32 {
if self.disable_intelligent_throttling {
return 1;
}
let total_response_size = crate::query_result_builder::TOTAL_RESPONSE_SIZE
.load(std::sync::atomic::Ordering::Relaxed) as u64;
if total_response_size * 2 > self.max_total_response_size {
Expand Down Expand Up @@ -352,6 +361,8 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
"Available semaphore units: {}",
self.semaphore.available_permits()
);
TOTAL_RESPONSE_SIZE_HIST
.record(TOTAL_RESPONSE_SIZE.load(std::sync::atomic::Ordering::Relaxed) as f64);
let units = self.units_to_take();
let waiters_guard = WaitersGuard::new(&self.waiters);
if (waiters_guard.waiters.load(Ordering::Relaxed) as u64) >= self.max_concurrent_requests {
Expand Down Expand Up @@ -519,6 +530,7 @@ pub mod test {
Some(Duration::from_millis(100)),
u64::MAX,
u64::MAX,
false,
);

let mut conns = Vec::with_capacity(10);
Expand Down
5 changes: 5 additions & 0 deletions libsql-server/src/hrana/http/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::time::{Duration, Instant};

use crate::connection::MakeConnection;
use crate::database::Connection;
use crate::metrics::STREAM_HANDLES_COUNT;

use super::super::ProtocolError;
use super::Server;
Expand Down Expand Up @@ -169,6 +170,8 @@ pub async fn acquire<'srv>(
baton_seq: rand::random(),
});
state.handles.insert(stream.stream_id, Handle::Acquired);
STREAM_HANDLES_COUNT.increment(1.0);

tracing::debug!(
"Stream {} was created with baton seq {}",
stream.stream_id,
Expand Down Expand Up @@ -253,6 +256,7 @@ impl<'srv> Drop for Guard<'srv> {
tracing::debug!("Stream {stream_id} was released for further use");
} else {
tracing::debug!("Stream {stream_id} was closed");
STREAM_HANDLES_COUNT.decrement(1.0);
}
}
}
Expand Down Expand Up @@ -374,6 +378,7 @@ fn pump_expire(state: &mut ServerStreamState, cx: &mut task::Context) {

match state.handles.get_mut(&stream_id) {
Some(handle @ Handle::Available(_)) => {
STREAM_HANDLES_COUNT.decrement(1.0);
*handle = Handle::Expired;
}
_ => continue,
Expand Down
44 changes: 26 additions & 18 deletions libsql-server/src/hrana/stmt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, bail, Result};
use metrics::counter;
use std::collections::HashMap;

use super::result_builder::SingleStatementBuilder;
Expand Down Expand Up @@ -206,39 +207,46 @@ fn catch_stmt_error(sqld_error: SqldError) -> anyhow::Error {
}

pub fn stmt_error_from_sqld_error(sqld_error: SqldError) -> Result<StmtError, SqldError> {
Ok(match sqld_error {
SqldError::LibSqlInvalidQueryParams(source) => StmtError::ArgsInvalid { source },
SqldError::LibSqlTxTimeout => StmtError::TransactionTimeout,
SqldError::LibSqlTxBusy => StmtError::TransactionBusy,
let result = match sqld_error {
SqldError::LibSqlInvalidQueryParams(source) => Ok(StmtError::ArgsInvalid { source }),
SqldError::LibSqlTxTimeout => Ok(StmtError::TransactionTimeout),
SqldError::LibSqlTxBusy => Ok(StmtError::TransactionBusy),
SqldError::BuilderError(QueryResultBuilderError::ResponseTooLarge(_)) => {
StmtError::ResponseTooLarge
Ok(StmtError::ResponseTooLarge)
}
SqldError::Blocked(reason) => StmtError::Blocked { reason },
SqldError::RpcQueryError(e) => StmtError::Proxy(e.message),
SqldError::Blocked(reason) => Ok(StmtError::Blocked { reason }),
SqldError::RpcQueryError(e) => Ok(StmtError::Proxy(e.message)),
SqldError::RusqliteError(rusqlite_error)
| SqldError::RusqliteErrorExtended(rusqlite_error, _) => match rusqlite_error {
rusqlite::Error::SqliteFailure(sqlite_error, Some(message)) => StmtError::SqliteError {
source: sqlite_error,
message,
},
rusqlite::Error::SqliteFailure(sqlite_error, None) => StmtError::SqliteError {
rusqlite::Error::SqliteFailure(sqlite_error, Some(message)) => {
Ok(StmtError::SqliteError {
source: sqlite_error,
message,
})
}
rusqlite::Error::SqliteFailure(sqlite_error, None) => Ok(StmtError::SqliteError {
message: sqlite_error.to_string(),
source: sqlite_error,
},
}),
rusqlite::Error::SqlInputError {
error: sqlite_error,
msg: message,
offset,
..
} => StmtError::SqlInputError {
} => Ok(StmtError::SqlInputError {
source: sqlite_error,
message,
offset,
},
rusqlite_error => return Err(SqldError::RusqliteError(rusqlite_error)),
}),
rusqlite_error => Err(SqldError::RusqliteError(rusqlite_error)),
},
sqld_error => return Err(sqld_error),
})
sqld_error => Err(sqld_error),
};

let code = result.as_ref().map(|x| x.code()).unwrap_or("UKNOWN");
counter!("libsql_server_hrana_step_errors", 1, "code" => code);

result
}

pub fn proto_error_from_stmt_error(error: &StmtError) -> hrana::proto::Error {
Expand Down
2 changes: 2 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ where
max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
max_concurrent_requests: self.db_config.max_concurrent_requests,
encryption_config: self.db_config.encryption_config.clone(),
disable_intelligent_throttling: self.db_config.disable_intelligent_throttling,
connection_creation_timeout: self.db_config.connection_creation_timeout,
};

let (metastore_conn_maker, meta_store_wal_manager) =
Expand Down
11 changes: 11 additions & 0 deletions libsql-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ struct Cli {
#[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_REQUESTS")]
max_concurrent_requests: u64,

// disable the throttling logic which adjusts concurrency limits based on memory-pressure conditions
#[clap(long, env = "SQLD_DISABLE_INTELLIGENT_THROTTLING")]
disable_intelligent_throttling: bool,

#[clap(long, env = "SQLD_CONNECTION_CREATION_TIMEOUT_SEC")]
connection_creation_timeout_sec: Option<u64>,

/// Allow meta store to recover config from filesystem from older version, if meta store is
/// empty on startup
#[clap(long, env = "SQLD_ALLOW_METASTORE_RECOVERY")]
Expand Down Expand Up @@ -421,6 +428,10 @@ fn make_db_config(config: &Cli) -> anyhow::Result<DbConfig> {
snapshot_at_shutdown: config.snapshot_at_shutdown,
encryption_config: encryption_config.clone(),
max_concurrent_requests: config.max_concurrent_requests,
disable_intelligent_throttling: config.disable_intelligent_throttling,
connection_creation_timeout: config
.connection_creation_timeout_sec
.map(|x| Duration::from_secs(x)),
})
}

Expand Down
10 changes: 10 additions & 0 deletions libsql-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ pub static CONCURRENT_CONNECTIONS_COUNT: Lazy<Gauge> = Lazy::new(|| {
describe_gauge!(NAME, "number of concurrent connections");
register_gauge!(NAME)
});
/// Histogram exported as `libsql_server_total_response_size_before_lock`.
///
/// The metric is described and registered inside the `Lazy` initializer below.
pub static TOTAL_RESPONSE_SIZE_HIST: Lazy<Histogram> = Lazy::new(|| {
const NAME: &str = "libsql_server_total_response_size_before_lock";
describe_histogram!(NAME, "total response size value before connection lock");
register_histogram!(NAME)
});
/// Gauge exported as `libsql_server_stream_handles`.
///
/// The metric is described and registered inside the `Lazy` initializer below.
pub static STREAM_HANDLES_COUNT: Lazy<Gauge> = Lazy::new(|| {
const NAME: &str = "libsql_server_stream_handles";
describe_gauge!(NAME, "amount of in-memory stream handles");
register_gauge!(NAME)
});
pub static NAMESPACE_LOAD_LATENCY: Lazy<Histogram> = Lazy::new(|| {
const NAME: &str = "libsql_server_namespace_load_latency";
describe_histogram!(NAME, "latency is us when loading a namespace");
Expand Down
5 changes: 4 additions & 1 deletion libsql-server/src/namespace/configurator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,12 @@ pub(super) async fn make_primary_connection_maker(
.await?
.throttled(
base_config.max_concurrent_connections.clone(),
Some(DB_CREATE_TIMEOUT),
base_config
.connection_creation_timeout
.or(Some(DB_CREATE_TIMEOUT)),
base_config.max_total_response_size,
base_config.max_concurrent_requests,
base_config.disable_intelligent_throttling,
),
);

Expand Down
5 changes: 4 additions & 1 deletion libsql-server/src/namespace/configurator/libsql_primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ pub(super) async fn libsql_primary_common(
}
.throttled(
base_config.max_concurrent_connections.clone(),
Some(DB_CREATE_TIMEOUT),
base_config
.connection_creation_timeout
.or(Some(DB_CREATE_TIMEOUT)),
base_config.max_total_response_size,
base_config.max_concurrent_requests,
base_config.disable_intelligent_throttling,
);
let connection_maker = Arc::new(connection_maker);

Expand Down
5 changes: 4 additions & 1 deletion libsql-server/src/namespace/configurator/libsql_replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,12 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator {
)
.throttled(
self.base.max_concurrent_connections.clone(),
Some(DB_CREATE_TIMEOUT),
self.base
.connection_creation_timeout
.or(Some(DB_CREATE_TIMEOUT)),
self.base.max_total_response_size,
self.base.max_concurrent_requests,
self.base.disable_intelligent_throttling,
),
);

Expand Down
2 changes: 2 additions & 0 deletions libsql-server/src/namespace/configurator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct BaseNamespaceConfig {
pub(crate) max_concurrent_connections: Arc<Semaphore>,
pub(crate) max_concurrent_requests: u64,
pub(crate) encryption_config: Option<EncryptionConfig>,
pub(crate) disable_intelligent_throttling: bool,
pub(crate) connection_creation_timeout: Option<Duration>,
}

#[derive(Clone)]
Expand Down
5 changes: 4 additions & 1 deletion libsql-server/src/namespace/configurator/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,12 @@ impl ConfigureNamespace for ReplicaConfigurator {
)
.throttled(
self.base.max_concurrent_connections.clone(),
Some(DB_CREATE_TIMEOUT),
self.base
.connection_creation_timeout
.or(Some(DB_CREATE_TIMEOUT)),
self.base.max_total_response_size,
self.base.max_concurrent_requests,
self.base.disable_intelligent_throttling,
),
);

Expand Down
2 changes: 2 additions & 0 deletions libsql-server/src/schema/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,8 @@ mod test {
max_concurrent_connections: Arc::new(Semaphore::new(10)),
max_concurrent_requests: 10000,
encryption_config: None,
connection_creation_timeout: None,
disable_intelligent_throttling: false,
};

let primary_config = PrimaryConfig {
Expand Down
2 changes: 2 additions & 0 deletions libsql-server/src/test/bottomless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ async fn configure_server(
snapshot_at_shutdown: false,
encryption_config: None,
max_concurrent_requests: 128,
connection_creation_timeout: None,
disable_intelligent_throttling: false,
},
admin_api_config: None,
disable_namespaces: true,
Expand Down

0 comments on commit 701a8cf

Please sign in to comment.