Skip to content

Commit

Permalink
feat: add waverless metric
Browse files Browse the repository at this point in the history
  • Loading branch information
ActivePeter committed Jul 5, 2024
1 parent bcaf1d2 commit 33c4c3c
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 34 deletions.
4 changes: 4 additions & 0 deletions scripts/build/template/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def os_system_sure(command):
os.environ['RUST_LOG'] = 'info,wasm_serverless=debug'
# use crac jdk (See install/inner/install_crac.py)
os.environ['JAVA_HOME'] = CRAC_INSTALL_DIR
# wasmedge
ld_library_path = os.environ.get('LD_LIBRARY_PATH', '')
new_path = '/root/.wasmedge/lib/'
os.environ['LD_LIBRARY_PATH'] = new_path + ':' + ld_library_path

# NODE_ID=$1
NODE_ID = sys.argv[1]
Expand Down
18 changes: 0 additions & 18 deletions scripts/deploy_cluster/node_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,3 @@ nodes:
3:
addr: 192.168.31.96:2500
spec: [meta, worker]
7:
addr: 192.168.31.89:2500
spec: [meta, worker]
8:
addr: 192.168.31.54:2500
spec: [meta, worker]
9:
addr: 192.168.31.9:2500
spec: [meta, worker]
10:
addr: 192.168.31.240:2500
spec: [meta, worker]
11:
addr: 192.168.31.138:2500
spec: [meta, worker]
12:
addr: 192.168.31.171:2500
spec: [meta, worker]
18 changes: 17 additions & 1 deletion src/general/m_appmeta_manager/http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::{SystemTime, UNIX_EPOCH};

use axum::extract::{DefaultBodyLimit, Multipart, Path};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
Expand Down Expand Up @@ -50,7 +52,10 @@ async fn call_app_fn(Path((app, func)): Path<(String, String)>, body: String) ->
StatusCode::BAD_REQUEST.into_response()
} else {
// # call instance run

let req_arrive_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
let res = view()
.executor()
.handle_http_task(&format!("{app}/{func}"), body)
Expand All @@ -60,6 +65,17 @@ async fn call_app_fn(Path((app, func)): Path<(String, String)>, body: String) ->
// self.request_handler_view.p2p().nodes_config.this.0,
// ))
.await;
// inject `req_arrive_time`
let res = res.map(|v| {
v.map(|v| {
let mut res: serde_json::Value = serde_json::from_str(&v).unwrap();
let _ = res.as_object_mut().unwrap().insert(
"req_arrive_time".to_owned(),
serde_json::Value::from(req_arrive_time),
);
serde_json::to_string(&res).unwrap()
})
});
match res {
Ok(Some(res)) => (StatusCode::OK, res).into_response(),
Ok(None) => StatusCode::OK.into_response(),
Expand Down
15 changes: 8 additions & 7 deletions src/general/m_appmeta_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::{
},
},
};
use crate::worker::m_executor::Executor;
use crate::{
general::kv_interface::KvOps,
logical_module_view_impl,
Expand All @@ -28,9 +29,6 @@ use crate::{
util::{self, JoinHandleWrapper},
worker::func::m_instance_manager::InstanceManager,
};
use crate::{
worker::m_executor::Executor,
};
use async_trait::async_trait;
use axum::body::Bytes;
use enum_as_inner::EnumAsInner;
Expand Down Expand Up @@ -742,11 +740,14 @@ impl AppMetaManager {
let tmpappdir2 = tmpappdir.clone();
// remove old dir&app
if let Some(_) = self.meta.write().await.app_metas.remove(&tmpapp) {
let ins = self.view.instance_manager().app_instances.remove(&tmpapp);
if let Some(ins) = ins {
ins.value().kill().await;
}
tracing::debug!("remove old app meta {}", tmpapp);
}
let ins = self.view.instance_manager().app_instances.remove(&tmpapp);
if let Some(ins) = ins {
ins.value().kill().await;
tracing::debug!("remove old app instance {}", tmpapp);
}

if tmpappdir2.exists() {
// remove old app
fs::remove_dir_all(&tmpappdir2).unwrap();
Expand Down
3 changes: 3 additions & 0 deletions src/general/m_os/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ impl OperatingSystem {
}
OsProcessType::JavaCheckpoints(app) => {
let appdir = self.view.appmeta_manager().fs_layer.concat_app_dir(&app);
// create checkpoint-dir
let _ = std::fs::create_dir(appdir.join("checkpoint-dir"));

// 打开或创建日志文件
let log_file_path = appdir.join("checkpoint.log");
// 打开或创建日志文件
Expand Down
14 changes: 10 additions & 4 deletions src/general/network/m_p2p_quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ impl LogicalModule for P2PQuicNode {
addr,
e
);
tokio::time::sleep(Duration::from_secs(10)).await;

}
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
})
.into(),
Expand All @@ -190,12 +191,15 @@ impl LogicalModule for P2PQuicNode {
Ok(msg) => {
if let Some(WireMsg((_head, _, bytes))) = msg {
let addr=String::from_utf8(bytes.to_vec()).unwrap().parse::<SocketAddr>().unwrap();
tracing::info!("recv connect from {}", addr);

// tracing::info!("recv connect from {}", addr);
if view.p2p().find_peer_id(&addr).is_none(){
// tracing::warn!("recv connect from unknown peer {}", addr);
continue;
}
// handle_conflict_connection(&view,&shared, &endpoint, connection, incoming);
new_handle_connection_task(addr,&view, shared.clone(), endpoint.clone(), connection, incoming);
}else{
tracing::info!("didn't recv head");
tracing::warn!("didn't recv head");
continue;
}
}
Expand Down Expand Up @@ -313,6 +317,8 @@ fn new_handle_connection_task(
}));
}


