Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
format files
Browse files Browse the repository at this point in the history
  • Loading branch information
Makoto-Tomokiyo committed May 2, 2024
1 parent d6da98e commit 5ac4374
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 15 deletions.
3 changes: 1 addition & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use tonic::transport::Server;
use scheduler2::composable_database::scheduler_api_server::SchedulerApiServer;
use scheduler2::server::SchedulerService;
use tonic::transport::Server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = "0.0.0.0:15721".parse().unwrap();

let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_files/");
Expand Down
13 changes: 9 additions & 4 deletions src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::composable_database::scheduler_api_client::SchedulerApiClient;
use crate::composable_database::QueryJobStatusArgs;
use crate::composable_database::QueryStatus;
use crate::composable_database::QueryStatus::InProgress;
use crate::intermediate_results::{get_results, TaskKey};
use crate::mock_catalog::load_catalog;
use crate::mock_optimizer::Optimizer;
use crate::parser::ExecutionPlanParser;
Expand All @@ -56,7 +57,6 @@ use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;
use serde::{Deserialize, Serialize};
use tonic::Request;
use crate::intermediate_results::{get_results, TaskKey};

#[derive(Clone, Serialize, Deserialize)]
pub struct JobInfo {
Expand Down Expand Up @@ -329,10 +329,15 @@ impl MockFrontend {
// }
// };

let results = get_results(&TaskKey{stage_id: status.stage_id, query_id: status.query_id}).await
.expect("api.rs: query is done but no results in table");
let results = get_results(&TaskKey {
stage_id: status.stage_id,
query_id: status.query_id,
})
.await
.expect("api.rs: query is done but no results in table");

let flattened_results: Vec<RecordBatch> = results.into_iter().flat_map(|r| r.into_iter()).collect();
let flattened_results: Vec<RecordBatch> =
results.into_iter().flat_map(|r| r.into_iter()).collect();

let updated_job_info = JobInfo {
query_id,
Expand Down
2 changes: 1 addition & 1 deletion src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use crate::task::{
TaskStatus::{self, *},
};
use crate::SchedulerError;
use datafusion_proto::bytes::physical_plan_to_bytes;
use std::collections::{BTreeSet, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use datafusion_proto::bytes::physical_plan_to_bytes;
use tokio::sync::{Mutex, Notify};

// Must implement here since generated TaskId does not derive Hash.
Expand Down
15 changes: 7 additions & 8 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ pub struct SchedulerService {

impl fmt::Debug for SchedulerService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"SchedulerService {{ queue: {:?} }}",
self.queue,
)
write!(f, "SchedulerService {{ queue: {:?} }}", self.queue,)
}
}

Expand Down Expand Up @@ -99,8 +95,11 @@ impl SchedulerApi for SchedulerService {
// Build a query graph, store in query table, enqueue new tasks.
let qid = self.next_query_id();
let query = QueryGraph::new(qid, plan).await;
self.queue.lock().await.add_query(qid, Arc::new(Mutex::new(query))).await;

self.queue
.lock()
.await
.add_query(qid, Arc::new(Mutex::new(query)))
.await;

let response = ScheduleQueryRet { query_id: qid };
Ok(Response::new(response))
Expand Down Expand Up @@ -128,7 +127,7 @@ impl SchedulerApi for SchedulerService {
return Ok(Response::new(QueryJobStatusRet {
query_status: QueryStatus::Done.into(),
stage_id: stage_id,
query_id: query_id
query_id: query_id,
}));
// ****************** END CHANGES FROM INTEGRATION TESTING****************//
}
Expand Down

0 comments on commit 5ac4374

Please sign in to comment.