Skip to content

Commit

Permalink
feat: DEPRECATE caching (#1035)
Browse files Browse the repository at this point in the history
Caching will be replaced by hot tier feature
  • Loading branch information
de-sh authored Dec 17, 2024
1 parent eea4841 commit 5ce9d2b
Show file tree
Hide file tree
Showing 24 changed files with 41 additions and 1,453 deletions.
16 changes: 0 additions & 16 deletions src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/

use crossterm::style::Stylize;
use human_size::SpecificSize;

use crate::about;
use crate::utils::uid::Uid;
Expand Down Expand Up @@ -93,7 +92,6 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
/// Prints information about the `ObjectStorage`.
/// - Mode (`Local drive`, `S3 bucket`)
/// - Staging (temporary landing point for incoming events)
/// - Cache (local cache of data)
/// - Store (path where the data is stored and its latency)
async fn storage_info(config: &Config) {
let storage = config.storage();
Expand All @@ -109,20 +107,6 @@ async fn storage_info(config: &Config) {
config.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.parseable.local_cache_path {
let size: SpecificSize<human_size::Gigibyte> =
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
.unwrap()
.into();

eprintln!(
"\
{:8}Cache: \"{}\", (size: {})",
"",
path.display(),
size
);
}
if let Some(path) = &config.parseable.hot_tier_storage_path {
eprintln!(
"\
Expand Down
70 changes: 3 additions & 67 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,9 @@ pub struct Cli {
pub domain_address: Option<Url>,

/// The local staging path is used as a temporary landing point
/// for incoming events and local cache
/// for incoming events
pub local_staging_path: PathBuf,

/// The local cache path is used for speeding up query on latest data
pub local_cache_path: Option<PathBuf>,

/// Size for local cache
pub local_cache_size: u64,

/// Username for the basic authentication on the server
pub username: String,

Expand Down Expand Up @@ -96,12 +90,6 @@ pub struct Cli {
/// port use by airplane(flight query service)
pub flight_port: u16,

/// to query cached data
pub query_cache_path: Option<PathBuf>,

/// Size for local cache
pub query_cache_size: u64,

/// CORS behaviour
pub cors: bool,

Expand Down Expand Up @@ -129,10 +117,6 @@ impl Cli {
pub const ADDRESS: &'static str = "address";
pub const DOMAIN_URI: &'static str = "origin";
pub const STAGING: &'static str = "local-staging-path";
pub const CACHE: &'static str = "cache-path";
pub const QUERY_CACHE: &'static str = "query-cache-path";
pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
pub const CACHE_SIZE: &'static str = "cache-size";
pub const USERNAME: &'static str = "username";
pub const PASSWORD: &'static str = "password";
pub const CHECK_UPDATE: &'static str = "check-update";
Expand Down Expand Up @@ -255,45 +239,7 @@ impl Cli {
.help("Local path on this device to be used as landing point for incoming events")
.next_line_help(true),
)
.arg(
Arg::new(Self::CACHE)
.long(Self::CACHE)
.env("P_CACHE_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path on this device to be used for caching data")
.next_line_help(true),
)
.arg(
Arg::new(Self::CACHE_SIZE)
.long(Self::CACHE_SIZE)
.env("P_CACHE_SIZE")
.value_name("size")
.default_value("1GiB")
.value_parser(validation::cache_size)
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
Arg::new(Self::QUERY_CACHE)
.long(Self::QUERY_CACHE)
.env("P_QUERY_CACHE_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path on this device to be used for caching data")
.next_line_help(true),
)
.arg(
Arg::new(Self::QUERY_CACHE_SIZE)
.long(Self::QUERY_CACHE_SIZE)
.env("P_QUERY_CACHE_SIZE")
.value_name("size")
.default_value("1GiB")
.value_parser(validation::cache_size)
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
.arg(
Arg::new(Self::USERNAME)
.long(Self::USERNAME)
.env("P_USERNAME")
Expand Down Expand Up @@ -423,7 +369,7 @@ impl Cli {
.arg(
// RowGroupSize controls the number of rows present in one row group
// More rows = better compression but HIGHER Memory consumption during read/write
// 1048576 is the default value for DataFusion
// 1048576 is the default value for DataFusion
Arg::new(Self::ROW_GROUP_SIZE)
.long(Self::ROW_GROUP_SIZE)
.env("P_PARQUET_ROW_GROUP_SIZE")
Expand Down Expand Up @@ -520,8 +466,6 @@ impl FromArgMatches for Cli {
self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();

self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.trusted_ca_certs_path = m.get_one::<PathBuf>(Self::TRUSTED_CA_CERTS_PATH).cloned();
Expand All @@ -541,14 +485,6 @@ impl FromArgMatches for Cli {
.get_one::<PathBuf>(Self::STAGING)
.cloned()
.expect("default value for staging");
self.local_cache_size = m
.get_one::<u64>(Self::CACHE_SIZE)
.cloned()
.expect("default value for cache size");
self.query_cache_size = m
.get_one(Self::QUERY_CACHE_SIZE)
.cloned()
.expect("default value for query cache size");
self.username = m
.get_one::<String>(Self::USERNAME)
.cloned()
Expand Down
65 changes: 4 additions & 61 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,13 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_ingestor_info;

use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::CONFIG;

use crate::handlers::livetail::cross_origin_config;

use crate::handlers::http::query::{
authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
};
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::CONFIG;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::querycache::QueryCacheManager;
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
Expand All @@ -64,8 +59,6 @@ use crate::metadata::STREAM_INFO;
use crate::rbac;
use crate::rbac::Users;

use super::http::query::get_results_from_cache;

#[derive(Clone, Debug)]
pub struct AirServiceImpl {}

Expand Down Expand Up @@ -156,46 +149,11 @@ impl FlightService for AirServiceImpl {

let streams = visitor.into_inner();

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);

let cache_results = req
.metadata()
.get(CACHE_RESULTS_HEADER_KEY)
.and_then(|value| value.to_str().ok()); // I dont think we need to own this.

let show_cached = req
.metadata()
.get(CACHE_VIEW_HEADER_KEY)
.and_then(|value| value.to_str().ok());

let user_id = req
.metadata()
.get(USER_ID_HEADER_KEY)
.and_then(|value| value.to_str().ok());
let stream_name = streams
.first()
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

// send the cached results
if let Ok(cache_results) = get_results_from_cache(
show_cached,
query_cache_manager,
&stream_name,
user_id,
&ticket.start_time,
&ticket.end_time,
&ticket.query,
ticket.send_null,
ticket.fields,
)
.await
{
return cache_results.into_flight();
}

update_schema_when_distributed(streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;
Expand Down Expand Up @@ -258,21 +216,6 @@ impl FlightService for AirServiceImpl {
.await
.map_err(|err| Status::internal(err.to_string()))?;

if let Err(err) = put_results_in_cache(
cache_results,
user_id,
query_cache_manager,
&stream_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
ticket.query,
)
.await
{
error!("{}", err);
};

/*
* INFO: No returning the schema with the data.
* kept it in case it needs to be sent in the future.
Expand Down
95 changes: 0 additions & 95 deletions src/handlers/http/cache.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::event::{
};
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
Expand Down Expand Up @@ -236,8 +235,6 @@ pub enum PostError {
#[error("Error: {0}")]
DashboardError(#[from] DashboardError),
#[error("Error: {0}")]
CacheError(#[from] CacheError),
#[error("Error: {0}")]
StreamError(#[from] StreamError),
}

Expand All @@ -259,7 +256,6 @@ impl actix_web::ResponseError for PostError {
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
Expand Down
Loading

0 comments on commit 5ce9d2b

Please sign in to comment.