From 140853dc89bbf2a90d8bfc62fe4feb54a7bcc7b8 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Fri, 20 Dec 2024 11:57:34 +0530 Subject: [PATCH 01/15] add background thread for api calls --- crates/router/src/utils/currency.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index 9ab2780da732..f0e65adc3e0f 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -11,6 +11,7 @@ use router_env::{instrument, tracing}; use rust_decimal::Decimal; use strum::IntoEnumIterator; use tokio::{sync::RwLock, time::sleep}; +use tracing_futures::Instrument; use crate::{ logger, @@ -223,6 +224,22 @@ async fn handler_local_no_data( async fn successive_fetch_and_save_forex( state: &SessionState, stale_redis_data: Option, +) -> CustomResult { + // spawn a new thread and do the api fetch and write operations on redis. + let stale_forex_data = stale_redis_data.clone(); + let state = state.clone(); + tokio::spawn( + async move { + acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await; + } + .in_current_span(), + ); + stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into()) +} + +async fn acquire_redis_lock_and_fetch_data( + state: &SessionState, + stale_redis_data: Option, ) -> CustomResult { match acquire_redis_lock(state).await { Ok(lock_acquired) => { @@ -481,11 +498,8 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult Date: Mon, 23 Dec 2024 11:35:40 +0530 Subject: [PATCH 02/15] minor refactors --- crates/router/src/core/currency.rs | 26 +++------- crates/router/src/utils/currency.rs | 75 +++++++++-------------------- 2 files changed, 32 insertions(+), 69 deletions(-) diff --git a/crates/router/src/core/currency.rs b/crates/router/src/core/currency.rs index 912484b014a7..8c1a85128929 100644 --- a/crates/router/src/core/currency.rs +++ b/crates/router/src/core/currency.rs @@ -15,16 +15,11 @@ pub async fn retrieve_forex( ) -> CustomResult, ApiErrorResponse> { let forex_api = state.conf.forex_api.get_inner(); Ok(ApplicationResponse::Json( - get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(ApiErrorResponse::GenericNotFoundError { - message: "Unable to fetch forex rates".to_string(), - })?, + get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(ApiErrorResponse::GenericNotFoundError { + message: "Unable to fetch forex rates".to_string(), + })?, )) } @@ -53,14 +48,9 @@ pub async fn get_forex_exchange_rates( state: SessionState, ) -> CustomResult { let forex_api = state.conf.forex_api.get_inner(); - let rates = get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(AnalyticsError::ForexFetchFailed)?; + let rates = get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(AnalyticsError::ForexFetchFailed)?; Ok((*rates.data).clone()) } diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index f0e65adc3e0f..18f731273e3b 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc}; use api_models::enums; use common_utils::{date_time, errors::CustomResult, events::ApiEventMetric, ext_traits::AsyncExt}; @@ -10,7 +10,7 @@ use redis_interface::DelReply; use router_env::{instrument, tracing}; use rust_decimal::Decimal; use strum::IntoEnumIterator; -use tokio::{sync::RwLock, time::sleep}; +use tokio::sync::RwLock; use tracing_futures::Instrument; use crate::{ @@ -55,6 +55,8 @@ pub enum ForexCacheError { DefaultCurrencyParsingError, #[error("Entry not found in cache")] EntryNotFound, + #[error("Forex data unavailable")] + ForexDataUnavailable, #[error("Expiration time invalid")] InvalidLogExpiry, #[error("Error reading local")] @@ -120,32 +122,6 @@ async fn save_forex_to_local( Ok(()) } -// Alternative handler for handling the case, When no data in local as well as redis -#[allow(dead_code)] -async fn waited_fetch_and_update_caches( - state: &SessionState, - local_fetch_retry_delay: u64, - local_fetch_retry_count: u64, -) -> CustomResult { - for _n in 1..local_fetch_retry_count { - sleep(Duration::from_millis(local_fetch_retry_delay)).await; - //read from redis and update local plus break the loop and return - match retrieve_forex_from_redis(state).await { - Ok(Some(rates)) => { - save_forex_to_local(rates.clone()).await?; - return Ok(rates.clone()); - } - Ok(None) => continue, - Err(error) => { - logger::error!(?error); - continue; - } - } - } - //acquire lock one last time and try to fetch and update local & redis - successive_fetch_and_save_forex(state, None).await -} - impl TryFrom for ExchangeRates { type Error = error_stack::Report; fn try_from(value: DefaultExchangeRates) -> Result { @@ -179,8 +155,6 @@ impl From for CurrencyFactors { pub async fn get_forex_rates( state: &SessionState, call_delay: i64, - local_fetch_retry_delay: u64, - local_fetch_retry_count: u64, ) -> CustomResult { if let Some(local_rates) = retrieve_forex_from_local().await { if local_rates.is_expired(call_delay) { @@ -192,31 +166,26 @@ pub async fn get_forex_rates( } } else { // No data in local - handler_local_no_data( - state, - call_delay, - local_fetch_retry_delay, - local_fetch_retry_count, - ) - .await + handler_local_no_data(state, call_delay).await } } async fn handler_local_no_data( state: &SessionState, call_delay: i64, - _local_fetch_retry_delay: u64, - _local_fetch_retry_count: u64, ) -> CustomResult { match retrieve_forex_from_redis(state).await { Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, Ok(None) => { // No data in local as well as redis - Ok(successive_fetch_and_save_forex(state, None).await?) + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) } Err(error) => { + // Error in deriving forex rates from redis logger::error!(?error); - Ok(successive_fetch_and_save_forex(state, None).await?) + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) } } } @@ -228,12 +197,14 @@ async fn successive_fetch_and_save_forex( // spawn a new thread and do the api fetch and write operations on redis. let stale_forex_data = stale_redis_data.clone(); let state = state.clone(); + println!(">>>>>>>>>>>>>>>>>>>>>>>code -1"); tokio::spawn( async move { acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await; } .in_current_span(), ); + println!(">>>>>>>>>>>>>>>>>>>>>>>code 8"); stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into()) } @@ -246,9 +217,13 @@ async fn acquire_redis_lock_and_fetch_data( if !lock_acquired { return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into()); } + println!(">>>>>>>>>>>>>>>>>>>>>>>code 0"); let api_rates = fetch_forex_rates(state).await; match api_rates { - Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Ok(rates) => { + println!(">>>>>>>>>>>>>>>>>>>>>>>code 1"); + successive_save_data_to_redis_local(state, rates).await + } Err(error) => { // API not able to fetch data call secondary service logger::error!(?error); @@ -265,6 +240,7 @@ async fn acquire_redis_lock_and_fetch_data( } } Err(error) => stale_redis_data.ok_or({ + println!(">>>>>>>>>>>>>>>>>>>>>>>code 2"); logger::error!(?error); ForexCacheError::ApiUnresponsive.into() }), @@ -304,6 +280,7 @@ async fn fallback_forex_redis_check( } None => { // redis expired + println!(">>>>>>>>>>>>>>>>redis expired 2"); successive_fetch_and_save_forex(state, Some(redis_data)).await } } @@ -326,6 +303,7 @@ async fn handler_local_expired( } None => { // Redis is expired going for API request + println!(">>>>>>>>>>>>>>>>redis expired 1"); successive_fetch_and_save_forex(state, Some(local_rates)).await } } @@ -498,7 +476,7 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult CustomResult { let forex_api = state.conf.forex_api.get_inner(); - let rates = get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(ForexCacheError::ApiError)?; + let rates = get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(ForexCacheError::ApiError)?; let to_currency = enums::Currency::from_str(to_currency.as_str()) .change_context(ForexCacheError::CurrencyNotAcceptable) From eb8732365070e24e130edc7ef0d7da525caa713b Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Fri, 3 Jan 2025 09:35:32 +0530 Subject: [PATCH 03/15] add error handlers for missing api keys --- crates/router/src/utils/currency.rs | 79 +++++++++++++++-------------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index 18f731273e3b..caf4bc15383f 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -51,6 +51,8 @@ pub enum ForexCacheError { CouldNotAcquireLock, #[error("Provided currency not acceptable")] CurrencyNotAcceptable, + #[error("Configuration error, api_keys not provided")] + ConfigurationError, #[error("Incorrect entries in default Currency response")] DefaultCurrencyParsingError, #[error("Entry not found in cache")] @@ -174,18 +176,23 @@ async fn handler_local_no_data( state: &SessionState, call_delay: i64, ) -> CustomResult { - match retrieve_forex_from_redis(state).await { - Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, - Ok(None) => { - // No data in local as well as redis - successive_fetch_and_save_forex(state, None).await?; - Err(ForexCacheError::ForexDataUnavailable.into()) - } - Err(error) => { - // Error in deriving forex rates from redis - logger::error!(?error); - successive_fetch_and_save_forex(state, None).await?; - Err(ForexCacheError::ForexDataUnavailable.into()) + let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); + if forex_api_key.eq("") { + Err(ForexCacheError::ConfigurationError.into()) + } else { + match retrieve_forex_from_redis(state).await { + Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, + Ok(None) => { + // No data in local as well as redis + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) + } + Err(error) => { + // Error in deriving forex rates from redis + logger::error!(?error); + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) + } } } } @@ -197,14 +204,12 @@ async fn successive_fetch_and_save_forex( // spawn a new thread and do the api fetch and write operations on redis. let stale_forex_data = stale_redis_data.clone(); let state = state.clone(); - println!(">>>>>>>>>>>>>>>>>>>>>>>code -1"); tokio::spawn( async move { acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await; } .in_current_span(), ); - println!(">>>>>>>>>>>>>>>>>>>>>>>code 8"); stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into()) } @@ -217,11 +222,9 @@ async fn acquire_redis_lock_and_fetch_data( if !lock_acquired { return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into()); } - println!(">>>>>>>>>>>>>>>>>>>>>>>code 0"); let api_rates = fetch_forex_rates(state).await; match api_rates { Ok(rates) => { - println!(">>>>>>>>>>>>>>>>>>>>>>>code 1"); successive_save_data_to_redis_local(state, rates).await } Err(error) => { @@ -240,7 +243,6 @@ async fn acquire_redis_lock_and_fetch_data( } } Err(error) => stale_redis_data.ok_or({ - println!(">>>>>>>>>>>>>>>>>>>>>>>code 2"); logger::error!(?error); ForexCacheError::ApiUnresponsive.into() }), @@ -280,7 +282,6 @@ async fn fallback_forex_redis_check( } None => { // redis expired - println!(">>>>>>>>>>>>>>>>redis expired 2"); successive_fetch_and_save_forex(state, Some(redis_data)).await } } @@ -291,27 +292,31 @@ async fn handler_local_expired( call_delay: i64, local_rates: FxExchangeRatesCacheEntry, ) -> CustomResult { - match retrieve_forex_from_redis(state).await { - Ok(redis_data) => { - match is_redis_expired(redis_data.as_ref(), call_delay).await { - Some(redis_forex) => { - // Valid data present in redis - let exchange_rates = - FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); - save_forex_to_local(exchange_rates.clone()).await?; - Ok(exchange_rates) - } - None => { - // Redis is expired going for API request - println!(">>>>>>>>>>>>>>>>redis expired 1"); - successive_fetch_and_save_forex(state, Some(local_rates)).await + let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); + if forex_api_key.eq("") { + Err(ForexCacheError::ConfigurationError.into()) + } else { + match retrieve_forex_from_redis(state).await { + Ok(redis_data) => { + match is_redis_expired(redis_data.as_ref(), call_delay).await { + Some(redis_forex) => { + // Valid data present in redis + let exchange_rates = + FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); + save_forex_to_local(exchange_rates.clone()).await?; + Ok(exchange_rates) + } + None => { + // Redis is expired going for API request + successive_fetch_and_save_forex(state, Some(local_rates)).await + } } } - } - Err(error) => { - // data not present in redis waited fetch - logger::error!(?error); - successive_fetch_and_save_forex(state, Some(local_rates)).await + Err(error) => { + // data not present in redis waited fetch + logger::error!(?error); + successive_fetch_and_save_forex(state, Some(local_rates)).await + } } } } From 5612162cdbb0489877363935795c14134acfb805 Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Fri, 3 Jan 2025 04:07:27 +0000 Subject: [PATCH 04/15] chore: run formatter --- crates/router/src/utils/currency.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index caf4bc15383f..b30867cdbce5 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -224,9 +224,7 @@ async fn acquire_redis_lock_and_fetch_data( } let api_rates = fetch_forex_rates(state).await; match api_rates { - Ok(rates) => { - successive_save_data_to_redis_local(state, rates).await - } + Ok(rates) => successive_save_data_to_redis_local(state, rates).await, Err(error) => { // API not able to fetch data call secondary service logger::error!(?error); From 9e5edffc349c5b9d50c079e190ba1951c26e1fc4 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Fri, 3 Jan 2025 16:16:31 +0530 Subject: [PATCH 05/15] add error logs --- crates/router/src/utils/currency.rs | 123 +++++++++++----------------- 1 file changed, 49 insertions(+), 74 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index b30867cdbce5..3c245dcebec2 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -161,7 +161,7 @@ pub async fn get_forex_rates( if let Some(local_rates) = retrieve_forex_from_local().await { if local_rates.is_expired(call_delay) { // expired local data - handler_local_expired(state, call_delay, local_rates).await + successive_fetch_and_save_forex(state, Some(local_rates)).await } else { // Valid data present in local Ok(local_rates) @@ -176,23 +176,18 @@ async fn handler_local_no_data( state: &SessionState, call_delay: i64, ) -> CustomResult { - let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); - if forex_api_key.eq("") { - Err(ForexCacheError::ConfigurationError.into()) - } else { - match retrieve_forex_from_redis(state).await { - Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, - Ok(None) => { - // No data in local as well as redis - successive_fetch_and_save_forex(state, None).await?; - Err(ForexCacheError::ForexDataUnavailable.into()) - } - Err(error) => { - // Error in deriving forex rates from redis - logger::error!(?error); - successive_fetch_and_save_forex(state, None).await?; - Err(ForexCacheError::ForexDataUnavailable.into()) - } + match retrieve_forex_from_redis(state).await { + Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, + Ok(None) => { + // No data in local as well as redis + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) + } + Err(error) => { + // Error in deriving forex rates from redis + logger::error!("forex_log: {:?}", error); + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) } } } @@ -202,15 +197,21 @@ async fn successive_fetch_and_save_forex( stale_redis_data: Option, ) -> CustomResult { // spawn a new thread and do the api fetch and write operations on redis. - let stale_forex_data = stale_redis_data.clone(); - let state = state.clone(); - tokio::spawn( - async move { - acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await; - } - .in_current_span(), - ); - stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into()) + let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); + if forex_api_key.eq("") { + logger::debug!("forex_log: api_keys not present in configs"); + Err(ForexCacheError::ConfigurationError.into()) + } else { + let stale_forex_data = stale_redis_data.clone(); + let state = state.clone(); + tokio::spawn( + async move { + let _ = acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await; + } + .in_current_span(), + ); + stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into()) + } } async fn acquire_redis_lock_and_fetch_data( @@ -220,19 +221,21 @@ async fn acquire_redis_lock_and_fetch_data( match acquire_redis_lock(state).await { Ok(lock_acquired) => { if !lock_acquired { + logger::debug!("forex_log: Unable to acquire redis lock"); return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into()); } + logger::debug!("forex_log: redis lock acquired"); let api_rates = fetch_forex_rates(state).await; match api_rates { Ok(rates) => successive_save_data_to_redis_local(state, rates).await, Err(error) => { // API not able to fetch data call secondary service - logger::error!(?error); + logger::error!("forex_log: {:?}", error); let secondary_api_rates = fallback_fetch_forex_rates(state).await; match secondary_api_rates { Ok(rates) => Ok(successive_save_data_to_redis_local(state, rates).await?), Err(error) => stale_redis_data.ok_or({ - logger::error!(?error); + logger::error!("forex_log: {:?}", error); release_redis_lock(state).await?; ForexCacheError::ApiUnresponsive.into() }), @@ -241,7 +244,7 @@ async fn acquire_redis_lock_and_fetch_data( } } Err(error) => stale_redis_data.ok_or({ - logger::error!(?error); + logger::error!("forex_log: {:?}", error); ForexCacheError::ApiUnresponsive.into() }), } @@ -259,7 +262,7 @@ async fn successive_save_data_to_redis_local( .await .map_or_else( |error| { - logger::error!(?error); + logger::error!("forex_log: {:?}", error); forex.clone() }, |_| forex.clone(), @@ -285,52 +288,19 @@ async fn fallback_forex_redis_check( } } -async fn handler_local_expired( - state: &SessionState, - call_delay: i64, - local_rates: FxExchangeRatesCacheEntry, -) -> CustomResult { - let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); - if forex_api_key.eq("") { - Err(ForexCacheError::ConfigurationError.into()) - } else { - match retrieve_forex_from_redis(state).await { - Ok(redis_data) => { - match is_redis_expired(redis_data.as_ref(), call_delay).await { - Some(redis_forex) => { - // Valid data present in redis - let exchange_rates = - FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); - save_forex_to_local(exchange_rates.clone()).await?; - Ok(exchange_rates) - } - None => { - // Redis is expired going for API request - successive_fetch_and_save_forex(state, Some(local_rates)).await - } - } - } - Err(error) => { - // data not present in redis waited fetch - logger::error!(?error); - successive_fetch_and_save_forex(state, Some(local_rates)).await - } - } - } -} - async fn fetch_forex_rates( state: &SessionState, ) -> Result> { let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); + logger::debug!("forex_log: Primary api call for forex fetch"); let forex_url: String = format!("{}{}{}", FOREX_BASE_URL, forex_api_key, FOREX_BASE_CURRENCY); let forex_request = services::RequestBuilder::new() .method(services::Method::Get) .url(&forex_url) .build(); - logger::info!(?forex_request); + logger::info!("forex_log: {:?}", forex_request); let response = state .api_client .send_request( @@ -350,7 +320,7 @@ async fn fetch_forex_rates( "Unable to parse response received from primary api into ForexResponse", )?; - logger::info!("{:?}", forex_response); + logger::info!("forex_log: {:?}", forex_response); let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { @@ -359,7 +329,7 @@ async fn fetch_forex_rates( let from_factor = match Decimal::new(1, 0).checked_div(**rate) { Some(rate) => rate, None => { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!("forex_log: Rates for {} not received from API", &enum_curr); continue; } }; @@ -367,7 +337,7 @@ async fn fetch_forex_rates( conversions.insert(enum_curr, currency_factors); } None => { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!("forex_log: Rates for {} not received from API", &enum_curr); } }; } @@ -390,7 +360,8 @@ pub async fn fallback_fetch_forex_rates( .url(&fallback_forex_url) .build(); - logger::info!(?fallback_forex_request); + logger::debug!("forex_log: Fallback api call for forex fetch"); + logger::info!("forex_log: {:?}", fallback_forex_request); let response = state .api_client .send_request( @@ -411,7 +382,7 @@ pub async fn fallback_fetch_forex_rates( "Unable to parse response received from falback api into ForexResponse", )?; - logger::info!("{:?}", fallback_forex_response); + logger::info!("forex_log: {:?}", fallback_forex_response); let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { match fallback_forex_response.quotes.get( @@ -426,7 +397,7 @@ pub async fn fallback_fetch_forex_rates( let from_factor = match Decimal::new(1, 0).checked_div(**rate) { Some(rate) => rate, None => { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!("forex_log: Rates for {} not received from API", &enum_curr); continue; } }; @@ -439,7 +410,7 @@ pub async fn fallback_fetch_forex_rates( CurrencyFactors::new(Decimal::new(1, 0), Decimal::new(1, 0)); conversions.insert(enum_curr, currency_factors); } else { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!("forex_log: Rates for {} not received from API", &enum_curr); } } }; @@ -450,7 +421,7 @@ pub async fn fallback_fetch_forex_rates( match acquire_redis_lock(state).await { Ok(_) => Ok(successive_save_data_to_redis_local(state, rates).await?), Err(e) => { - logger::error!(?e); + logger::error!("forex_log: {:?}", e); Ok(rates) } } @@ -459,6 +430,7 @@ pub async fn fallback_fetch_forex_rates( async fn release_redis_lock( state: &SessionState, ) -> Result> { + logger::debug!("forex_log: Releasing redis lock"); state .store .get_redis_conn() @@ -471,6 +443,7 @@ async fn release_redis_lock( async fn acquire_redis_lock(state: &SessionState) -> CustomResult { let forex_api = state.conf.forex_api.get_inner(); + logger::debug!("forex_log: Acquiring redis lock"); state .store .get_redis_conn() @@ -493,6 +466,7 @@ async fn save_forex_to_redis( app_state: &SessionState, forex_exchange_cache_entry: &FxExchangeRatesCacheEntry, ) -> CustomResult<(), ForexCacheError> { + logger::debug!("forex_log: Saving forex to redis"); app_state .store .get_redis_conn() @@ -506,6 +480,7 @@ async fn save_forex_to_redis( async fn retrieve_forex_from_redis( app_state: &SessionState, ) -> CustomResult, ForexCacheError> { + logger::debug!("forex_log: Retrieving forex from redis"); app_state .store .get_redis_conn() From 29c4f5e49e7653e47c345e8659663522552b9233 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Fri, 3 Jan 2025 17:27:45 +0530 Subject: [PATCH 06/15] refactor envs with correct values for expiration of forex data --- config/config.example.toml | 6 ++---- config/deployments/env_specific.toml | 4 +--- config/development.toml | 4 +--- config/docker_compose.toml | 4 +--- crates/router/src/configs/settings.rs | 7 ++----- crates/router/src/utils/currency.rs | 7 ++++++- loadtest/config/development.toml | 4 +--- 7 files changed, 14 insertions(+), 22 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index 999266321144..e55cf56689f7 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -75,13 +75,11 @@ max_feed_count = 200 # The maximum number of frames that will be fe # This section provides configs for currency conversion api [forex_api] -call_delay = 21600 # Api calls are made after every 6 hrs -local_fetch_retry_count = 5 # Fetch from Local cache has retry count as 5 -local_fetch_retry_delay = 1000 # Retry delay for checking write condition +call_delay = 21600 # Expiration time for data in cache as well as redis in seconds api_timeout = 20000 # Api timeouts once it crosses 20000 ms api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api fallback_api_key = "YOUR API KEY" # Api key for the fallback service -redis_lock_timeout = 26000 # Redis remains write locked for 26000 ms once the acquire_redis_lock is called +redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called # Logging configuration. Logging can be either to file or console or both. diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 967b847dae51..6f21b46024bc 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -102,12 +102,10 @@ bucket_name = "bucket" # The AWS S3 bucket name for file storage # This section provides configs for currency conversion api [forex_api] call_delay = 21600 # Api calls are made after every 6 hrs -local_fetch_retry_count = 5 # Fetch from Local cache has retry count as 5 -local_fetch_retry_delay = 1000 # Retry delay for checking write condition api_timeout = 20000 # Api timeouts once it crosses 20000 ms api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api fallback_api_key = "YOUR API KEY" # Api key for the fallback service -redis_lock_timeout = 26000 # Redis remains write locked for 26000 ms once the acquire_redis_lock is called +redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called [jwekey] # 3 priv/pub key pair vault_encryption_key = "" # public key in pem format, corresponding private key in rust locker diff --git a/config/development.toml b/config/development.toml index 4c9b8516b5ad..ea134989ab37 100644 --- a/config/development.toml +++ b/config/development.toml @@ -78,12 +78,10 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -local_fetch_retry_count = 5 -local_fetch_retry_delay = 1000 api_timeout = 20000 api_key = "YOUR API KEY HERE" fallback_api_key = "YOUR API KEY HERE" -redis_lock_timeout = 26000 +redis_lock_timeout = 100 [jwekey] vault_encryption_key = "" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 75699d0a9674..148da60f9998 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -31,12 +31,10 @@ pool_size = 5 [forex_api] call_delay = 21600 -local_fetch_retry_count = 5 -local_fetch_retry_delay = 1000 api_timeout = 20000 api_key = "YOUR API KEY HERE" fallback_api_key = "YOUR API KEY HERE" -redis_lock_timeout = 26000 +redis_lock_timeout = 100 [replica_database] username = "db_user" diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index ad5d9e89aaae..5f60d478cb53 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -299,16 +299,13 @@ pub struct PaymentLink { #[derive(Debug, Deserialize, Clone, Default)] #[serde(default)] pub struct ForexApi { - pub local_fetch_retry_count: u64, pub api_key: Secret, pub fallback_api_key: Secret, - /// in ms + /// in s pub call_delay: i64, /// in ms - pub local_fetch_retry_delay: u64, - /// in ms pub api_timeout: u64, - /// in ms + /// in s pub redis_lock_timeout: u64, } diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index 3c245dcebec2..acc130c61435 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -121,6 +121,7 @@ async fn save_forex_to_local( ) -> CustomResult<(), ForexCacheError> { let mut local = FX_EXCHANGE_RATES_CACHE.write().await; *local = Some(exchange_rates_cache_entry); + logger::debug!("forex_log: forex saved in cache"); Ok(()) } @@ -161,9 +162,11 @@ pub async fn get_forex_rates( if let Some(local_rates) = retrieve_forex_from_local().await { if local_rates.is_expired(call_delay) { // expired local data + logger::debug!("forex_log: Forex stored in cache is expired"); successive_fetch_and_save_forex(state, Some(local_rates)).await } else { // Valid data present in local + logger::debug!("forex_log: forex response from cache"); Ok(local_rates) } } else { @@ -198,7 +201,7 @@ async fn successive_fetch_and_save_forex( ) -> CustomResult { // spawn a new thread and do the api fetch and write operations on redis. let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); - if forex_api_key.eq("") { + if forex_api_key.is_empty() { logger::debug!("forex_log: api_keys not present in configs"); Err(ForexCacheError::ConfigurationError.into()) } else { @@ -278,6 +281,7 @@ async fn fallback_forex_redis_check( Some(redis_forex) => { // Valid data present in redis let exchange_rates = FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); + logger::debug!("forex_log: forex response from redis"); save_forex_to_local(exchange_rates.clone()).await?; Ok(exchange_rates) } @@ -499,6 +503,7 @@ async fn is_redis_expired( if cache.timestamp + call_delay > date_time::now_unix_timestamp() { Some(cache.data.clone()) } else { + logger::debug!("forex_log: Forex stored in redis is expired"); None } }) diff --git a/loadtest/config/development.toml b/loadtest/config/development.toml index ec58ab08b878..df6a2a5ce234 100644 --- a/loadtest/config/development.toml +++ b/loadtest/config/development.toml @@ -48,12 +48,10 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -local_fetch_retry_count = 5 -local_fetch_retry_delay = 1000 api_timeout = 20000 api_key = "YOUR API KEY HERE" fallback_api_key = "YOUR API KEY HERE" -redis_lock_timeout = 26000 +redis_lock_timeout = 100 [eph_key] validity = 1 From c4e60d3698dccf14a9e937ec17b348657b377a02 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Fri, 3 Jan 2025 17:31:35 +0530 Subject: [PATCH 07/15] refactor env's comment --- config/deployments/env_specific.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 6f21b46024bc..feca52ffa79d 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -101,7 +101,7 @@ bucket_name = "bucket" # The AWS S3 bucket name for file storage # This section provides configs for currency conversion api [forex_api] -call_delay = 21600 # Api calls are made after every 6 hrs +call_delay = 21600 # Expiration time for data in cache as well as redis in seconds api_timeout = 20000 # Api timeouts once it crosses 20000 ms api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api fallback_api_key = "YOUR API KEY" # Api key for the fallback service From b64943367c6394505e0afeef416c6cab49e0f971 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Mon, 6 Jan 2025 15:47:38 +0530 Subject: [PATCH 08/15] address comments --- config/config.example.toml | 1 - config/deployments/env_specific.toml | 1 - config/development.toml | 1 - config/docker_compose.toml | 1 - crates/router/src/configs/settings.rs | 2 - crates/router/src/utils/currency.rs | 57 ++++++++++++++++----------- loadtest/config/development.toml | 1 - 7 files changed, 33 insertions(+), 31 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index e55cf56689f7..f47a3afb8f7f 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -76,7 +76,6 @@ max_feed_count = 200 # The maximum number of frames that will be fe # This section provides configs for currency conversion api [forex_api] call_delay = 21600 # Expiration time for data in cache as well as redis in seconds -api_timeout = 20000 # Api timeouts once it crosses 20000 ms api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api fallback_api_key = "YOUR API KEY" # Api key for the fallback service redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index feca52ffa79d..0a8ae377fcab 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -102,7 +102,6 @@ bucket_name = "bucket" # The AWS S3 bucket name for file storage # This section provides configs for currency conversion api [forex_api] call_delay = 21600 # Expiration time for data in cache as well as redis in seconds -api_timeout = 20000 # Api timeouts once it crosses 20000 ms api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api fallback_api_key = "YOUR API KEY" # Api key for the fallback service redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called diff --git a/config/development.toml b/config/development.toml index ea134989ab37..46c45c768926 100644 --- a/config/development.toml +++ b/config/development.toml @@ -78,7 +78,6 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -api_timeout = 20000 api_key = "YOUR API KEY HERE" fallback_api_key = "YOUR API KEY HERE" redis_lock_timeout = 100 diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 148da60f9998..b00c762da041 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -31,7 +31,6 @@ pool_size = 5 [forex_api] call_delay = 21600 -api_timeout = 20000 api_key = "YOUR API KEY HERE" fallback_api_key = "YOUR API KEY HERE" redis_lock_timeout = 100 diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 5f60d478cb53..5f5b77fc72c6 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -303,8 +303,6 @@ pub struct ForexApi { pub fallback_api_key: Secret, /// in s pub call_delay: i64, - /// in ms - pub api_timeout: u64, /// in s pub redis_lock_timeout: u64, } diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index acc130c61435..2c4e6a0cff7f 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -51,8 +51,8 @@ pub enum ForexCacheError { CouldNotAcquireLock, #[error("Provided currency not acceptable")] CurrencyNotAcceptable, - #[error("Configuration error, api_keys not provided")] - ConfigurationError, + #[error("Forex configuration error: {0}")] + ConfigurationError(String), #[error("Incorrect entries in default Currency response")] DefaultCurrencyParsingError, #[error("Entry not found in cache")] @@ -166,7 +166,7 @@ pub async fn get_forex_rates( successive_fetch_and_save_forex(state, Some(local_rates)).await } else { // Valid data present in local - logger::debug!("forex_log: forex response from cache"); + logger::debug!("forex_log: forex found in cache"); Ok(local_rates) } } else { @@ -202,54 +202,56 @@ async fn successive_fetch_and_save_forex( // spawn a new thread and do the api fetch and write operations on redis. let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); if forex_api_key.is_empty() { - logger::debug!("forex_log: api_keys not present in configs"); - Err(ForexCacheError::ConfigurationError.into()) + Err(ForexCacheError::ConfigurationError("api_keys not provided".into()).into()) } else { - let stale_forex_data = stale_redis_data.clone(); let state = state.clone(); tokio::spawn( async move { - let _ = acquire_redis_lock_and_fetch_data(&state, stale_redis_data).await; + acquire_redis_lock_and_fetch_data(&state).await.ok(); } .in_current_span(), ); - stale_forex_data.ok_or(ForexCacheError::EntryNotFound.into()) + stale_redis_data.ok_or(ForexCacheError::EntryNotFound.into()) } } async fn acquire_redis_lock_and_fetch_data( state: &SessionState, - stale_redis_data: Option, -) -> CustomResult { +) -> CustomResult<(), ForexCacheError> { match acquire_redis_lock(state).await { Ok(lock_acquired) => { if !lock_acquired { logger::debug!("forex_log: Unable to acquire redis lock"); - return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into()); } logger::debug!("forex_log: redis lock acquired"); let api_rates = fetch_forex_rates(state).await; match api_rates { - Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Ok(rates) => { + successive_save_data_to_redis_local(state, rates).await?; + Ok(()) + } Err(error) => { // API not able to fetch data call secondary service logger::error!("forex_log: {:?}", error); let secondary_api_rates = fallback_fetch_forex_rates(state).await; match secondary_api_rates { - Ok(rates) => Ok(successive_save_data_to_redis_local(state, rates).await?), - Err(error) => stale_redis_data.ok_or({ + Ok(rates) => { + successive_save_data_to_redis_local(state, rates).await?; + Ok(()) + } + Err(error) => { logger::error!("forex_log: {:?}", error); release_redis_lock(state).await?; - ForexCacheError::ApiUnresponsive.into() - }), + Ok(()) + } } } } } - Err(error) => stale_redis_data.ok_or({ + Err(error) => { logger::error!("forex_log: {:?}", error); - ForexCacheError::ApiUnresponsive.into() - }), + Ok(()) + } } } @@ -281,7 +283,7 @@ async fn fallback_forex_redis_check( Some(redis_forex) => { // Valid data present in redis let exchange_rates = FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); - logger::debug!("forex_log: forex response from redis"); + logger::debug!("forex_log: forex response found in redis"); save_forex_to_local(exchange_rates.clone()).await?; Ok(exchange_rates) } @@ -304,7 +306,8 @@ async fn fetch_forex_rates( .url(&forex_url) .build(); - logger::info!("forex_log: {:?}", forex_request); + logger::debug!("forex_log: Primary api call for forex fetch"); + logger::info!("forex_log: primary_forex_request: {:?}", forex_request); let response = state .api_client .send_request( @@ -324,7 +327,7 @@ async fn fetch_forex_rates( "Unable to parse response received from primary api into ForexResponse", )?; - logger::info!("forex_log: {:?}", forex_response); + logger::info!("forex_log: primary_forex_response: {:?}", forex_response); let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { @@ -365,7 +368,10 @@ pub async fn fallback_fetch_forex_rates( .build(); logger::debug!("forex_log: Fallback api call for forex fetch"); - logger::info!("forex_log: {:?}", fallback_forex_request); + logger::info!( + "forex_log: fallback_forex_request: {:?}", + fallback_forex_request + ); let response = state .api_client .send_request( @@ -386,7 +392,10 @@ pub async fn fallback_fetch_forex_rates( "Unable to parse response received from falback api into ForexResponse", )?; - logger::info!("forex_log: {:?}", fallback_forex_response); + logger::info!( + "forex_log: fallback_forex_response: {:?}", + fallback_forex_response + ); let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { match fallback_forex_response.quotes.get( diff --git a/loadtest/config/development.toml b/loadtest/config/development.toml index df6a2a5ce234..5b1fe0c4db59 100644 --- a/loadtest/config/development.toml +++ b/loadtest/config/development.toml @@ -48,7 +48,6 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -api_timeout = 20000 api_key = "YOUR API KEY HERE" fallback_api_key = "YOUR API KEY HERE" redis_lock_timeout = 100 From 7c16081adabc0bb7a2389a0626b4d709170d378a Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Mon, 6 Jan 2025 16:01:06 +0530 Subject: [PATCH 09/15] remove api key placeholder so that the config error guard clause can work --- config/config.example.toml | 4 ++-- config/deployments/env_specific.toml | 4 ++-- config/development.toml | 4 ++-- config/docker_compose.toml | 4 ++-- crates/analytics/docs/README.md | 6 +++--- loadtest/config/development.toml | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index f47a3afb8f7f..61fa05111b43 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -76,8 +76,8 @@ max_feed_count = 200 # The maximum number of frames that will be fe # This section provides configs for currency conversion api [forex_api] call_delay = 21600 # Expiration time for data in cache as well as redis in seconds -api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api -fallback_api_key = "YOUR API KEY" # Api key for the fallback service +api_key = "" # Api key for making request to foreign exchange Api +fallback_api_key = "" # Api key for the fallback service redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called # Logging configuration. Logging can be either to file or console or both. diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 0a8ae377fcab..f7d6415e85fb 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -102,8 +102,8 @@ bucket_name = "bucket" # The AWS S3 bucket name for file storage # This section provides configs for currency conversion api [forex_api] call_delay = 21600 # Expiration time for data in cache as well as redis in seconds -api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api -fallback_api_key = "YOUR API KEY" # Api key for the fallback service +api_key = "" # Api key for making request to foreign exchange Api +fallback_api_key = "" # Api key for the fallback service redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called [jwekey] # 3 priv/pub key pair diff --git a/config/development.toml b/config/development.toml index 46c45c768926..b3e3eb3665e1 100644 --- a/config/development.toml +++ b/config/development.toml @@ -78,8 +78,8 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -api_key = "YOUR API KEY HERE" -fallback_api_key = "YOUR API KEY HERE" +api_key = "" +fallback_api_key = "" redis_lock_timeout = 100 [jwekey] diff --git a/config/docker_compose.toml b/config/docker_compose.toml index b00c762da041..5952422f246a 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -31,8 +31,8 @@ pool_size = 5 [forex_api] call_delay = 21600 -api_key = "YOUR API KEY HERE" -fallback_api_key = "YOUR API KEY HERE" +api_key = "" +fallback_api_key = "" redis_lock_timeout = 100 [replica_database] diff --git a/crates/analytics/docs/README.md b/crates/analytics/docs/README.md index e24dc6c5af79..2eca9c7859cb 100644 --- a/crates/analytics/docs/README.md +++ b/crates/analytics/docs/README.md @@ -115,8 +115,8 @@ To configure the Forex APIs, update the `config/development.toml` or `config/doc ```toml [forex_api] -api_key = "YOUR API KEY HERE" # Replace the placeholder with your Primary API Key -fallback_api_key = "YOUR API KEY HERE" # Replace the placeholder with your Fallback API Key +api_key = "" # Replace the placeholder with your Primary API Key +fallback_api_key = "" # Replace the placeholder with your Fallback API Key ``` ### Important Note ```bash @@ -159,4 +159,4 @@ To view the data on the OpenSearch dashboard perform the following steps: - Select a time field that will be used for time-based queries - Save the index pattern -Now, head on to `Discover` under the `OpenSearch Dashboards` tab, to select the newly created index pattern and query the data \ No newline at end of file +Now, head on to `Discover` under the `OpenSearch Dashboards` tab, to select the newly created index pattern and query the data diff --git a/loadtest/config/development.toml b/loadtest/config/development.toml index 5b1fe0c4db59..cd0000d2a9d4 100644 --- a/loadtest/config/development.toml +++ b/loadtest/config/development.toml @@ -48,8 +48,8 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -api_key = "YOUR API KEY HERE" -fallback_api_key = "YOUR API KEY HERE" +api_key = "" +fallback_api_key = "" redis_lock_timeout = 100 [eph_key] From 4d7959c6d40f633139051b63b92cc947f8be3d40 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Mon, 6 Jan 2025 16:14:01 +0530 Subject: [PATCH 10/15] address comments --- crates/router/src/utils/currency.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index 2c4e6a0cff7f..d0ec4483a3d6 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -221,7 +221,8 @@ async fn acquire_redis_lock_and_fetch_data( match acquire_redis_lock(state).await { Ok(lock_acquired) => { if !lock_acquired { - logger::debug!("forex_log: Unable to acquire redis lock"); + logger::error!("forex_log: Unable to acquire redis lock"); + return Err(ForexCacheError::CouldNotAcquireLock.into()); } logger::debug!("forex_log: redis lock acquired"); let api_rates = fetch_forex_rates(state).await; @@ -307,7 +308,7 @@ async fn fetch_forex_rates( .build(); logger::debug!("forex_log: Primary api call for forex fetch"); - logger::info!("forex_log: primary_forex_request: {:?}", forex_request); + logger::info!(primary_forex_request=?forex_request,"forex_log"); let response = state .api_client .send_request( @@ -327,7 +328,7 @@ async fn fetch_forex_rates( "Unable to parse response received from primary api into ForexResponse", )?; - logger::info!("forex_log: primary_forex_response: {:?}", forex_response); + logger::info!(primary_forex_response=?forex_response,"forex_log"); let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { @@ -368,10 +369,7 @@ pub async fn fallback_fetch_forex_rates( .build(); logger::debug!("forex_log: Fallback api call for forex fetch"); - logger::info!( - "forex_log: fallback_forex_request: {:?}", - fallback_forex_request - ); + logger::info!(fallback_forex_request=?fallback_forex_request,"forex_log"); let response = state .api_client .send_request( @@ -392,10 +390,8 @@ pub async fn fallback_fetch_forex_rates( "Unable to parse response received from falback api into ForexResponse", )?; - logger::info!( - "forex_log: fallback_forex_response: {:?}", - fallback_forex_response - ); + logger::info!(fallback_forex_response=?fallback_forex_response,"forex_log"); + let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { match fallback_forex_response.quotes.get( From aef526da88bdf6c8699d2aebf049fb3716abc489 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Mon, 6 Jan 2025 16:21:31 +0530 Subject: [PATCH 11/15] address comments --- crates/router/src/utils/currency.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index d0ec4483a3d6..ad988714b67a 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -307,8 +307,7 @@ async fn fetch_forex_rates( .url(&forex_url) .build(); - logger::debug!("forex_log: Primary api call for forex fetch"); - logger::info!(primary_forex_request=?forex_request,"forex_log"); + logger::info!(primary_forex_request=?forex_request,"forex_log: Primary api call for forex fetch"); let response = state .api_client .send_request( @@ -368,8 +367,7 @@ pub async fn fallback_fetch_forex_rates( .url(&fallback_forex_url) .build(); - logger::debug!("forex_log: Fallback api call for forex fetch"); - logger::info!(fallback_forex_request=?fallback_forex_request,"forex_log"); + logger::info!(fallback_forex_request=?fallback_forex_request,"forex_log: Fallback api call for forex fetch"); let response = state .api_client .send_request( From a1d0992edaeea24068e25ca611b27fa1617bd31b Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Mon, 6 Jan 2025 17:22:45 +0530 Subject: [PATCH 12/15] address comments --- crates/router/src/utils/currency.rs | 57 ++++++++++++++++------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index ad988714b67a..b5b60718f01a 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -188,7 +188,7 @@ async fn handler_local_no_data( } Err(error) => { // Error in deriving forex rates from redis - logger::error!("forex_log: {:?}", error); + logger::error!("forex_error: {:?}", error); successive_fetch_and_save_forex(state, None).await?; Err(ForexCacheError::ForexDataUnavailable.into()) } @@ -207,7 +207,12 @@ async fn successive_fetch_and_save_forex( let state = state.clone(); tokio::spawn( async move { - acquire_redis_lock_and_fetch_data(&state).await.ok(); + acquire_redis_lock_and_fetch_data(&state) + .await + .map_err(|err| { + logger::error!(forex_error=?err); + }) + .ok(); } .in_current_span(), ); @@ -221,7 +226,6 @@ async fn acquire_redis_lock_and_fetch_data( match acquire_redis_lock(state).await { Ok(lock_acquired) => { if !lock_acquired { - logger::error!("forex_log: Unable to acquire redis lock"); return Err(ForexCacheError::CouldNotAcquireLock.into()); } logger::debug!("forex_log: redis lock acquired"); @@ -232,8 +236,8 @@ async fn acquire_redis_lock_and_fetch_data( Ok(()) } Err(error) => { + logger::error!(forex_error=?error,"primary_forex_error"); // API not able to fetch data call secondary service - logger::error!("forex_log: {:?}", error); let secondary_api_rates = fallback_fetch_forex_rates(state).await; match secondary_api_rates { Ok(rates) => { @@ -241,38 +245,27 @@ async fn acquire_redis_lock_and_fetch_data( Ok(()) } Err(error) => { - logger::error!("forex_log: {:?}", error); release_redis_lock(state).await?; - Ok(()) + Err(error) } } } } } - Err(error) => { - logger::error!("forex_log: {:?}", error); - Ok(()) - } + Err(error) => Err(error), } } async fn successive_save_data_to_redis_local( state: &SessionState, forex: FxExchangeRatesCacheEntry, -) -> CustomResult { - Ok(save_forex_to_redis(state, &forex) +) -> CustomResult<(), ForexCacheError> { + save_forex_to_redis(state, &forex) .await .async_and_then(|_rates| release_redis_lock(state)) .await .async_and_then(|_val| save_forex_to_local(forex.clone())) .await - .map_or_else( - |error| { - logger::error!("forex_log: {:?}", error); - forex.clone() - }, - |_| forex.clone(), - )) } async fn fallback_forex_redis_check( @@ -336,7 +329,10 @@ async fn fetch_forex_rates( let from_factor = match Decimal::new(1, 0).checked_div(**rate) { Some(rate) => rate, None => { - logger::error!("forex_log: Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); continue; } }; @@ -344,7 +340,10 @@ async fn fetch_forex_rates( conversions.insert(enum_curr, currency_factors); } None => { - logger::error!("forex_log: Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); } }; } @@ -404,7 +403,10 @@ pub async fn fallback_fetch_forex_rates( let from_factor = match Decimal::new(1, 0).checked_div(**rate) { Some(rate) => rate, None => { - logger::error!("forex_log: Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); continue; } }; @@ -417,7 +419,10 @@ pub async fn fallback_fetch_forex_rates( CurrencyFactors::new(Decimal::new(1, 0), Decimal::new(1, 0)); conversions.insert(enum_curr, currency_factors); } else { - logger::error!("forex_log: Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); } } }; @@ -426,11 +431,11 @@ pub async fn fallback_fetch_forex_rates( let rates = FxExchangeRatesCacheEntry::new(ExchangeRates::new(enums::Currency::USD, conversions)); match acquire_redis_lock(state).await { - Ok(_) => Ok(successive_save_data_to_redis_local(state, rates).await?), - Err(e) => { - logger::error!("forex_log: {:?}", e); + Ok(_) => { + successive_save_data_to_redis_local(state, rates.clone()).await?; Ok(rates) } + Err(e) => Err(e), } } From e350a2394bcdf35f04af13b671ebdbd99c9f6c0e Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Mon, 6 Jan 2025 20:03:26 +0530 Subject: [PATCH 13/15] address comments --- crates/router/src/utils/currency.rs | 43 ++++++++++++----------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index b5b60718f01a..7bdb8121417d 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -223,36 +223,27 @@ async fn successive_fetch_and_save_forex( async fn acquire_redis_lock_and_fetch_data( state: &SessionState, ) -> CustomResult<(), ForexCacheError> { - match acquire_redis_lock(state).await { - Ok(lock_acquired) => { - if !lock_acquired { - return Err(ForexCacheError::CouldNotAcquireLock.into()); - } - logger::debug!("forex_log: redis lock acquired"); - let api_rates = fetch_forex_rates(state).await; - match api_rates { - Ok(rates) => { - successive_save_data_to_redis_local(state, rates).await?; - Ok(()) - } - Err(error) => { - logger::error!(forex_error=?error,"primary_forex_error"); - // API not able to fetch data call secondary service - let secondary_api_rates = fallback_fetch_forex_rates(state).await; - match secondary_api_rates { - Ok(rates) => { - successive_save_data_to_redis_local(state, rates).await?; - Ok(()) - } - Err(error) => { - release_redis_lock(state).await?; - Err(error) - } + let lock_acquired = acquire_redis_lock(state).await?; + if !lock_acquired { + Err(ForexCacheError::CouldNotAcquireLock.into()) + } else { + logger::debug!("forex_log: redis lock acquired"); + let api_rates = fetch_forex_rates(state).await; + match api_rates { + Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Err(error) => { + logger::error!(forex_error=?error,"primary_forex_error"); + // API not able to fetch data call secondary service + let secondary_api_rates = fallback_fetch_forex_rates(state).await; + match secondary_api_rates { + Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Err(error) => { + release_redis_lock(state).await?; + Err(error) } } } } - Err(error) => Err(error), } } From 723c6f4e2dc63ab05d3c5f4ee984298575106ba8 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Thu, 16 Jan 2025 16:13:02 +0530 Subject: [PATCH 14/15] refactor(currency_conversion): rename functions --- crates/analytics/docs/README.md | 4 +-- crates/router/src/utils/currency.rs | 56 ++++++++++++++--------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/crates/analytics/docs/README.md b/crates/analytics/docs/README.md index 2eca9c7859cb..71190613a107 100644 --- a/crates/analytics/docs/README.md +++ b/crates/analytics/docs/README.md @@ -115,8 +115,8 @@ To configure the Forex APIs, update the `config/development.toml` or `config/doc ```toml [forex_api] -api_key = "" # Replace the placeholder with your Primary API Key -fallback_api_key = "" # Replace the placeholder with your Fallback API Key +api_key = "" +fallback_api_key = "" ``` ### Important Note ```bash diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index 7bdb8121417d..b3bec1d6b3f4 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -112,11 +112,11 @@ impl FxExchangeRatesCacheEntry { } } -async fn retrieve_forex_from_local() -> Option { +async fn retrieve_forex_from_local_cache() -> Option { FX_EXCHANGE_RATES_CACHE.read().await.clone() } -async fn save_forex_to_local( +async fn save_forex_data_to_local_cache( exchange_rates_cache_entry: FxExchangeRatesCacheEntry, ) -> CustomResult<(), ForexCacheError> { let mut local = FX_EXCHANGE_RATES_CACHE.write().await; @@ -159,11 +159,11 @@ pub async fn get_forex_rates( state: &SessionState, call_delay: i64, ) -> CustomResult { - if let Some(local_rates) = retrieve_forex_from_local().await { + if let Some(local_rates) = retrieve_forex_from_local_cache().await { if local_rates.is_expired(call_delay) { // expired local data logger::debug!("forex_log: Forex stored in cache is expired"); - successive_fetch_and_save_forex(state, Some(local_rates)).await + call_forex_api_and_save_data_to_cache_and_redis(state, Some(local_rates)).await } else { // Valid data present in local logger::debug!("forex_log: forex found in cache"); @@ -171,31 +171,31 @@ pub async fn get_forex_rates( } } else { // No data in local - handler_local_no_data(state, call_delay).await + check_local_cache_expiry_and_call_api(state, call_delay).await } } -async fn handler_local_no_data( +async fn check_local_cache_expiry_and_call_api( state: &SessionState, call_delay: i64, ) -> CustomResult { - match retrieve_forex_from_redis(state).await { - Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, + match retrieve_forex_data_from_redis(state).await { + Ok(Some(data)) => check_redis_data_expiry_and_call_api(state, data, call_delay).await, Ok(None) => { // No data in local as well as redis - successive_fetch_and_save_forex(state, None).await?; + call_forex_api_and_save_data_to_cache_and_redis(state, None).await?; Err(ForexCacheError::ForexDataUnavailable.into()) } Err(error) => { // Error in deriving forex rates from redis logger::error!("forex_error: {:?}", error); - successive_fetch_and_save_forex(state, None).await?; + call_forex_api_and_save_data_to_cache_and_redis(state, None).await?; Err(ForexCacheError::ForexDataUnavailable.into()) } } } -async fn successive_fetch_and_save_forex( +async fn call_forex_api_and_save_data_to_cache_and_redis( state: &SessionState, stale_redis_data: Option, ) -> CustomResult { @@ -207,7 +207,7 @@ async fn successive_fetch_and_save_forex( let state = state.clone(); tokio::spawn( async move { - acquire_redis_lock_and_fetch_data(&state) + acquire_redis_lock_and_call_forex_api(&state) .await .map_err(|err| { logger::error!(forex_error=?err); @@ -220,7 +220,7 @@ async fn successive_fetch_and_save_forex( } } -async fn acquire_redis_lock_and_fetch_data( +async fn acquire_redis_lock_and_call_forex_api( state: &SessionState, ) -> CustomResult<(), ForexCacheError> { let lock_acquired = acquire_redis_lock(state).await?; @@ -228,15 +228,15 @@ async fn acquire_redis_lock_and_fetch_data( Err(ForexCacheError::CouldNotAcquireLock.into()) } else { logger::debug!("forex_log: redis lock acquired"); - let api_rates = fetch_forex_rates(state).await; + let api_rates = fetch_forex_rates_from_primary_api(state).await; match api_rates { - Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Ok(rates) => save_forex_data_to_cache_and_redis(state, rates).await, Err(error) => { logger::error!(forex_error=?error,"primary_forex_error"); // API not able to fetch data call secondary service - let secondary_api_rates = fallback_fetch_forex_rates(state).await; + let secondary_api_rates = fetch_forex_rates_from_fallback_api(state).await; match secondary_api_rates { - Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Ok(rates) => save_forex_data_to_cache_and_redis(state, rates).await, Err(error) => { release_redis_lock(state).await?; Err(error) @@ -247,19 +247,19 @@ async fn acquire_redis_lock_and_fetch_data( } } -async fn successive_save_data_to_redis_local( +async fn save_forex_data_to_cache_and_redis( state: &SessionState, forex: FxExchangeRatesCacheEntry, ) -> CustomResult<(), ForexCacheError> { - save_forex_to_redis(state, &forex) + save_forex_data_to_redis(state, &forex) .await .async_and_then(|_rates| release_redis_lock(state)) .await - .async_and_then(|_val| save_forex_to_local(forex.clone())) + .async_and_then(|_val| save_forex_data_to_local_cache(forex.clone())) .await } -async fn fallback_forex_redis_check( +async fn check_redis_data_expiry_and_call_api( state: &SessionState, redis_data: FxExchangeRatesCacheEntry, call_delay: i64, @@ -269,17 +269,17 @@ async fn fallback_forex_redis_check( // Valid data present in redis let exchange_rates = FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); logger::debug!("forex_log: forex response found in redis"); - save_forex_to_local(exchange_rates.clone()).await?; + save_forex_data_to_local_cache(exchange_rates.clone()).await?; Ok(exchange_rates) } None => { // redis expired - successive_fetch_and_save_forex(state, Some(redis_data)).await + call_forex_api_and_save_data_to_cache_and_redis(state, Some(redis_data)).await } } } -async fn fetch_forex_rates( +async fn fetch_forex_rates_from_primary_api( state: &SessionState, ) -> Result> { let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); @@ -345,7 +345,7 @@ async fn fetch_forex_rates( ))) } -pub async fn fallback_fetch_forex_rates( +pub async fn fetch_forex_rates_from_fallback_api( state: &SessionState, ) -> CustomResult { let fallback_forex_api_key = state.conf.forex_api.get_inner().fallback_api_key.peek(); @@ -423,7 +423,7 @@ pub async fn fallback_fetch_forex_rates( FxExchangeRatesCacheEntry::new(ExchangeRates::new(enums::Currency::USD, conversions)); match acquire_redis_lock(state).await { Ok(_) => { - successive_save_data_to_redis_local(state, rates.clone()).await?; + save_forex_data_to_cache_and_redis(state, rates.clone()).await?; Ok(rates) } Err(e) => Err(e), @@ -465,7 +465,7 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult CustomResult<(), ForexCacheError> { @@ -480,7 +480,7 @@ async fn save_forex_to_redis( .attach_printable("Unable to save forex data to redis") } -async fn retrieve_forex_from_redis( +async fn retrieve_forex_data_from_redis( app_state: &SessionState, ) -> CustomResult, ForexCacheError> { logger::debug!("forex_log: Retrieving forex from redis"); From 0e9034af214356d4e6ae56a90b52d363149f4587 Mon Sep 17 00:00:00 2001 From: prajjwalkumar17 Date: Thu, 16 Jan 2025 20:38:18 +0530 Subject: [PATCH 15/15] rename functions --- crates/router/src/utils/currency.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index b3bec1d6b3f4..c10ebf3dac18 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -171,16 +171,16 @@ pub async fn get_forex_rates( } } else { // No data in local - check_local_cache_expiry_and_call_api(state, call_delay).await + call_api_if_redis_forex_data_expired(state, call_delay).await } } -async fn check_local_cache_expiry_and_call_api( +async fn call_api_if_redis_forex_data_expired( state: &SessionState, call_delay: i64, ) -> CustomResult { match retrieve_forex_data_from_redis(state).await { - Ok(Some(data)) => check_redis_data_expiry_and_call_api(state, data, call_delay).await, + Ok(Some(data)) => call_forex_api_if_redis_data_expired(state, data, call_delay).await, Ok(None) => { // No data in local as well as redis call_forex_api_and_save_data_to_cache_and_redis(state, None).await?; @@ -259,7 +259,7 @@ async fn save_forex_data_to_cache_and_redis( .await } -async fn check_redis_data_expiry_and_call_api( +async fn call_forex_api_if_redis_data_expired( state: &SessionState, redis_data: FxExchangeRatesCacheEntry, call_delay: i64,