/// remote_addr should be checked
async fn handle_connection(
remote_addr: SocketAddr,
view: &View,
Expand Down
2 changes: 2 additions & 0 deletions src/master/m_http_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ impl HttpHandler for MasterHttpHandler {

#[cfg(test)]
mod tests {
use crate::config::NodeConfig;

use super::*;

// test construct_target_path
Expand Down
5 changes: 4 additions & 1 deletion src/master/m_master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ impl Master {
}
}
fn select_node(&self) -> NodeID {
2
let workers = self.view.p2p().nodes_config.get_worker_nodes();
let mut rng = rand::thread_rng();
let idx = rng.gen_range(0..workers.len());
workers.iter().nth(idx).unwrap().clone()
}
}
3 changes: 3 additions & 0 deletions src/result.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::os::unix::net::SocketAddr;

use async_raft::{InitializeError, RaftError};
use camelpaste::paste;
use prost::{DecodeError, Message};
Expand Down Expand Up @@ -40,6 +42,7 @@ pub enum WsRpcErr {
ConnectionNotEstablished(HashValue),
RPCTimout(HashValue),
InvalidMsgData { msg: Box<dyn Message> },
UnknownPeer { peer: SocketAddr },
}

#[derive(Debug)]
Expand Down
8 changes: 5 additions & 3 deletions src/worker/func/shared/java.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{str::from_utf8};
use std::str::from_utf8;

use tokio::process::{self, Command};

Expand Down Expand Up @@ -50,8 +50,10 @@ pub(super) async fn find_pid(app: &str) -> WSResult<PID> {
}

pub(super) async fn take_snapshot(app: &str, os: &OperatingSystem) {
let _ = os
let res = os
.start_process(OsProcessType::JavaCheckpoints(app.to_owned()))
.wait()
.await;
.await
.unwrap();
assert!(res.success());
}
26 changes: 26 additions & 0 deletions src/worker/m_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use std::{
ptr::NonNull,
sync::atomic::{AtomicU32, AtomicUsize},
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::oneshot;
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -304,8 +305,33 @@ impl Executor {
);
// TODO: input value should be passed from context, like http request or prev trigger

let bf_exec_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let res = instance.execute(&mut fn_ctx).await;

// let return_to_agent_time = SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .expect("Time went backwards")
// .as_millis() as u64;

let res = res.map(|v| {
v.map(|v| {
let mut res: serde_json::Value = serde_json::from_str(&v).unwrap();
let _ = res.as_object_mut().unwrap().insert(
"bf_exec_time".to_owned(),
serde_json::Value::from(bf_exec_time),
);
// let _ = res.as_object_mut().unwrap().insert(
// "return_to_agent_time".to_owned(),
// serde_json::Value::from(return_to_agent_time),
// );
serde_json::to_string(&res).unwrap()
})
});

let _ = self
.view
.instance_manager()
Expand Down

0 comments on commit 33c4c3c

Please sign in to comment.