Skip to content

Commit

Permalink
fix clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Dec 5, 2024
1 parent 6cd1128 commit fa984b1
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 79 deletions.
4 changes: 2 additions & 2 deletions crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ async fn many_small_changes() -> eyre::Result<()> {

let mut start_id = 0;

FuturesUnordered::from_iter(agents.iter().map(|ta| {
let _: () = FuturesUnordered::from_iter(agents.iter().map(|ta| {
let ta = ta.clone();
start_id += 100000;
async move {
Expand All @@ -1413,7 +1413,7 @@ async fn many_small_changes() -> eyre::Result<()> {
let api_addr = ta.agent.api_addr();
let actor_id = ta.agent.actor_id();

FuturesUnordered::from_iter(durs.into_iter().map(|dur| {
let _: () = FuturesUnordered::from_iter(durs.into_iter().map(|dur| {
let client = client.clone();
start_id += 1;
async move {
Expand Down
5 changes: 2 additions & 3 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ async fn process_sync(

drop(job_tx);

buf.try_collect().await?;
let _: () = buf.try_collect().await?;

debug!("done processing sync state");

Expand Down Expand Up @@ -1228,9 +1228,8 @@ pub async fn parallel_sync(
version,
seqs: new_seqs
.into_iter()
.map(|seqs| {
.inspect(|seqs| {
range.insert(seqs.clone());
seqs
})
.collect(),
}]
Expand Down
87 changes: 43 additions & 44 deletions crates/corro-pg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2350,6 +2350,7 @@ impl<'a> SqliteNameRef<'a> {
}

#[derive(Clone, Debug)]
#[allow(dead_code)]
enum SqliteName {
Id(Id),
Name(Name),
Expand Down Expand Up @@ -3030,7 +3031,7 @@ fn field_types(
let mut field_type_overrides = HashMap::new();

match parsed_cmd {
ParsedCmd::Sqlite(Cmd::Stmt(stmt)) => match stmt {
ParsedCmd::Sqlite(Cmd::Stmt(
Stmt::Select(Select {
body:
SelectBody {
Expand All @@ -3050,51 +3051,49 @@ fn field_types(
| Stmt::Update {
returning: Some(cols),
..
} => {
for (i, col) in cols.iter().enumerate() {
if let ResultColumn::Expr(expr, _as) = col {
let type_override = match expr {
Expr::Cast { type_name, .. } => Some(name_to_type(&type_name.name)?),
Expr::FunctionCall { name, .. }
| Expr::FunctionCallStar { name, .. } => {
match name.0.as_str().to_uppercase().as_ref() {
"COUNT" => Some(Type::INT8),
_ => None,
}
},
)) => {
for (i, col) in cols.iter().enumerate() {
if let ResultColumn::Expr(expr, _as) = col {
let type_override = match expr {
Expr::Cast { type_name, .. } => Some(name_to_type(&type_name.name)?),
Expr::FunctionCall { name, .. } | Expr::FunctionCallStar { name, .. } => {
match name.0.as_str().to_uppercase().as_ref() {
"COUNT" => Some(Type::INT8),
_ => None,
}
Expr::Literal(lit) => match lit {
Literal::Numeric(s) => Some(if s.contains('.') {
Type::FLOAT8
} else {
Type::INT8
}),
Literal::String(_) => Some(Type::TEXT),
Literal::Blob(_) => Some(Type::BYTEA),
Literal::Keyword(_) => None,
Literal::Null => None,
Literal::CurrentDate => Some(Type::DATE),
Literal::CurrentTime => Some(Type::TIME),
Literal::CurrentTimestamp => Some(Type::TIMESTAMP),
},
_ => None,
};
if let Some(type_override) = type_override {
match prepped.column_name(i) {
Ok(col_name) => {
field_type_overrides.insert(col_name, type_override);
}
Err(e) => {
error!("col index didn't exist at {i}, attempted to override type as: {type_override}: {e}");
}
}
Expr::Literal(lit) => match lit {
Literal::Numeric(s) => Some(if s.contains('.') {
Type::FLOAT8
} else {
Type::INT8
}),
Literal::String(_) => Some(Type::TEXT),
Literal::Blob(_) => Some(Type::BYTEA),
Literal::Keyword(_) => None,
Literal::Null => None,
Literal::CurrentDate => Some(Type::DATE),
Literal::CurrentTime => Some(Type::TIME),
Literal::CurrentTimestamp => Some(Type::TIMESTAMP),
},
_ => None,
};
if let Some(type_override) = type_override {
match prepped.column_name(i) {
Ok(col_name) => {
field_type_overrides.insert(col_name, type_override);
}
Err(e) => {
error!("col index didn't exist at {i}, attempted to override type as: {type_override}: {e}");
}
}
} else {
break;
}
} else {
break;
}
}
_ => {}
},
}
ParsedCmd::Postgres(_stmt) => {
// TODO: handle type overrides here too
// let cols = match stmt {
Expand Down Expand Up @@ -3311,7 +3310,7 @@ mod tests {
println!("t2text: {:?}", row.try_get::<_, String>(2));

let now: DateTime<Utc> = Utc::now();
let now = NaiveDateTime::from_timestamp_micros(now.timestamp_micros()).unwrap();
let now = DateTime::<Utc>::from_timestamp_micros(now.timestamp_micros()).unwrap();
println!("NOW: {now:?}");

let row = client
Expand All @@ -3322,13 +3321,13 @@ mod tests {
.await?;

println!("ROW: {row:?}");
let updated_at = row.try_get::<_, NaiveDateTime>(0)?;
let updated_at = row.try_get::<_, DateTime<Utc>>(0)?;
println!("updated_at: {updated_at:?}");

assert_eq!(now, updated_at);

let future: DateTime<Utc> = Utc::now() + Duration::from_secs(1);
let future = NaiveDateTime::from_timestamp_micros(future.timestamp_micros()).unwrap();
let future = DateTime::<Utc>::from_timestamp_micros(future.timestamp_micros()).unwrap();
println!("NOW: {future:?}");

let row = client
Expand All @@ -3339,7 +3338,7 @@ mod tests {
.await?;

println!("ROW: {row:?}");
let updated_at = row.try_get::<_, NaiveDateTime>(0)?;
let updated_at = row.try_get::<_, DateTime<Utc>>(0)?;
println!("updated_at: {updated_at:?}");

assert_eq!(future, updated_at);
Expand Down
2 changes: 2 additions & 0 deletions crates/corro-tpl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ mod tests {
let f = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&filepath)
.unwrap();

Expand Down Expand Up @@ -788,6 +789,7 @@ mod tests {
let f = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&filepath)
.unwrap();

Expand Down
33 changes: 11 additions & 22 deletions crates/corro-types/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,42 +100,36 @@ impl<T> CorroSender<T> {
self.inner
.send(value)
.await
.map(|r| {
.inspect(|_r| {
self.send_time.record(before.elapsed().as_secs_f64());
self.send_count.increment(1);
r
})
.map_err(|e| {
.inspect_err(|_e| {
self.failed_sends.increment(1);
e
})
}

pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
self.inner
.try_send(value)
.map(|r| {
.inspect(|_r| {
self.send_count.increment(1);
r
})
.map_err(|e| {
.inspect_err(|_e| {
self.failed_sends.increment(1);
e
})
}

pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
let before = Instant::now();
self.inner
.blocking_send(value)
.map(|r| {
.inspect(|_r| {
self.send_time.record(before.elapsed().as_secs_f64());
self.send_count.increment(1);
r
})
.map_err(|e| {
.inspect_err(|_e| {
self.failed_sends.increment(1);
e
})
}

Expand All @@ -148,37 +142,32 @@ impl<T> CorroSender<T> {
self.inner
.send_timeout(value, timeout)
.await
.map(|r| {
.inspect(|_r| {
self.send_time.record(before.elapsed().as_secs_f64());
self.send_count.increment(1);
r
})
.map_err(|e| {
.inspect_err(|_e| {
self.failed_sends.increment(1);
e
})
}
}

impl<T> CorroReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
self.inner.recv().await.map(|r| {
self.inner.recv().await.inspect(|_r| {
self.recv_count.increment(1);
r
})
}

pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.inner.try_recv().map(|r| {
self.inner.try_recv().inspect(|_r| {
self.recv_count.increment(1);
r
})
}

pub fn blocking_recv(&mut self) -> Option<T> {
self.inner.blocking_recv().map(|r| {
self.inner.blocking_recv().inspect(|_r| {
self.recv_count.increment(1);
r
})
}
}
4 changes: 2 additions & 2 deletions crates/corro-types/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ mod tests {
let pool = pool.clone();
async move {
tokio::spawn(async move {
FuturesUnordered::from_iter((0..per_worker).map(|_| {
let _: () = FuturesUnordered::from_iter((0..per_worker).map(|_| {
let pool = pool.clone();
async move {
let conn = pool.get().await?;
Expand All @@ -250,7 +250,7 @@ mod tests {
}
}));

futs.try_collect().await?;
let _: () = futs.try_collect().await?;

let conn = pool.get().await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/corro-types/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncState
for (actor_id, booked) in actors {
let bookedr = booked.read("generate_sync", actor_id.as_simple()).await;

let last_version = match { bookedr.last() } {
let last_version = match bookedr.last() {
None => continue,
Some(v) => v,
};
Expand Down
1 change: 0 additions & 1 deletion crates/corrosion/src/command/consul/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ enum ConsulCheckOp {
Delete { id: String },
}

///
fn update_services(
mut services: HashMap<String, AgentService>,
hashes: &HashMap<String, u64>,
Expand Down
1 change: 1 addition & 0 deletions crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(db_path)?;

info!("Acquiring lock...");
Expand Down
2 changes: 0 additions & 2 deletions crates/sqlite-pool/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use crate::{

/// Configuration object.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde_1::Deserialize, serde_1::Serialize))]
#[cfg_attr(feature = "serde", serde(crate = "serde_1"))]
pub struct Config {
/// Path to SQLite database file.
pub path: PathBuf,
Expand Down
6 changes: 4 additions & 2 deletions crates/sqlite3-restore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub fn restore<P1: AsRef<Path>, P2: AsRef<Path>>(
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(dst.as_ref())?;

let src_meta = src_db_file.metadata()?;
Expand Down Expand Up @@ -92,11 +93,11 @@ pub fn restore<P1: AsRef<Path>, P2: AsRef<Path>>(
if dst_locked.is_wal() {
let dst_wal_path = format!("{}-wal", dst.as_ref().display());
info!("truncating WAL file '{dst_wal_path}'");
let wal_file = std::fs::OpenOptions::new()
std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(PathBuf::from(dst_wal_path))?;
wal_file.set_len(0)?;
}

dst_db_file.seek(std::io::SeekFrom::Start(0))?;
Expand Down Expand Up @@ -178,6 +179,7 @@ pub fn lock_all<P: AsRef<Path>>(
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(PathBuf::from(format!("{}-shm", db_path.as_ref().display())))?;

lock(&shm_file, LockType::Read, DMS, timeout)?;
Expand Down

0 comments on commit fa984b1

Please sign in to comment.