diff --git a/Makefile b/Makefile index 4d4b3e8c..17e7d4e3 100644 --- a/Makefile +++ b/Makefile @@ -83,13 +83,13 @@ ${CORE_DIST}: ${WASI_SDK_FOLDER} ${CORE_JS_ASSETS_MAP_STD} ${CORE_JS_ASSETS_PROF touch ${CORE_DIST} ${CORE_WASM}: ${CORE_DIST} - cd core; cargo build --package oneclient_core --target wasm32-wasi ${CARGO_FLAGS} + cd core; cargo build --package oneclient_core_wasm --target wasm32-wasi ${CARGO_FLAGS} @echo 'Optimizing wasm...' - wasm-opt -Oz ${WASM_OPT_FLAGS} core/target/wasm32-wasi/${CARGO_PROFILE}/oneclient_core.wasm --output ${CORE_WASM} + wasm-opt -Oz ${WASM_OPT_FLAGS} core/target/wasm32-wasi/${CARGO_PROFILE}/oneclient_core_wasm.wasm --output ${CORE_WASM} ${TEST_CORE_WASM}: ${CORE_DIST} - cd core; cargo build --package oneclient_core --target wasm32-wasi --features "core_mock" ${CARGO_FLAGS} - cp core/target/wasm32-wasi/${CARGO_PROFILE}/oneclient_core.wasm ${TEST_CORE_WASM} + cd core; cargo build --package oneclient_core_wasm --target wasm32-wasi --features "core_mock" ${CARGO_FLAGS} + cp core/target/wasm32-wasi/${CARGO_PROFILE}/oneclient_core_wasm.wasm ${TEST_CORE_WASM} ${CORE_ASYNCIFY_WASM}: ${CORE_WASM} @echo 'Running asyncify...' diff --git a/core/Cargo.lock b/core/Cargo.lock index 81695387..2377c20b 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1208,6 +1208,15 @@ dependencies = [ "url", ] +[[package]] +name = "oneclient_core_wasm" +version = "0.1.0" +dependencies = [ + "oneclient_core", + "tracing", + "wasm_abi", +] + [[package]] name = "openssl" version = "0.10.60" diff --git a/core/Cargo.toml b/core/Cargo.toml index 1340da9c..b64f55c5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["comlink", "comlink_wasm", "core", "core_to_map_std", "host_to_core_std", "interpreter_js", "json_schemas", "wasm_abi"] +members = ["comlink", "comlink_wasm", "core", "core_wasm", "core_to_map_std", "host_to_core_std", "interpreter_js", "json_schemas", "wasm_abi"] [workspace.dependencies] base64 = { version = "0.21" } diff --git a/core/core/Cargo.toml b/core/core/Cargo.toml index dad625f4..eddb25a1 100644 --- a/core/core/Cargo.toml +++ b/core/core/Cargo.toml @@ -3,12 +3,7 @@ name = "oneclient_core" version = "0.1.0" edition = "2021" -[lib] -crate-type = ["cdylib"] - [features] -default = ["asyncify"] -asyncify = [] core_mock = [] [dependencies] diff --git a/core/core/src/sf_core/cache.rs b/core/core/src/cache.rs similarity index 82% rename from core/core/src/sf_core/cache.rs rename to core/core/src/cache.rs index 698ed903..a230eb15 100644 --- a/core/core/src/sf_core/cache.rs +++ b/core/core/src/cache.rs @@ -1,17 +1,17 @@ use std::{ collections::HashMap, io::Read, - time::{Duration, Instant}, + time::{Duration, Instant}, ops::Deref, }; use url::Url; use sf_std::{ - unstable::{http::HttpCallError, provider::ProviderJson}, - HeaderName, HeadersMultiMap, + unstable::{http::{HttpCallError, HttpRequest}, provider::ProviderJson, fs::read_in}, + HeaderName, HeadersMultiMap, abi::{MessageExchange, StreamExchange}, }; -use super::{digest, Fs, HttpRequest}; +use super::digest; #[derive(Debug, thiserror::Error)] pub enum ProfileCacheEntryError { @@ -104,20 +104,30 @@ impl std::fmt::Debug for DocumentCacheEntry { } } -pub struct DocumentCache { +pub struct DocumentCache { + message_exchange: Me, + stream_exchange: Se, map: HashMap>, cache_duration: Duration, registry_url: Url, user_agent: Option, } -impl DocumentCache { +impl DocumentCache { const FILE_URL_PREFIX: &'static str = "file://"; const HTTP_URL_PREFIX: &'static str = "http://"; 
const HTTPS_URL_PREFIX: &'static str = "https://"; const BASE64_URL_PREFIX: &'static str = "data:;base64,"; - pub fn new(cache_duration: Duration, registry_url: Url, user_agent: Option) -> Self { + pub fn new( + cache_duration: Duration, + registry_url: Url, + user_agent: Option, + message_exchange: Me, + stream_exchange: Se + ) -> Self { Self { + message_exchange, + stream_exchange, map: HashMap::new(), cache_duration, registry_url, @@ -125,8 +135,8 @@ impl DocumentCache { } } - pub fn get(&self, url: &str) -> Option<&E> { - self.map.get(url).map(|e| &e.data) + pub fn get_key_value(&self, url: &str) -> Option<(&str, &E)> { + self.map.get_key_value(url).map(|(k, e)| (k.deref(), &e.data)) } pub fn cache( @@ -149,12 +159,12 @@ impl DocumentCache { } let data = match url { - url if url.starts_with(Self::FILE_URL_PREFIX) => Self::cache_file(url), + url if url.starts_with(Self::FILE_URL_PREFIX) => self.cache_file(url), url if url.starts_with("data:;base64,") => Self::cache_base64(url), url => { if url.starts_with(Self::HTTP_URL_PREFIX) || url.starts_with(Self::HTTPS_URL_PREFIX) { - Self::cache_http(url, self.user_agent.as_deref()) + self.cache_http(url, self.user_agent.as_deref()) } else { let file = format!("{}.js", url); let full_url = self.registry_url.join(&file).map_err(|_e| { @@ -164,7 +174,7 @@ impl DocumentCache { ) })?; - Self::cache_http(full_url.as_str(), self.user_agent.as_deref()) + self.cache_http(full_url.as_str(), self.user_agent.as_deref()) } } }?; @@ -187,28 +197,30 @@ impl DocumentCache { } fn cache_file( - url: &str, + &self, + url: &str ) -> Result, DocumentCacheError> { match url.strip_prefix(Self::FILE_URL_PREFIX) { None => Err(DocumentCacheError::FileLoadFailed( url.to_string(), std::io::ErrorKind::NotFound.into(), )), - Some(path) => Fs::read(path) + Some(path) => read_in(path, &self.message_exchange, &self.stream_exchange) .map_err(|err| DocumentCacheError::FileLoadFailed(path.to_string(), err)), } } fn cache_http( + &self, url: &str, - user_agent: Option<&str>, + user_agent: Option<&str> ) -> Result, DocumentCacheError> { let mut headers = HeadersMultiMap::new(); if let Some(user_agent) = user_agent { headers.insert(HeaderName::from("user-agent"), vec![user_agent.to_string()]); } - let mut response = HttpRequest::fetch("GET", url, &headers, &Default::default(), None) + let mut response = HttpRequest::fetch_in("GET", url, &headers, &Default::default(), None, &self.message_exchange, &self.stream_exchange) .and_then(|v| v.into_response()) .map_err(|err| DocumentCacheError::HttpLoadFailed(url.to_string(), err))?; @@ -235,7 +247,7 @@ impl DocumentCache { } } } -impl std::fmt::Debug for DocumentCache { +impl std::fmt::Debug for DocumentCache { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DocumentCache") .field("map", &self.map) diff --git a/core/core/src/sf_core/config.rs b/core/core/src/config.rs similarity index 100% rename from core/core/src/sf_core/config.rs rename to core/core/src/config.rs diff --git a/core/core/src/sf_core/digest.rs b/core/core/src/digest.rs similarity index 100% rename from core/core/src/sf_core/digest.rs rename to core/core/src/digest.rs diff --git a/core/core/src/sf_core/exception.rs b/core/core/src/exception.rs similarity index 100% rename from core/core/src/sf_core/exception.rs rename to core/core/src/exception.rs diff --git a/core/core/src/lib.rs b/core/core/src/lib.rs index e15babd1..8f1edc30 100644 --- a/core/core/src/lib.rs +++ b/core/core/src/lib.rs @@ -1,144 +1,327 @@ -use std::sync::Mutex; +use 
std::{collections::BTreeMap, str::FromStr}; -use bindings::MessageExchangeFfi; +use sf_std::{unstable::{ + exception::PerformException, + perform::{PerformInput, set_perform_output_result_in, set_perform_output_error_in, set_perform_output_exception_in}, + HostValue, +}, abi::{MessageExchange, StreamExchange}}; -use sf_std::{ - abi::{Ptr, Size}, - unstable::perform::{ - set_perform_output_error_in, set_perform_output_exception_in, set_perform_output_result_in, - }, +use interpreter_js::JsInterpreter; +use map_std::unstable::{ + security::{prepare_provider_parameters, prepare_security_map}, + services::prepare_services_map, + MapValue, MapValueObject, }; +use comlink::json_schema_validator::JsonSchemaValidator; -mod bindings; -mod observability; -mod sf_core; -use sf_core::{CoreConfiguration, OneClientCore}; +use crate::{ + exception::FromJsonSchemaValidationError, + metrics::PerformMetricsData, +}; + +mod cache; +mod config; +mod digest; +mod exception; +mod map_std_impl; +mod metrics; +pub mod observability; #[cfg(feature = "core_mock")] mod mock; -static GLOBAL_STATE: Mutex> = Mutex::new(None); +// use crate::profile_validator::ProfileValidator; +use cache::DocumentCache; +pub use config::CoreConfiguration; +use map_std_impl::MapStdImpl; + +use self::{ + cache::{MapCacheEntry, ProfileCacheEntry, ProviderJsonCacheEntry}, + map_std_impl::MapStdImplConfig, +}; -// WASI functions which would be automatically called from `_start`, but we need to explicitly call them since we are a lib. -extern "C" { - fn __wasm_call_ctors(); - fn __wasm_call_dtors(); +#[derive(Debug)] +pub struct OneClientCore { + message_exchange: Me, + stream_exchange: Se, + profile_cache: DocumentCache, + provider_cache: DocumentCache, + map_cache: DocumentCache, + security_validator: JsonSchemaValidator, + parameters_validator: JsonSchemaValidator, + mapstd_config: MapStdImplConfig, } +impl OneClientCore { + const MAP_STDLIB_JS: &'static str = include_str!("../assets/js/map_std.js"); + const SECURITY_VALUES_JSON_SCHEMA: &'static str = + include_str!("../assets/schemas/security_values.json"); + const PARAMETERS_VALUES_JSON_SCHEMA: &'static str = + include_str!("../assets/schemas/parameters_values.json"); -#[no_mangle] -#[export_name = "oneclient_core_setup"] -#[cfg_attr(feature = "core_mock", allow(unreachable_code))] -/// Initializes persistent core state. -/// -/// This function must not be called twice without calling teardown in between. 
-pub extern "C" fn __export_oneclient_core_setup() { - #[cfg(feature = "core_mock")] - return mock::__export_oneclient_core_setup(); - - // call ctors first - unsafe { __wasm_call_ctors() }; - - let mut lock = GLOBAL_STATE.lock().unwrap(); - if lock.is_some() { - panic!("Already setup"); - } + // TODO: Use thiserror and define specific errors + pub fn new(config: &CoreConfiguration, message_exchange: Me, stream_exchange: Se) -> anyhow::Result { + tracing::info!(target: "@user", config = ?config); - // load config, but don't display the error yet since we haven't initialize logging yet - let (config, config_err) = match CoreConfiguration::from_env() { - Ok(c) => (c, None), - Err(err) => (CoreConfiguration::default(), Some(err)), - }; - - // initialize observability - // SAFETY: setup is only allowed to be called once - unsafe { observability::init(&config) }; - - // now that we have logging we can start printing stuff - tracing::debug!(target: "@user", "oneclient_core_setup called"); - if let Some(err) = config_err { - tracing::error!( - target: "@user", - "Failed to load core configuration from environment: {}", - err - ); - } + crate::observability::log_metric!(Init); - // here we panic on error because there is nothing to teardown - lock.replace(OneClientCore::new(&config).unwrap()); -} + Ok(Self { + profile_cache: DocumentCache::new( + config.cache_duration, + config.registry_url.clone(), + Some(config.user_agent.clone()), + message_exchange.clone(), + stream_exchange.clone() + ), + provider_cache: DocumentCache::new( + config.cache_duration, + config.registry_url.clone(), + Some(config.user_agent.clone()), + message_exchange.clone(), + stream_exchange.clone() + ), + map_cache: DocumentCache::new( + config.cache_duration, + config.registry_url.clone(), + Some(config.user_agent.clone()), + message_exchange.clone(), + stream_exchange.clone() + ), + security_validator: JsonSchemaValidator::new( + &serde_json::Value::from_str(Self::SECURITY_VALUES_JSON_SCHEMA) + .expect("Valid JSON"), + ) + .expect("Valid JSON Schema for security values exists"), + parameters_validator: JsonSchemaValidator::new( + &serde_json::Value::from_str(Self::PARAMETERS_VALUES_JSON_SCHEMA) + .expect("Valid JSON"), + ) + .expect("Valid JSON Schema for parameters values exists"), + mapstd_config: MapStdImplConfig { + log_http_transactions: config.user_log, + log_http_transactions_body_max_size: config.user_log_http_body_max_size, + user_agent: config.user_agent.clone(), + }, + message_exchange, + stream_exchange, + }) + } -#[no_mangle] -#[export_name = "oneclient_core_teardown"] -#[cfg_attr(feature = "core_mock", allow(unreachable_code))] -/// Tears down persistent core state. -/// -/// This function must be called exactly once after calling core setup. -pub extern "C" fn __export_oneclient_core_teardown() { - #[cfg(feature = "core_mock")] - return mock::__export_oneclient_core_teardown(); - - tracing::debug!(target: "@user", "oneclient_core_teardown called"); - - match GLOBAL_STATE.try_lock() { - Err(_) => panic!("Global state lock already locked: perform most likely panicked"), - Ok(lock) if lock.is_none() => panic!("Not setup or already torn down"), - Ok(mut lock) => { - let state = lock.take(); - std::mem::drop(state); // just to be explicit, would be dropped implicitly anyway - - // call dtors last - unsafe { __wasm_call_dtors() }; + /// Converts HostValue into MapValue. + /// + /// For primitive types this is a simple move. 
For custom types with drop code this might include adding + /// reference counting and registering handles. + fn host_value_to_map_value(&mut self, value: HostValue) -> MapValue { + match value { + HostValue::Stream(_) => todo!(), + HostValue::None => MapValue::None, + HostValue::Bool(b) => MapValue::Bool(b), + HostValue::Number(n) => MapValue::Number(n), + HostValue::String(s) => MapValue::String(s), + HostValue::Array(a) => MapValue::Array( + a.into_iter() + .map(|v| self.host_value_to_map_value(v)) + .collect(), + ), + HostValue::Object(o) => MapValue::Object(BTreeMap::from_iter( + o.into_iter() + .map(|(k, v)| (k, self.host_value_to_map_value(v))), + )), } } -} -#[no_mangle] -#[export_name = "oneclient_core_perform"] -#[cfg_attr(feature = "core_mock", allow(unreachable_code))] -/// Executes perform. -/// -/// Must be called after [__export_oneclient_core_setup] and before [__export_oneclient_core_teardown]. -/// -/// All information about map to be performed will be retrieved through messages. -pub extern "C" fn __export_oneclient_core_perform() { - #[cfg(feature = "core_mock")] - return mock::__export_oneclient_core_perform(); - - let mut lock = GLOBAL_STATE.lock().unwrap(); - let state: &mut OneClientCore = lock - .as_mut() - .expect("Global state missing: has oneclient_core_setup been called?"); - - match state.perform() { - Ok(Ok(result)) => set_perform_output_result_in(result, MessageExchangeFfi), - Ok(Err(error)) => set_perform_output_error_in(error, MessageExchangeFfi), - Err(exception) => { - tracing::error!(target: "@user", "Perform failed unexpectedly: {}", exception); - - set_perform_output_exception_in(exception, MessageExchangeFfi) + /// Converts MapValue into HostValue. + /// + /// This is the opposite action to [host_value_to_map_value]. + fn map_value_to_host_value(&mut self, value: MapValue) -> HostValue { + match value { + MapValue::None => HostValue::None, + MapValue::Bool(b) => HostValue::Bool(b), + MapValue::Number(n) => HostValue::Number(n), + MapValue::String(s) => HostValue::String(s), + MapValue::Array(a) => HostValue::Array( + a.into_iter() + .map(|v| self.map_value_to_host_value(v)) + .collect(), + ), + MapValue::Object(o) => HostValue::Object(BTreeMap::from_iter( + o.into_iter() + .map(|(k, v)| (k, self.map_value_to_host_value(v))), + )), } } -} -#[cfg(feature = "asyncify")] -#[no_mangle] -#[export_name = "asyncify_alloc_stack"] -pub extern "C" fn __export_oneclient_core_async_init(mut data_ptr: Ptr, stack_size: Size) { - // We allocate Size elements to ensure correct alignment, but size is in bytes. 
- let len = stack_size / std::mem::size_of::(); - - let mut asyncify_stack = Vec::::new(); - asyncify_stack.reserve_exact(len); - asyncify_stack.resize(len, 0); - // leak the stack so deallocation doesn't happen - let asyncify_stack = asyncify_stack.leak(); - - // part of the data contract is that we write the resulting range to the data struct ourselves - let stack = asyncify_stack.as_mut_ptr_range(); - unsafe { - data_ptr.mut_ptr().write(stack.start as Size); - data_ptr.mut_ptr().offset(1).write(stack.end as Size) + #[cfg_attr(feature = "core_mock", allow(unreachable_code))] + pub fn perform(&mut self) { + #[cfg(feature = "core_mock")] + { + return crate::mock::perform(self.message_exchange.clone()); + } + + // we can't send metrics if we don't even know the profile and provider urls + let perform_input = match PerformInput::take_in(&self.message_exchange) { + Ok(i) => i, + Err(exception) => { + set_perform_output_exception_in(exception.into(), &self.message_exchange); + return + } + }; + + // information we have so far parsed from the available data, might be partial if an exception happens + let mut metrics_data = Default::default(); + + match self.perform_impl(perform_input, &mut metrics_data) { + Ok(map_result) => { + tracing::debug!(perform_metrics = ?metrics_data); + crate::observability::log_metric!( + Perform + success = true, + profile = metrics_data.get_profile().as_ref(), + profile_url = metrics_data.profile_url, + profile_content_hash = metrics_data.profile_content_hash, + provider = metrics_data.get_provider(), + provider_url = metrics_data.provider_url, + provider_content_hash = metrics_data.provider_content_hash, + map_url = metrics_data.map_url, + map_content_hash = metrics_data.map_content_hash + ); + + match map_result { + Ok(result) => set_perform_output_result_in(self.map_value_to_host_value(result), &self.message_exchange), + Err(error) => set_perform_output_error_in(self.map_value_to_host_value(error), &self.message_exchange) + } + } + Err(exception) => { + tracing::error!(target: "@user", "Perform failed unexpectedly: {}", exception); + + tracing::debug!(perform_metrics = ?metrics_data); + crate::observability::log_metric!( + Perform + success = false, + profile = metrics_data.get_profile().as_ref(), + profile_url = metrics_data.profile_url, + profile_content_hash = metrics_data.profile_content_hash, + provider = metrics_data.get_provider(), + provider_url = metrics_data.provider_url, + provider_content_hash = metrics_data.provider_content_hash, + map_url = metrics_data.map_url, + map_content_hash = metrics_data.map_content_hash + ); + + set_perform_output_exception_in(exception, &self.message_exchange) + } + } } - // TODO: if we allocate the stack so that the data structure is at its beginning we get the possibility of having multiple stacks without relying on undocumented compiler behavior + fn perform_impl<'metrics, 'me: 'metrics>(&'me mut self, perform_input: PerformInput, metrics_data: &'metrics mut PerformMetricsData<'me>) -> Result, PerformException> { + // first cache documents + self.profile_cache.cache(&perform_input.profile_url, ProfileCacheEntry::from_data)?; + self.provider_cache.cache(&perform_input.provider_url, ProviderJsonCacheEntry::from_data)?; + self.map_cache.cache(&perform_input.map_url, |data| { + // TODO: this is temporary, should be extracted from the map manifest + let file_name = perform_input.map_url.split('/').last().unwrap().to_string(); + + MapCacheEntry::new(data, file_name) + })?; + + // process map input and parameters + let 
map_input = self.host_value_to_map_value(perform_input.map_input); + // TODO: Validate Input + // let mut profile_validator = ProfileValidator::new( + // std::str::from_utf8( + // self.document_cache + // .get(&perform_input.profile_url) + // .unwrap() + // .data + // .as_slice(), + // ) + // .unwrap() + // .to_string(), + // perform_input.usecase.clone(), + // ) + // .context("Failed to initialize profile validator")?; + // if let Err(err) = profile_validator.validate_input(map_input.clone()) { + // tracing::error!("Input validation error: {}", err); + // } + + // Validate parameters values against json schema + self.parameters_validator + .validate(&(&perform_input.map_parameters).into()) + .map_err(|err| { + PerformException::from_json_schema_validation_error( + err, + Some("Parameters".to_string().as_ref()), + ) + })?; + + let mut map_parameters = match perform_input.map_parameters { + HostValue::Object(o) => MapValueObject::from_iter( + o.into_iter() + .map(|(k, v)| (k, self.host_value_to_map_value(v))), + ), + HostValue::None => MapValueObject::new(), + _ => unreachable!("Object or None ensured with JSON Schema validation"), + }; + + // Validate security values against json schema + self.security_validator + .validate(&(&perform_input.map_security).into()) + .map_err(|err| { + PerformException::from_json_schema_validation_error( + err, + Some("Security".to_string().as_ref()), + ) + })?; + + // parse provider json + let (provider_url, provider_entry) = self.provider_cache.get_key_value(&perform_input.provider_url).unwrap(); + // TODO: validate provider json with json schema, to verify OneClient will understand it? + + metrics_data.provider_url = provider_url; + metrics_data.provider_content_hash = Some(&provider_entry.content_hash); + metrics_data.provider = Some(&provider_entry.provider_json.name); + + // process provider and combine with inputs + let mut provider_parameters = prepare_provider_parameters(&provider_entry.provider_json); + provider_parameters.append(&mut map_parameters); + let map_parameters = provider_parameters; + let map_security = prepare_security_map( + &provider_entry.provider_json, + &perform_input.map_security + )?; + let map_services = prepare_services_map(&provider_entry.provider_json, &map_parameters)?; + + let (profile_url, profile_entry) = self.profile_cache.get_key_value(&perform_input.profile_url).unwrap(); + metrics_data.profile_url = profile_url; + metrics_data.profile_content_hash = Some(&profile_entry.content_hash); + + // start interpreting stdlib and then map code + // TODO: should this be here or should we hold an instance of the interpreter in global state + // and clear per-perform data each time it is called? 
+ let mut interpreter = JsInterpreter::new(MapStdImpl::new( + self.mapstd_config.to_owned(), + self.message_exchange.clone(), + self.stream_exchange.clone() + ))?; + interpreter.eval_code("map_std.js", Self::MAP_STDLIB_JS)?; + + let (map_url, map_entry) = self.map_cache.get_key_value(&perform_input.map_url).unwrap(); + metrics_data.map_url = map_url; + metrics_data.map_content_hash = Some(&map_entry.content_hash); + + let map_result = { + interpreter.state_mut().set_context( + map_std::map_value!({ + "input": map_input, + "parameters": MapValue::Object(map_parameters), + "services": map_services + }), + Some(map_security), + ); + interpreter.run(&map_entry.file_name, &map_entry.map, &perform_input.usecase)?; + + interpreter.state_mut().take_output().unwrap() + }; + + Ok(map_result) + } } diff --git a/core/core/src/sf_core/map_std_impl/mod.rs b/core/core/src/map_std_impl/mod.rs similarity index 87% rename from core/core/src/sf_core/map_std_impl/mod.rs rename to core/core/src/map_std_impl/mod.rs index a892d2a2..594ae926 100644 --- a/core/core/src/sf_core/map_std_impl/mod.rs +++ b/core/core/src/map_std_impl/mod.rs @@ -11,15 +11,13 @@ use map_std::{ MapStdFull, }; use sf_std::{ - abi::Handle, + abi::{Handle, MessageExchange, StreamExchange}, fmt::{HttpRequestFmt, HttpResponseFmt}, - HeaderName, + HeaderName, unstable::http::HttpRequest, }; use self::stream::PeekableStream; -use super::HttpRequest; - mod stream; #[derive(Debug, Clone)] @@ -32,17 +30,21 @@ pub struct MapStdImplConfig { pub user_agent: String, } -pub struct MapStdImpl { - http_requests: HandleMap, - streams: HandleMap, +pub struct MapStdImpl { + message_exchange: Me, + stream_exchange: Se, + http_requests: HandleMap>, + streams: HandleMap>, security: Option, map_context: Option, map_output: Option>, config: MapStdImplConfig, } -impl MapStdImpl { - pub fn new(config: MapStdImplConfig) -> Self { +impl MapStdImpl { + pub fn new(config: MapStdImplConfig, message_exchange: Me, stream_exchange: Se) -> Self { Self { + message_exchange, + stream_exchange, http_requests: HandleMap::new(), streams: HandleMap::new(), security: None, @@ -64,7 +66,7 @@ impl MapStdImpl { self.map_output.take() } } -impl MapStdUnstable for MapStdImpl { +impl MapStdUnstable for MapStdImpl { fn print(&mut self, message: &str) { tracing::info!(target: "@user", map = %message); } @@ -105,12 +107,14 @@ impl MapStdUnstable for MapStdImpl { // We want to log the transaction below together with the handle, but we want to log it even if it fails // in which case it doesn't get a handle, so we play around with a result here - let handle_result = HttpRequest::fetch( + let handle_result = HttpRequest::fetch_in( ¶ms.method, ¶ms.url, ¶ms.headers, ¶ms.query, params.body.as_deref(), + self.message_exchange.clone(), + self.stream_exchange.clone() ) .map(|request| self.http_requests.insert(request)) .map_err(MapHttpCallError::from); @@ -196,4 +200,4 @@ impl MapStdUnstable for MapStdImpl { Ok(()) } } -impl MapStdFull for MapStdImpl {} +impl MapStdFull for MapStdImpl {} diff --git a/core/core/src/sf_core/map_std_impl/stream.rs b/core/core/src/map_std_impl/stream.rs similarity index 74% rename from core/core/src/sf_core/map_std_impl/stream.rs rename to core/core/src/map_std_impl/stream.rs index 4dffa87c..b9c71fd2 100644 --- a/core/core/src/sf_core/map_std_impl/stream.rs +++ b/core/core/src/map_std_impl/stream.rs @@ -1,28 +1,28 @@ use std::io::{Read, Write}; -use crate::sf_core::{HttpResponse, IoStream}; +use sf_std::{unstable::{IoStream, http::HttpResponse}, abi::StreamExchange}; 
-pub enum StreamEntry { - Io(IoStream), +pub enum StreamEntry { + Io(IoStream), /// Buffered streams are currently implemented for body logging - Peekable(PeekableStream), + Peekable(PeekableStream), } -impl From for StreamEntry { - fn from(value: IoStream) -> Self { +impl From> for StreamEntry { + fn from(value: IoStream) -> Self { Self::Io(value) } } -impl From for StreamEntry { - fn from(value: PeekableStream) -> Self { +impl From> for StreamEntry { + fn from(value: PeekableStream) -> Self { Self::Peekable(value) } } -impl From for StreamEntry { - fn from(value: HttpResponse) -> Self { +impl From> for StreamEntry { + fn from(value: HttpResponse) -> Self { Self::Io(value.into_body()) } } -impl Read for StreamEntry { +impl Read for StreamEntry { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { match self { Self::Io(i) => i.read(buf), @@ -30,7 +30,7 @@ impl Read for StreamEntry { } } } -impl Write for StreamEntry { +impl Write for StreamEntry { fn write(&mut self, buf: &[u8]) -> std::io::Result { match self { Self::Io(i) => i.write(buf), @@ -46,11 +46,11 @@ impl Write for StreamEntry { } } -pub struct PeekableStream { +pub struct PeekableStream { buffer: Vec, - inner: IoStream, + inner: IoStream, } -impl PeekableStream { +impl PeekableStream { pub fn peek(&mut self, count: usize) -> std::io::Result<&[u8]> { let count = if self.buffer.len() < count { // calculate how many bytes are needed to fill buffer up to `count` @@ -81,15 +81,15 @@ impl PeekableStream { Ok(&self.buffer[..count]) } } -impl From for PeekableStream { - fn from(value: IoStream) -> Self { +impl From> for PeekableStream { + fn from(value: IoStream) -> Self { PeekableStream { buffer: Vec::new(), inner: value, } } } -impl Read for PeekableStream { +impl Read for PeekableStream { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { if self.buffer.len() > 0 { // calculate how much to read from the buffer @@ -105,7 +105,7 @@ impl Read for PeekableStream { self.inner.read(buf) } } -impl Write for PeekableStream { +impl Write for PeekableStream { fn write(&mut self, buf: &[u8]) -> std::io::Result { // TODO: we probably don't care about this self.inner.write(buf) diff --git a/core/core/src/sf_core/metrics.rs b/core/core/src/metrics.rs similarity index 100% rename from core/core/src/sf_core/metrics.rs rename to core/core/src/metrics.rs diff --git a/core/core/src/mock.rs b/core/core/src/mock.rs index 8df0a1d4..5a48c3b7 100644 --- a/core/core/src/mock.rs +++ b/core/core/src/mock.rs @@ -5,23 +5,17 @@ //! - CORE_PERFORM_TRUE //! 
- CORE_PERFORM_INPUT_VALIDATION_ERROR -use sf_std::unstable::{ - exception::{PerformException, PerformExceptionErrorCode}, - perform::{set_perform_output_exception_in, set_perform_output_result_in, PerformInput}, - HostValue, +use sf_std::{ + abi::MessageExchange, + unstable::{ + exception::{PerformException, PerformExceptionErrorCode}, + perform::{set_perform_output_exception_in, set_perform_output_result_in, PerformInput}, + HostValue, + }, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use crate::bindings::MessageExchangeFfi; - -extern "C" { - fn __wasm_call_ctors(); - fn __wasm_call_dtors(); -} - -pub fn __export_oneclient_core_setup() { - unsafe { __wasm_call_ctors() }; - +pub fn init_tracing() { // initialize tracing tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) @@ -30,27 +24,22 @@ pub fn __export_oneclient_core_setup() { tracing::debug!("mocked oneclient core setup"); } -pub fn __export_oneclient_core_teardown() { - tracing::debug!("mocked oneclient core teardown"); - unsafe { __wasm_call_dtors() }; -} - -pub fn __export_oneclient_core_perform() { - let perform_input = PerformInput::take_in(MessageExchangeFfi).unwrap(); +pub fn perform(message_exchange: impl MessageExchange) { + let perform_input = PerformInput::take_in(&message_exchange).unwrap(); tracing::debug!("mocked oneclient core perform {}", perform_input.usecase); match perform_input.usecase.as_str() { "CORE_PERFORM_PANIC" => panic!("Requested panic!"), "CORE_PERFORM_TRUE" => { - set_perform_output_result_in(HostValue::Bool(true), MessageExchangeFfi) + set_perform_output_result_in(HostValue::Bool(true), &message_exchange) } "CORE_PERFORM_INPUT_VALIDATION_ERROR" => set_perform_output_exception_in( PerformException { error_code: PerformExceptionErrorCode::InputValidationError, message: "Test validation error".to_string(), }, - MessageExchangeFfi, + &message_exchange, ), _ => panic!("Unknown usecase: {}", perform_input.usecase), }; diff --git a/core/core/src/observability/mod.rs b/core/core/src/observability/mod.rs index a8c59642..f506f35c 100644 --- a/core/core/src/observability/mod.rs +++ b/core/core/src/observability/mod.rs @@ -1,35 +1,44 @@ -use std::{borrow::Cow, ops::Deref}; +use std::borrow::Cow; -use sf_std::abi::{Ptr, Size}; -use tracing::metadata::LevelFilter; -use tracing_subscriber::{ - filter::FilterFn, fmt::format, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, -}; +use tracing_subscriber::{fmt::format, filter::{FilterFn, LevelFilter}, Layer, layer::SubscriberExt, EnvFilter, util::SubscriberInitExt}; -use self::buffer::{RingEventBuffer, SharedEventBuffer, TracingEventBuffer, VecEventBuffer}; -use crate::sf_core::CoreConfiguration; +use crate::CoreConfiguration; +use self::buffer::{SharedEventBuffer, VecEventBuffer, RingEventBuffer}; +pub(crate) use metrics::log_metric; -mod buffer; +pub mod buffer; pub mod metrics; -static mut METRICS_BUFFER: Option> = None; -static mut DEVELOPER_DUMP_BUFFER: Option> = None; +pub static mut METRICS_BUFFER: Option> = None; +pub static mut DEVELOPER_DUMP_BUFFER: Option> = None; +#[cfg_attr(feature = "core_mock", allow(unused_variables))] /// SAFETY: must only be called once during initialization of the program pub unsafe fn init(config: &CoreConfiguration) { - // SAFETY: this is only called once and there is no asynchronous mutation unsafe { + // metrics buffer is not used through tracing METRICS_BUFFER.replace(SharedEventBuffer::new(VecEventBuffer::new())); - 
DEVELOPER_DUMP_BUFFER.replace(SharedEventBuffer::new(RingEventBuffer::new( - config.developer_dump_buffer_size, - ))); - - init_tracing( - // METRICS_BUFFER.as_ref().cloned().unwrap(), - DEVELOPER_DUMP_BUFFER.as_ref().cloned().unwrap(), - config.user_log, - &config.developer_log, - ); + } + + #[cfg(feature = "core_mock")] + { + crate::mock::init_tracing(); + } + #[cfg(not(feature = "core_mock"))] + { + // SAFETY: this is only called once and there is no asynchronous mutation + unsafe { + DEVELOPER_DUMP_BUFFER.replace(SharedEventBuffer::new(RingEventBuffer::new( + config.developer_dump_buffer_size, + ))); + + init_tracing( + // METRICS_BUFFER.as_ref().cloned().unwrap(), + DEVELOPER_DUMP_BUFFER.as_ref().cloned().unwrap(), + config.user_log, + &config.developer_log, + ); + } } // add panic hook so we can log panics as metrics @@ -42,7 +51,7 @@ pub unsafe fn init(config: &CoreConfiguration) { format!("{}", info).into() }; - metrics::log_metric!( + log_metric!( Panic message = message.as_ref(), location = info.location().map(|l| (l.file(), l.line(), l.column())) @@ -55,6 +64,7 @@ pub unsafe fn init(config: &CoreConfiguration) { })); } +#[cfg_attr(feature = "core_mock", allow(dead_code))] fn init_tracing( // TODO: we don't use tracing to store metrics in the metrics buffer because we need more complex fields than tracing currently supports // _metrics_buffer: SharedEventBuffer, @@ -95,90 +105,4 @@ fn init_tracing( .with(developer_layer) .with(developer_dump_layer) .init(); -} - -#[repr(C)] -pub struct FatPointer { - pub ptr: Ptr, - pub size: Size, -} -impl FatPointer { - pub const fn null() -> Self { - Self { - ptr: Ptr::null(), - size: 0, - } - } -} -static mut BUFFER_RETURN_ARENA: [FatPointer; 2] = [FatPointer::null(), FatPointer::null()]; -unsafe fn clear_return_arena() -> Ptr<[FatPointer; 2]> { - unsafe { - BUFFER_RETURN_ARENA[0].ptr = Ptr::null(); - BUFFER_RETURN_ARENA[0].size = 0; - BUFFER_RETURN_ARENA[1].ptr = Ptr::null(); - BUFFER_RETURN_ARENA[1].size = 0; - - Ptr::from((&BUFFER_RETURN_ARENA) as *const [FatPointer; 2]) - } -} -unsafe fn set_return_arena_from(buffer: &impl TracingEventBuffer) -> Ptr<[FatPointer; 2]> { - let [(ptr1, size1), (ptr2, size2)] = buffer.as_raw_parts(); - - unsafe { - BUFFER_RETURN_ARENA[0].ptr = ptr1.into(); - BUFFER_RETURN_ARENA[0].size = size1; - BUFFER_RETURN_ARENA[1].ptr = ptr2.into(); - BUFFER_RETURN_ARENA[1].size = size2; - - Ptr::from((&BUFFER_RETURN_ARENA) as *const [FatPointer; 2]) - } -} - -#[no_mangle] -#[export_name = "oneclient_core_get_metrics"] -/// Returns two fat pointers to memory where metrics are stored. -/// -/// The first one will point to the head of the buffer up to its end. -/// The second one will point from the beginning buffer up to its tail. The second pointer may be null or have zero length. -/// Each metric is a UTF-8 encoded JSON string and is terminated by a null byte. -pub extern "C" fn __export_oneclient_core_get_metrics() -> Ptr<[FatPointer; 2]> { - tracing::debug!("Getting metrics buffer"); - - unsafe { - match METRICS_BUFFER { - Some(ref b) => set_return_arena_from(b.lock().deref()), - None => clear_return_arena(), - } - } -} - -#[no_mangle] -#[export_name = "oneclient_core_clear_metrics"] -/// Clears the metrics buffer. -/// -/// This should be called after [__export_oneclient_core_get_metrics] is called and the metrics are processed. 
-pub extern "C" fn __export_oneclient_core_clear_metrics() { - tracing::trace!("Clearing metrics buffer"); - - unsafe { - if let Some(ref buffer) = METRICS_BUFFER { - buffer.lock().clear(); - } - } -} - -#[no_mangle] -#[export_name = "oneclient_core_get_developer_dump"] -/// Returns two fat pointer to memory where the developer dump is stored. -/// -/// The first one will point to the head of the buffer up to its end. -/// The second one will point from the beginning buffer up to its tail. The second pointer may be null or have zero length. -/// Each event is a UTF-8 encoded string and is terminated by a null byte. -pub extern "C" fn __export_oneclient_core_get_developer_dump() -> Ptr<[FatPointer; 2]> { - unsafe { - match DEVELOPER_DUMP_BUFFER { - Some(ref b) => set_return_arena_from(b.lock().deref()), - None => clear_return_arena(), - } - } -} +} \ No newline at end of file diff --git a/core/core/src/sf_core.rs b/core/core/src/sf_core.rs deleted file mode 100644 index e82000c9..00000000 --- a/core/core/src/sf_core.rs +++ /dev/null @@ -1,335 +0,0 @@ -use std::{collections::BTreeMap, str::FromStr}; - -use sf_std::unstable::{ - exception::{PerformException, PerformExceptionErrorCode}, - perform::PerformInput, - HostValue, -}; - -use interpreter_js::JsInterpreter; -use map_std::unstable::{ - security::{prepare_provider_parameters, prepare_security_map}, - services::prepare_services_map, - MapValue, MapValueObject, -}; -use comlink::json_schema_validator::JsonSchemaValidator; - -use crate::{ - bindings::{MessageExchangeFfi, StreamExchangeFfi}, - sf_core::{ - exception::FromJsonSchemaValidationError, - metrics::PerformMetricsData, - }, -}; - -mod cache; -mod config; -mod digest; -mod exception; -mod map_std_impl; -mod metrics; -mod profile_validator; - -// use crate::profile_validator::ProfileValidator; -use cache::DocumentCache; -pub use config::CoreConfiguration; -use map_std_impl::MapStdImpl; - -use self::{ - cache::{MapCacheEntry, ProfileCacheEntry, ProviderJsonCacheEntry}, - map_std_impl::MapStdImplConfig, -}; - -type Fs = sf_std::unstable::fs::FsConvenience; -type HttpRequest = sf_std::unstable::http::HttpRequest; -type HttpResponse = sf_std::unstable::http::HttpResponse; -type IoStream = sf_std::unstable::IoStream; - -#[derive(Debug)] -pub struct OneClientCore { - profile_cache: DocumentCache, - provider_cache: DocumentCache, - map_cache: DocumentCache, - security_validator: JsonSchemaValidator, - parameters_validator: JsonSchemaValidator, - mapstd_config: MapStdImplConfig, -} -impl OneClientCore { - const MAP_STDLIB_JS: &'static str = include_str!("../assets/js/map_std.js"); - const SECURITY_VALUES_JSON_SCHEMA: &'static str = - include_str!("../assets/schemas/security_values.json"); - const PARAMETERS_VALUES_JSON_SCHEMA: &'static str = - include_str!("../assets/schemas/parameters_values.json"); - - // TODO: Use thiserror and define specific errors - pub fn new(config: &CoreConfiguration) -> anyhow::Result { - tracing::info!(target: "@user", config = ?config); - - crate::observability::metrics::log_metric!(Init); - - Ok(Self { - profile_cache: DocumentCache::new( - config.cache_duration, - config.registry_url.clone(), - Some(config.user_agent.clone()), - ), - provider_cache: DocumentCache::new( - config.cache_duration, - config.registry_url.clone(), - Some(config.user_agent.clone()), - ), - map_cache: DocumentCache::new( - config.cache_duration, - config.registry_url.clone(), - Some(config.user_agent.clone()), - ), - security_validator: JsonSchemaValidator::new( - 
&serde_json::Value::from_str(&OneClientCore::SECURITY_VALUES_JSON_SCHEMA) - .expect("Valid JSON"), - ) - .expect("Valid JSON Schema for security values exists"), - parameters_validator: JsonSchemaValidator::new( - &serde_json::Value::from_str(&OneClientCore::PARAMETERS_VALUES_JSON_SCHEMA) - .expect("Valid JSON"), - ) - .expect("Valid JSON Schema for parameters values exists"), - mapstd_config: MapStdImplConfig { - log_http_transactions: config.user_log, - log_http_transactions_body_max_size: config.user_log_http_body_max_size, - user_agent: config.user_agent.clone(), - }, - }) - } - - /// Converts HostValue into MapValue. - /// - /// For primitive types this is a simple move. For custom types with drop code this might include adding - /// reference counting and registering handles. - fn host_value_to_map_value(&mut self, value: HostValue) -> MapValue { - match value { - HostValue::Stream(_) => todo!(), - HostValue::None => MapValue::None, - HostValue::Bool(b) => MapValue::Bool(b), - HostValue::Number(n) => MapValue::Number(n), - HostValue::String(s) => MapValue::String(s), - HostValue::Array(a) => MapValue::Array( - a.into_iter() - .map(|v| self.host_value_to_map_value(v)) - .collect(), - ), - HostValue::Object(o) => MapValue::Object(BTreeMap::from_iter( - o.into_iter() - .map(|(k, v)| (k, self.host_value_to_map_value(v))), - )), - } - } - - /// Converts MapValue into HostValue. - /// - /// This is the opposite action to [host_value_to_map_value]. - fn map_value_to_host_value(&mut self, value: MapValue) -> HostValue { - match value { - MapValue::None => HostValue::None, - MapValue::Bool(b) => HostValue::Bool(b), - MapValue::Number(n) => HostValue::Number(n), - MapValue::String(s) => HostValue::String(s), - MapValue::Array(a) => HostValue::Array( - a.into_iter() - .map(|v| self.map_value_to_host_value(v)) - .collect(), - ), - MapValue::Object(o) => HostValue::Object(BTreeMap::from_iter( - o.into_iter() - .map(|(k, v)| (k, self.map_value_to_host_value(v))), - )), - } - } - - pub fn perform(&mut self) -> Result, PerformException> { - // we can't send metrics if we don't even know the profile and provider urls - let perform_input = PerformInput::take_in(MessageExchangeFfi)?; - - // information we have so far parsed from the available data, might be partial if an exception happens - let mut metrics_data = PerformMetricsData { - profile_url: &perform_input.profile_url, - provider_url: &perform_input.provider_url, - map_url: &perform_input.map_url, - ..Default::default() - }; - - /// This is like the `?` operator but allows us to run cleanup code within the same borrow scope, - /// so we don't have to clone metrics data around for no reason - macro_rules! 
try_metrics { - (Send $success: expr) => { - tracing::debug!(perform_metrics = ?metrics_data); - // Cleanup code is this - crate::observability::metrics::log_metric!( - Perform - success = $success, - profile = metrics_data.get_profile().as_ref(), - profile_url = metrics_data.profile_url, - profile_content_hash = metrics_data.profile_content_hash, - provider = metrics_data.get_provider(), - provider_url = metrics_data.provider_url, - provider_content_hash = metrics_data.provider_content_hash, - map_url = metrics_data.map_url, - map_content_hash = metrics_data.map_content_hash - ); - }; - - ($e: expr) => { - match $e { - Ok(v) => v, - Err(err) => { - try_metrics!(Send false); - - return Err(err.into()); - } - } - }; - } - - // first cache documents - try_metrics!(self - .profile_cache - .cache(&perform_input.profile_url, ProfileCacheEntry::from_data)); - try_metrics!(self.provider_cache.cache( - &perform_input.provider_url, - ProviderJsonCacheEntry::from_data - )); - try_metrics!(self.map_cache.cache(&perform_input.map_url, |data| { - // TODO: this is temporary, should be extracted from the map manifest - let file_name = perform_input.map_url.split('/').last().unwrap().to_string(); - - MapCacheEntry::new(data, file_name) - })); - - // process map input and parameters - let map_input = self.host_value_to_map_value(perform_input.map_input); - // TODO: Validate Input - // let mut profile_validator = ProfileValidator::new( - // std::str::from_utf8( - // self.document_cache - // .get(&perform_input.profile_url) - // .unwrap() - // .data - // .as_slice(), - // ) - // .unwrap() - // .to_string(), - // perform_input.usecase.clone(), - // ) - // .context("Failed to initialize profile validator")?; - // if let Err(err) = profile_validator.validate_input(map_input.clone()) { - // tracing::error!("Input validation error: {}", err); - // } - - // Validate parameters values against json schema - self.parameters_validator - .validate(&(&perform_input.map_parameters).into()) - .map_err(|err| { - PerformException::from_json_schema_validation_error( - err, - Some("Parameters".to_string().as_ref()), - ) - })?; - - let mut map_parameters = match perform_input.map_parameters { - HostValue::Object(o) => MapValueObject::from_iter( - o.into_iter() - .map(|(k, v)| (k, self.host_value_to_map_value(v))), - ), - HostValue::None => MapValueObject::new(), - _ => unreachable!("Object or None ensured with JSON Schema validation"), - }; - - // Validate security values against json schema - self.security_validator - .validate(&(&perform_input.map_security).into()) - .map_err(|err| { - PerformException::from_json_schema_validation_error( - err, - Some("Security".to_string().as_ref()), - ) - })?; - - // parse provider json - let ProviderJsonCacheEntry { - provider_json, - content_hash: provider_json_content_hash, - } = self - .provider_cache - .get(&perform_input.provider_url) - .unwrap(); - // TODO: validate provider json with json schema, to verify OneClient will understand it? 
- - metrics_data.provider_content_hash = Some(provider_json_content_hash); - metrics_data.provider = Some(&provider_json.name); - - // process provider and combine with inputs - let mut provider_parameters = prepare_provider_parameters(provider_json); - provider_parameters.append(&mut map_parameters); - let map_parameters = provider_parameters; - let map_security = try_metrics!(prepare_security_map( - provider_json, - &perform_input.map_security - )); - let map_services = try_metrics!( - prepare_services_map(provider_json, &map_parameters) - ); - - let ProfileCacheEntry { - profile: _, - content_hash: profile_content_hash, - } = self.profile_cache.get(&perform_input.profile_url).unwrap(); - metrics_data.profile_content_hash = Some(profile_content_hash); - - // start interpreting stdlib and then map code - // TODO: should this be here or should we hold an instance of the interpreter in global state - // and clear per-perform data each time it is called? - let mut interpreter = try_metrics!(JsInterpreter::new(MapStdImpl::new( - self.mapstd_config.to_owned() - ))); - // here we allow runtime stdlib replacement for development purposes - // this might be removed in the future - try_metrics!(match std::env::var("ONESDK_REPLACE_MAP_STDLIB").ok() { - None => interpreter.eval_code("map_std.js", Self::MAP_STDLIB_JS), - Some(path) => { - let replacement = - try_metrics!(Fs::read_to_string(&path).map_err(|err| PerformException { - error_code: PerformExceptionErrorCode::ReplacementStdlibError, - message: format!("Failed to load replacement map_std: {}", err), - })); - - interpreter.eval_code(&path, &replacement) - } - }); - - let MapCacheEntry { - map, - content_hash: map_content_hash, - file_name: map_file_name, - } = self.map_cache.get(&perform_input.map_url).unwrap(); - metrics_data.map_content_hash = Some(map_content_hash); - let map_result = { - interpreter.state_mut().set_context( - map_std::map_value!({ - "input": map_input, - "parameters": MapValue::Object(map_parameters), - "services": map_services - }), - Some(map_security), - ); - try_metrics!(interpreter.run(map_file_name, map, &perform_input.usecase)); - - interpreter.state_mut().take_output().unwrap() - }; - - try_metrics!(Send map_result.is_ok()); - - Ok(match map_result { - Ok(result) => Ok(self.map_value_to_host_value(result)), - Err(error) => Err(self.map_value_to_host_value(error)), - }) - } -} diff --git a/core/core/src/sf_core/profile_validator.rs b/core/core/src/sf_core/profile_validator.rs deleted file mode 100644 index f72ba659..00000000 --- a/core/core/src/sf_core/profile_validator.rs +++ /dev/null @@ -1,153 +0,0 @@ -#![allow(dead_code)] // TODO: validator broken? 
always has been - -use thiserror::Error; - -use interpreter_js::{JsInterpreter, JsInterpreterError}; -use map_std::unstable::MapValue; - -use super::{ - map_std_impl::{MapStdImpl, MapStdImplConfig}, - Fs, -}; - -#[derive(Debug, Error)] -pub enum ProfileValidatorError { - #[error("Error interpreting validator code: {0}")] - InterpreterFailed(#[from] JsInterpreterError), - #[error("Internal validator error: {0}")] - InternalError(String), - #[error("Error parsing profile: {0}")] - ProfileParseFailed(String), - #[error("Input is invalid: {0}")] - InputValidationFailed(String), - #[error("Result is invalid: {0}")] - ResultValidationFailed(String), - #[error("Error is invalid: {0}")] - ErrorValidationFailed(String), -} - -pub struct ProfileValidator { - interpreter: JsInterpreter, - validator_bytecode: Vec, - usecase: String, -} -impl ProfileValidator { - const PROFILE_VALIDATOR_JS: &'static str = include_str!("../../assets/js/profile_validator.js"); - - pub fn new(profile: String, usecase: String) -> Result { - let mut interpreter = JsInterpreter::new(MapStdImpl::new(MapStdImplConfig { - log_http_transactions: false, - log_http_transactions_body_max_size: 0, - user_agent: "".to_string(), - }))?; - - let validator_bytecode = match std::env::var("ONESDK_REPLACE_PROFILE_VALIDATOR").ok() { - None => interpreter.compile_code("profile_validator.js", Self::PROFILE_VALIDATOR_JS), - Some(path) => { - let replacement = Fs::read_to_string(&path) - .expect("Failed to load replacement profile_validator"); - interpreter.compile_code(&path, &replacement) - } - }?; - - let mut me = Self { - interpreter, - validator_bytecode, - usecase, - }; - me.set_profile(profile)?; - - Ok(me) - } - - fn set_profile(&mut self, profile: String) -> Result<(), ProfileValidatorError> { - tracing::trace!("ProfileValidator::set_profile: {}", profile); - self.interpreter.state_mut().set_context( - map_std::map_value!({ - "profile": MapValue::String(profile), - "usecase": MapValue::String(self.usecase.clone()) - }), - None, - ); - - self.interpreter.eval_bytecode(&self.validator_bytecode)?; - match self.interpreter.state_mut().take_output().unwrap() { - Err(err) => Err(ProfileValidatorError::ProfileParseFailed( - err.try_into_string().unwrap(), - )), - Ok(_) => Ok(()), - } - } - - pub fn validate_input(&mut self, input: MapValue) -> Result<(), ProfileValidatorError> { - tracing::trace!("ProfileValidator::validate_input: {:?}", input); - self.interpreter.state_mut().set_context( - map_std::map_value!({ - "input": input, - "usecase": MapValue::String(self.usecase.clone()) - }), - None, - ); - - self.interpreter.eval_bytecode(&self.validator_bytecode)?; - - match self.interpreter.state_mut().take_output().unwrap() { - Err(err) => Err(ProfileValidatorError::InternalError( - err.try_into_string().unwrap(), - )), - Ok(MapValue::String(err)) => Err(ProfileValidatorError::InputValidationFailed(err)), - Ok(MapValue::None) => Ok(()), - _ => unreachable!(), - } - } - - pub fn validate_output( - &mut self, - result: Result, - ) -> Result<(), ProfileValidatorError> { - tracing::trace!("ProfileValidator::validate_output: {:?}", result); - - match result { - Ok(res) => { - let val = map_std::map_value!({ - "result": res, - "usecase": MapValue::String(self.usecase.clone()) - }); - self.interpreter.state_mut().set_context(val, None); - - self.interpreter.eval_bytecode(&self.validator_bytecode)?; - - match self.interpreter.state_mut().take_output().unwrap() { - Err(err) => Err(ProfileValidatorError::InternalError( - err.try_into_string().unwrap(), - 
)), - Ok(MapValue::String(err)) => { - Err(ProfileValidatorError::ResultValidationFailed(err)) - } - Ok(MapValue::None) => Ok(()), - _ => unreachable!(), - } - } - Err(err) => { - let val = map_std::map_value!({ - "error": err, - "usecase": MapValue::String(self.usecase.clone()) - }); - self.interpreter.state_mut().set_context(val, None); - - self.interpreter.eval_bytecode(&self.validator_bytecode)?; - - match self.interpreter.state_mut().take_output().unwrap() { - Err(err) => Err(ProfileValidatorError::InternalError( - err.try_into_string().unwrap(), - )), - Ok(MapValue::String(err)) => { - Err(ProfileValidatorError::ErrorValidationFailed(err)) - } - Ok(MapValue::None) => Ok(()), - _ => unreachable!(), - } - } - } - } -} diff --git a/core/core_wasm/Cargo.toml b/core/core_wasm/Cargo.toml new file mode 100644 index 00000000..4c9c2461 --- /dev/null +++ b/core/core_wasm/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "oneclient_core_wasm" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[features] +default = ["asyncify"] +asyncify = [] +core_mock = ["oneclient_core/core_mock"] + +[dependencies] +wasm_abi = { path = "../wasm_abi" } +oneclient_core = { path = "../core" } + +tracing = { workspace = true } diff --git a/core/core/src/bindings.rs b/core/core_wasm/src/bindings.rs similarity index 98% rename from core/core/src/bindings.rs rename to core/core_wasm/src/bindings.rs index e955c9ac..2077baf2 100644 --- a/core/core/src/bindings.rs +++ b/core/core_wasm/src/bindings.rs @@ -1,4 +1,4 @@ -use sf_std::abi::{ +use wasm_abi::{ AbiResultRepr, Handle, MessageExchange, MessageExchangeFfiFn, Ptr, Size, StaticMessageExchange, StaticStreamExchange, StreamExchange, StreamExchangeFfiFn, }; @@ -7,6 +7,7 @@ use sf_std::abi::{ // MESSAGES // ////////////// +#[derive(Clone)] pub struct MessageExchangeFfi; impl MessageExchangeFfi { // SAFETY: We choose to trust this FFI. @@ -70,6 +71,7 @@ extern "C" fn __import_message_exchange_retrieve( // STREAMS // ///////////// +#[derive(Clone)] pub struct StreamExchangeFfi; impl StreamExchangeFfi { // SAFETY: We choose to trust this FFI. diff --git a/core/core_wasm/src/lib.rs b/core/core_wasm/src/lib.rs new file mode 100644 index 00000000..8067571e --- /dev/null +++ b/core/core_wasm/src/lib.rs @@ -0,0 +1,117 @@ +use std::sync::Mutex; + +use wasm_abi::{Ptr, Size}; +use oneclient_core::{CoreConfiguration, OneClientCore}; + +mod bindings; +mod observability; + +type Core = OneClientCore; +static GLOBAL_STATE: Mutex> = Mutex::new(None); + +// WASI functions which would be automatically called from `_start`, but we need to explicitly call them since we are a lib. +extern "C" { + fn __wasm_call_ctors(); + fn __wasm_call_dtors(); +} + +#[no_mangle] +#[export_name = "oneclient_core_setup"] +#[cfg_attr(feature = "core_mock", allow(unreachable_code))] +/// Initializes persistent core state. +/// +/// This function must not be called twice without calling teardown in between. 
+pub extern "C" fn __export_oneclient_core_setup() { + // call ctors first + unsafe { __wasm_call_ctors() }; + + let mut lock = GLOBAL_STATE.lock().unwrap(); + if lock.is_some() { + panic!("Already setup"); + } + + // load config, but don't display the error yet since we haven't initialize logging yet + let (config, config_err) = match CoreConfiguration::from_env() { + Ok(c) => (c, None), + Err(err) => (CoreConfiguration::default(), Some(err)), + }; + + // initialize observability + // SAFETY: setup is only allowed to be called once + unsafe { oneclient_core::observability::init(&config) }; + + // now that we have logging we can start printing stuff + tracing::debug!(target: "@user", "oneclient_core_setup called"); + if let Some(err) = config_err { + tracing::error!( + target: "@user", + "Failed to load core configuration from environment: {}", + err + ); + } + + // here we panic on error because there is nothing to teardown + lock.replace(Core::new(&config, bindings::MessageExchangeFfi, bindings::StreamExchangeFfi).unwrap()); +} + +#[no_mangle] +#[export_name = "oneclient_core_teardown"] +#[cfg_attr(feature = "core_mock", allow(unreachable_code))] +/// Tears down persistent core state. +/// +/// This function must be called exactly once after calling core setup. +pub extern "C" fn __export_oneclient_core_teardown() { + tracing::debug!(target: "@user", "oneclient_core_teardown called"); + + match GLOBAL_STATE.try_lock() { + Err(_) => panic!("Global state lock already locked: perform most likely panicked"), + Ok(lock) if lock.is_none() => panic!("Not setup or already torn down"), + Ok(mut lock) => { + let state = lock.take(); + std::mem::drop(state); // just to be explicit, would be dropped implicitly anyway + + // call dtors last + unsafe { __wasm_call_dtors() }; + } + } +} + +#[no_mangle] +#[export_name = "oneclient_core_perform"] +#[cfg_attr(feature = "core_mock", allow(unreachable_code))] +/// Executes perform. +/// +/// Must be called after [__export_oneclient_core_setup] and before [__export_oneclient_core_teardown]. +/// +/// All information about map to be performed will be retrieved through messages. +pub extern "C" fn __export_oneclient_core_perform() { + let mut lock = GLOBAL_STATE.lock().unwrap(); + let state: &mut Core = lock + .as_mut() + .expect("Global state missing: has oneclient_core_setup been called?"); + + state.perform() +} + +#[cfg(feature = "asyncify")] +#[no_mangle] +#[export_name = "asyncify_alloc_stack"] +pub extern "C" fn __export_oneclient_core_async_init(mut data_ptr: Ptr, stack_size: Size) { + // We allocate Size elements to ensure correct alignment, but size is in bytes. 
+ let len = stack_size / std::mem::size_of::(); + + let mut asyncify_stack = Vec::::new(); + asyncify_stack.reserve_exact(len); + asyncify_stack.resize(len, 0); + // leak the stack so deallocation doesn't happen + let asyncify_stack = asyncify_stack.leak(); + + // part of the data contract is that we write the resulting range to the data struct ourselves + let stack = asyncify_stack.as_mut_ptr_range(); + unsafe { + data_ptr.mut_ptr().write(stack.start as Size); + data_ptr.mut_ptr().offset(1).write(stack.end as Size) + } + + // TODO: if we allocate the stack so that the data structure is at its beginning we get the possibility of having multiple stacks without relying on undocumented compiler behavior +} diff --git a/core/core_wasm/src/observability.rs b/core/core_wasm/src/observability.rs new file mode 100644 index 00000000..df6c4637 --- /dev/null +++ b/core/core_wasm/src/observability.rs @@ -0,0 +1,91 @@ +use std::ops::Deref; + +use wasm_abi::{Ptr, Size}; + +use oneclient_core::observability::{METRICS_BUFFER, DEVELOPER_DUMP_BUFFER, buffer::TracingEventBuffer}; + +#[repr(C)] +pub struct FatPointer { + pub ptr: Ptr, + pub size: Size, +} +impl FatPointer { + pub const fn null() -> Self { + Self { + ptr: Ptr::null(), + size: 0, + } + } +} +static mut BUFFER_RETURN_ARENA: [FatPointer; 2] = [FatPointer::null(), FatPointer::null()]; +unsafe fn clear_return_arena() -> Ptr<[FatPointer; 2]> { + unsafe { + BUFFER_RETURN_ARENA[0].ptr = Ptr::null(); + BUFFER_RETURN_ARENA[0].size = 0; + BUFFER_RETURN_ARENA[1].ptr = Ptr::null(); + BUFFER_RETURN_ARENA[1].size = 0; + + Ptr::from((&BUFFER_RETURN_ARENA) as *const [FatPointer; 2]) + } +} +unsafe fn set_return_arena_from(buffer: &impl TracingEventBuffer) -> Ptr<[FatPointer; 2]> { + let [(ptr1, size1), (ptr2, size2)] = buffer.as_raw_parts(); + + unsafe { + BUFFER_RETURN_ARENA[0].ptr = ptr1.into(); + BUFFER_RETURN_ARENA[0].size = size1; + BUFFER_RETURN_ARENA[1].ptr = ptr2.into(); + BUFFER_RETURN_ARENA[1].size = size2; + + Ptr::from((&BUFFER_RETURN_ARENA) as *const [FatPointer; 2]) + } +} + +#[no_mangle] +#[export_name = "oneclient_core_get_metrics"] +/// Returns two fat pointers to memory where metrics are stored. +/// +/// The first one will point to the head of the buffer up to its end. +/// The second one will point from the beginning buffer up to its tail. The second pointer may be null or have zero length. +/// Each metric is a UTF-8 encoded JSON string and is terminated by a null byte. +pub extern "C" fn __export_oneclient_core_get_metrics() -> Ptr<[FatPointer; 2]> { + tracing::debug!("Getting metrics buffer"); + + unsafe { + match METRICS_BUFFER { + Some(ref b) => set_return_arena_from(b.lock().deref()), + None => clear_return_arena(), + } + } +} + +#[no_mangle] +#[export_name = "oneclient_core_clear_metrics"] +/// Clears the metrics buffer. +/// +/// This should be called after [__export_oneclient_core_get_metrics] is called and the metrics are processed. +pub extern "C" fn __export_oneclient_core_clear_metrics() { + tracing::trace!("Clearing metrics buffer"); + + unsafe { + if let Some(ref buffer) = METRICS_BUFFER { + buffer.lock().clear(); + } + } +} + +#[no_mangle] +#[export_name = "oneclient_core_get_developer_dump"] +/// Returns two fat pointer to memory where the developer dump is stored. +/// +/// The first one will point to the head of the buffer up to its end. +/// The second one will point from the beginning buffer up to its tail. The second pointer may be null or have zero length. 
+/// Each event is a UTF-8 encoded string and is terminated by a null byte. +pub extern "C" fn __export_oneclient_core_get_developer_dump() -> Ptr<[FatPointer; 2]> { + unsafe { + match DEVELOPER_DUMP_BUFFER { + Some(ref b) => set_return_arena_from(b.lock().deref()), + None => clear_return_arena(), + } + } +} diff --git a/core/host_to_core_std/src/unstable/fs.rs b/core/host_to_core_std/src/unstable/fs.rs index a18d294d..4170c6cc 100644 --- a/core/host_to_core_std/src/unstable/fs.rs +++ b/core/host_to_core_std/src/unstable/fs.rs @@ -2,8 +2,7 @@ use std::io::{self, Read}; use super::{IoStream, IoStreamHandle}; use crate::abi::{ - err_from_wasi_errno, MessageExchange, Size, StaticMessageExchange, StaticStreamExchange, - StreamExchange, + err_from_wasi_errno, MessageExchange, Size, StreamExchange, }; // Initial idea was to use the file-open message to obtain a fd from the host @@ -120,33 +119,28 @@ impl OpenOptions { } } -pub struct FsConvenience( - std::marker::PhantomData<(Me, Se)>, -); -impl FsConvenience { - /// Like [std::fs::read]. - pub fn read(path: &str) -> Result, io::Error> { - let mut file = - OpenOptions::new() - .read(true) - .open_in(path.as_ref(), Me::instance(), Se::instance())?; - - let mut data = Vec::new(); - file.read_to_end(&mut data)?; - - Ok(data) - } +/// Like [std::fs::read]. +pub fn read_in(path: &str, message_exchange: impl MessageExchange, stream_exchange: impl StreamExchange) -> Result, io::Error> { + let mut file = + OpenOptions::new() + .read(true) + .open_in(path.as_ref(), message_exchange, stream_exchange)?; - /// Like [std::fs::read_to_string]. - pub fn read_to_string(path: &str) -> Result { - let mut file = - OpenOptions::new() - .read(true) - .open_in(path.as_ref(), Me::instance(), Se::instance())?; + let mut data = Vec::new(); + file.read_to_end(&mut data)?; - let mut data = String::new(); - file.read_to_string(&mut data)?; + Ok(data) +} - Ok(data) - } +/// Like [std::fs::read_to_string]. 
+pub fn read_to_string(path: &str, message_exchange: impl MessageExchange, stream_exchange: impl StreamExchange) -> Result { + let mut file = + OpenOptions::new() + .read(true) + .open_in(path.as_ref(), message_exchange, stream_exchange)?; + + let mut data = String::new(); + file.read_to_string(&mut data)?; + + Ok(data) } diff --git a/core/host_to_core_std/src/unstable/http.rs b/core/host_to_core_std/src/unstable/http.rs index e063de71..01b56579 100644 --- a/core/host_to_core_std/src/unstable/http.rs +++ b/core/host_to_core_std/src/unstable/http.rs @@ -94,7 +94,7 @@ impl HttpRequest { } } impl HttpRequest { - fn fetch_in( + pub fn fetch_in( method: &str, url: &str, headers: &HeadersMultiMap, diff --git a/scripts/build-a-core/template.py b/scripts/build-a-core/template.py index 275f2b9b..34770439 100644 --- a/scripts/build-a-core/template.py +++ b/scripts/build-a-core/template.py @@ -16,9 +16,9 @@ def dockerfile_template( cargo_profile = cargo_profile.replace("\n", "\\n").replace('"', '\\"') if wasm_opt_flags != "": - post_process = f"{OPT_BINARYEN}/bin/wasm-opt {wasm_opt_flags} {CARGO_TARGET_DIR}/wasm32-wasi/build-a-core/oneclient_core.wasm --output /opt/build-a-core/core.wasm" + post_process = f"{OPT_BINARYEN}/bin/wasm-opt {wasm_opt_flags} {CARGO_TARGET_DIR}/wasm32-wasi/build-a-core/oneclient_core_wasm.wasm --output /opt/build-a-core/core.wasm" else: - post_process = f"cp {CARGO_TARGET_DIR}/wasm32-wasi/build-a-core/oneclient_core.wasm /opt/build-a-core/core.wasm" + post_process = f"cp {CARGO_TARGET_DIR}/wasm32-wasi/build-a-core/oneclient_core_wasm.wasm /opt/build-a-core/core.wasm" return f""" FROM debian:bookworm as wasi-sdk-builder @@ -97,7 +97,7 @@ def dockerfile_template( ENV CARGO_HOME=/var/cache/cargo ENV CARGO_TARGET_DIR={CARGO_TARGET_DIR} RUN echo "{cargo_profile}" >>Cargo.toml -RUN --mount=type=cache,target=/var/cache/cargo cargo build --target wasm32-wasi --package oneclient_core --profile build-a-core {build_std_flags} +RUN --mount=type=cache,target=/var/cache/cargo cargo build --target wasm32-wasi --package oneclient_core_wasm --profile build-a-core {build_std_flags} COPY --from=binaryen-builder /opt/binaryen/bin {OPT_BINARYEN}/bin COPY --from=binaryen-builder /opt/binaryen/lib {OPT_BINARYEN}/lib
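
---

Editor's note on the overall pattern: taken together, this patch turns the previously FFI-bound core into a host-agnostic library. `OneClientCore`, `DocumentCache`, `MapStdImpl`, and the stream types become generic over the message/stream exchange traits, the exchanges are injected at construction time, and only the new `oneclient_core_wasm` crate knows about the WASM ABI (`MessageExchangeFfi` / `StreamExchangeFfi`) and the exported symbols. The sketch below illustrates that dependency-injection shape in isolation; the trait definitions, method names, and mock types are simplified stand-ins for this illustration only and do not reflect the real signatures in `sf_std::abi` / `wasm_abi`.

```rust
// Minimal sketch of the injection pattern introduced by this change.
// The traits below are hypothetical stand-ins, NOT the real sf_std::abi traits.
trait MessageExchange: Clone {
    fn invoke(&self, message: &[u8]) -> Vec<u8>;
}
trait StreamExchange: Clone {
    fn read(&self, handle: u32, buf: &mut [u8]) -> usize;
}

// The core is generic over its exchanges, mirroring OneClientCore<Me, Se>.
struct Core<Me: MessageExchange, Se: StreamExchange> {
    message_exchange: Me,
    stream_exchange: Se,
}

impl<Me: MessageExchange, Se: StreamExchange> Core<Me, Se> {
    fn new(message_exchange: Me, stream_exchange: Se) -> Self {
        Self { message_exchange, stream_exchange }
    }

    fn perform(&mut self) {
        // The real core reads PerformInput through the message exchange and
        // writes results/errors back through it instead of returning them.
        let _input = self.message_exchange.invoke(b"{\"kind\":\"perform-input\"}");
        // Streams (e.g. HTTP bodies, file reads) go through the stream exchange.
        let mut buf = [0u8; 16];
        let _n = self.stream_exchange.read(0, &mut buf);
    }
}

// Only a host-specific crate provides concrete impls -- in the real change
// that is core_wasm with MessageExchangeFfi/StreamExchangeFfi; here we use
// trivial mocks to show that the core itself never touches the WASM ABI.
#[derive(Clone)]
struct MockMessages;
impl MessageExchange for MockMessages {
    fn invoke(&self, _message: &[u8]) -> Vec<u8> {
        Vec::new()
    }
}

#[derive(Clone)]
struct MockStreams;
impl StreamExchange for MockStreams {
    fn read(&self, _handle: u32, _buf: &mut [u8]) -> usize {
        0
    }
}

fn main() {
    let mut core = Core::new(MockMessages, MockStreams);
    core.perform();
}
```

On the build side, the consequence visible in the Makefile and Dockerfile template changes is that the cdylib artifact now comes from the new package, e.g. `cargo build --package oneclient_core_wasm --target wasm32-wasi`, and downstream steps (wasm-opt, asyncify, test wasm) consume `oneclient_core_wasm.wasm` instead of `oneclient_core.wasm`.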