Deduplicate query collection
progval committed Nov 9, 2024
1 parent 61f78eb commit a8f711b
Showing 1 changed file with 81 additions and 99 deletions.
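
Each arm of the request `match` previously ended in the same boilerplate: call `.load_stream()` on the prepared Diesel query, map every row through `make_historical_event`, collect the stream, and (for newest-first queries) reverse the result in memory. This commit hoists that shared tail into a single `collect_query` helper that takes the still-unexecuted query plus a `reverse` flag. Below is a minimal, self-contained sketch of the same pattern, with plain iterators and made-up `Row`/`Event` types standing in for the real Diesel machinery:

```rust
// Sketch only: `Row`, `Event`, and `make_event` are stand-ins for the
// Diesel row tuple and `make_historical_event` in the real code.
#[derive(Debug, Clone, PartialEq)]
struct Row(u32);

#[derive(Debug, Clone, PartialEq)]
struct Event(u32);

fn make_event(row: Row) -> Event {
    Event(row.0)
}

/// The deduplicated tail: convert each fetched row, collect, and reverse
/// when the rows were fetched newest-first (ORDER BY ... DESC LIMIT n).
fn collect_query(rows: impl Iterator<Item = Row>, reverse: bool) -> Vec<Event> {
    let events: Vec<Event> = rows.map(make_event).collect();
    if reverse {
        // Reversing must happen *after* the SQL LIMIT: "latest 3" means
        // take the newest 3 rows, then present them oldest-first.
        events.into_iter().rev().collect()
    } else {
        events
    }
}

fn main() {
    // Stand-in for `ORDER BY timestamp DESC LIMIT 3`: newest rows first.
    let newest_first = vec![Row(5), Row(4), Row(3)];
    let latest = collect_query(newest_first.into_iter(), true);
    assert_eq!(latest, vec![Event(3), Event(4), Event(5)]);
}
```

The real helper does the same thing asynchronously over a row stream; see `collect_query` at the bottom of the diff.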
sable_history/src/pg_history_service.rs
@@ -119,101 +119,73 @@ impl<'a> HistoryService for PgHistoryService<'a> {
                 historic_users::dsl::account_name,
             ))
             .filter(messages::dsl::target_channel.eq(db_channel_id));
-        Ok(match request {
+        match request {
             HistoryRequest::Latest { to_ts, limit } => {
                 let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
                 match to_ts {
                     Some(to_ts) => {
                         let to_ts = DateTime::from_timestamp(to_ts, 999_999)
                             .unwrap_or(DateTime::<Utc>::MIN_UTC)
                             .naive_utc();
-                        Box::new(
+                        collect_query(
+                            connection_lock,
+                            &channel,
+                            true, // reverse
                             base_query
                                 .filter(messages::dsl::timestamp.gt(to_ts))
                                 // total order, consistent across requests
                                 .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
-                                .limit(limit)
-                                .load_stream(&mut *connection_lock),
+                                .limit(limit),
                         )
+                        .await
                     }
-                    None => Box::new(
-                        base_query
-                            // total order, consistent across requests
-                            .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
-                            .limit(limit)
-                            .load_stream(&mut *connection_lock),
-                    ),
+                    None => {
+                        collect_query(
+                            connection_lock,
+                            &channel,
+                            true, // reverse
+                            base_query
+                                // total order, consistent across requests
+                                .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
+                                .limit(limit),
+                        )
+                        .await
+                    }
                 }
-                .await
-                .map_err(|e| {
-                    tracing::error!("Could not query messages: {e}");
-                    HistoryError::InternalError("Could not query messages".to_string())
-                })?
-                .map_ok(|row| make_historical_event(&channel, row))
-                .try_collect::<Vec<_>>()
-                .await
-                .map_err(|e| {
-                    tracing::error!("Could not parse messages: {e}");
-                    HistoryError::InternalError("Could not parse message".to_string())
-                })?
-                .into_iter()
-                .rev() // need to reverse *after* applying the SQL LIMIT
-                .collect::<Vec<_>>()
             }
             HistoryRequest::Before { from_ts, limit } => {
                 let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
                 let from_ts = DateTime::from_timestamp(from_ts, 0)
                     .unwrap_or(DateTime::<Utc>::MAX_UTC)
                     .naive_utc();
-                Box::new(
+                collect_query(
+                    connection_lock,
+                    &channel,
+                    true, // reverse
                     base_query
                         .filter(messages::dsl::timestamp.lt(from_ts))
                         // total order, consistent across requests
                         .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
-                        .limit(limit)
-                        .load_stream(&mut *connection_lock),
+                        .limit(limit),
                 )
                 .await
-                .map_err(|e| {
-                    tracing::error!("Could not query messages: {e}");
-                    HistoryError::InternalError("Could not query messages".to_string())
-                })?
-                .map_ok(|row| make_historical_event(&channel, row))
-                .try_collect::<Vec<_>>()
-                .await
-                .map_err(|e| {
-                    tracing::error!("Could not parse messages: {e}");
-                    HistoryError::InternalError("Could not parse message".to_string())
-                })?
-                .into_iter()
-                .rev() // need to reverse *after* applying the SQL LIMIT
-                .collect::<Vec<_>>()
             }
             HistoryRequest::After { start_ts, limit } => {
                 let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
                 let start_ts = DateTime::from_timestamp(start_ts, 999_999)
                     .unwrap_or(DateTime::<Utc>::MIN_UTC)
                     .naive_utc();
-                Box::new(
+                collect_query(
+                    connection_lock,
+                    &channel,
+                    false, // don't reverse
                     base_query
                         .filter(messages::dsl::timestamp.gt(start_ts))
                         // total order, consistent across requests
                         .order((messages::dsl::timestamp, messages::dsl::id))
-                        .limit(limit)
-                        .load_stream(&mut *connection_lock),
+                        .limit(limit),
                 )
                 .await
-                .map_err(|e| {
-                    tracing::error!("Could not query messages: {e}");
-                    HistoryError::InternalError("Could not query messages".to_string())
-                })?
-                .map_ok(|row| make_historical_event(&channel, row))
-                .try_collect::<Vec<_>>()
-                .await
-                .map_err(|e| {
-                    tracing::error!("Could not parse messages: {e}");
-                    HistoryError::InternalError("Could not parse message".to_string())
-                })?
             }
             HistoryRequest::Around { around_ts, limit } => {
                 [CI annotation on this line, GitHub Actions / Test (nightly): unused variables `around_ts` and `limit`]
                 todo!("around")
@@ -231,27 +203,18 @@ impl<'a> HistoryService for PgHistoryService<'a> {
                     .unwrap_or(DateTime::<Utc>::MAX_UTC)
                     .naive_utc();
                 let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
-                Box::new(
+                collect_query(
+                    connection_lock,
+                    &channel,
+                    false, // don't reverse
                     base_query
                         .filter(messages::dsl::timestamp.gt(start_ts))
                         .filter(messages::dsl::timestamp.lt(end_ts))
                         // total order, consistent across requests
                         .order((messages::dsl::timestamp, messages::dsl::id))
-                        .limit(limit)
-                        .load_stream(&mut *connection_lock),
+                        .limit(limit),
                 )
                 .await
-                .map_err(|e| {
-                    tracing::error!("Could not query messages: {e}");
-                    HistoryError::InternalError("Could not query messages".to_string())
-                })?
-                .map_ok(|row| make_historical_event(&channel, row))
-                .try_collect::<Vec<_>>()
-                .await
-                .map_err(|e| {
-                    tracing::error!("Could not parse messages: {e}");
-                    HistoryError::InternalError("Could not parse message".to_string())
-                })?
             } else {
                 let start_ts = DateTime::from_timestamp(start_ts, 0)
                     .unwrap_or(DateTime::<Utc>::MAX_UTC)
Expand All @@ -260,48 +223,67 @@ impl<'a> HistoryService for PgHistoryService<'a> {
                     .unwrap_or(DateTime::<Utc>::MIN_UTC)
                     .naive_utc();
                 let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
-                Box::new(
+                collect_query(
+                    connection_lock,
+                    &channel,
+                    true, // reverse
                     base_query
                         .filter(messages::dsl::timestamp.gt(end_ts))
                         .filter(messages::dsl::timestamp.lt(start_ts))
                         // total order, consistent across requests
                         .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
-                        .limit(limit)
-                        .load_stream(&mut *connection_lock),
+                        .limit(limit),
                 )
                 .await
-                .map_err(|e| {
-                    tracing::error!("Could not query messages: {e}");
-                    HistoryError::InternalError("Could not query messages".to_string())
-                })?
-                .map_ok(|row| make_historical_event(&channel, row))
-                .try_collect::<Vec<_>>()
-                .await
-                .map_err(|e| {
-                    tracing::error!("Could not parse messages: {e}");
-                    HistoryError::InternalError("Could not parse message".to_string())
-                })?
-                .into_iter()
-                .rev() // need to reverse *after* applying the SQL LIMIT
-                .collect::<Vec<_>>()
             }
         }
-        })
+        }
     }
 }
 
+type JoinedMessageRow = (
+    uuid::Uuid,
+    NaiveDateTime,
+    crate::types::MessageType,
+    String,
+    String,
+    String,
+    String,
+    Option<String>,
+);
+
+async fn collect_query<'query>(
+    mut connection: tokio::sync::MutexGuard<'_, AsyncPgConnection>,
+    channel: &crate::models::Channel,
+    reverse: bool,
+    query: impl diesel_async::RunQueryDsl<AsyncPgConnection>
+        + diesel_async::methods::LoadQuery<'query, AsyncPgConnection, JoinedMessageRow>
+        + 'query,
+) -> Result<Vec<HistoricalEvent>, HistoryError> {
+    let events = query
+        .load_stream(&mut *connection)
+        .await
+        .map_err(|e| {
+            tracing::error!("Could not query messages: {e}");
+            HistoryError::InternalError("Could not query messages".to_string())
+        })?
+        .map_ok(|row| make_historical_event(channel, row))
+        .try_collect::<Vec<_>>()
+        .await
+        .map_err(|e| {
+            tracing::error!("Could not parse messages: {e}");
+            HistoryError::InternalError("Could not parse message".to_string())
+        })?;
+    Ok(if reverse {
+        events.into_iter().rev().collect()
+    } else {
+        events
+    })
+}
+
 fn make_historical_event(
     channel: &crate::models::Channel,
-    (id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): (
-        uuid::Uuid,
-        NaiveDateTime,
-        crate::types::MessageType,
-        String,
-        String,
-        String,
-        String,
-        Option<String>,
-    ),
+    (id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): JoinedMessageRow,
 ) -> HistoricalEvent {
     HistoricalEvent::Message {
         id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")),
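Two design points are visible in the extracted helper itself: `collect_query` takes the connection's `MutexGuard` by value, so the lock is held for exactly as long as the row stream is being drained, and the two `map_err` stages keep query-execution failures ("Could not query messages") distinct from failures while decoding and collecting rows ("Could not parse message"). The new `JoinedMessageRow` alias likewise replaces the eight-field tuple type that `make_historical_event` previously spelled out inline in its signature.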
