Merge pull request #143 from progval/remote-history-service
Query history from history server
spb authored Nov 17, 2024
2 parents 4a1589c + ffd8f83 commit ede3e68
Showing 44 changed files with 1,083 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions sable_history/Cargo.toml
@@ -11,6 +11,7 @@ built = { version = "0.5", features = [ "git2" ] }
sable_network = { path = "../sable_network" }
sable_server = { path = "../sable_server" }

futures = "0.3"
tokio = { version = "1.14", features = [ "full" ] }
serde = { version = "1", features = [ "derive" ] }
serde_with = "1.11"
1 change: 1 addition & 0 deletions sable_history/diesel.toml
@@ -4,6 +4,7 @@
[print_schema]
file = "src/schema.rs"
custom_type_derives = ["diesel::query_builder::QueryId", "Clone"]
import_types = ["crate::types::*"]

[migrations_directory]
dir = "migrations"
@@ -0,0 +1,7 @@
DROP INDEX messages_by_timestamp;

ALTER TABLE messages
DROP COLUMN message_type,
DROP COLUMN timestamp;

DROP TYPE "Message_Type";
@@ -0,0 +1,8 @@
CREATE TYPE "Message_Type" AS ENUM ('privmsg', 'notice');

ALTER TABLE messages
ADD COLUMN message_type "Message_Type" NOT NULL,
ADD COLUMN timestamp TIMESTAMP NOT NULL;

CREATE INDEX messages_by_timestamp ON messages USING BRIN (timestamp, id);
COMMENT ON INDEX messages_by_timestamp IS 'Includes the id in order to be a consistent total order across requests';
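
Including the message id in the index, and in every ORDER BY the service issues, is what turns the ordering into a total order: two messages stored with the same timestamp still compare deterministically on their UUIDv7 ids, so repeated or paginated requests never see rows swap places. A purely illustrative Rust sketch of that tie-break, using the same (timestamp, id) key the query code below sorts by:

use chrono::NaiveDateTime;
use uuid::Uuid;

/// Orders rows the same way the database does: timestamp first, message id as
/// the tie-breaker, so the result is stable across requests.
fn sort_rows(rows: &mut Vec<(NaiveDateTime, Uuid)>) {
    rows.sort_unstable_by_key(|&(timestamp, id)| (timestamp, id));
}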
3 changes: 3 additions & 0 deletions sable_history/src/lib.rs
@@ -1,5 +1,8 @@
mod pg_history_service;
pub use pg_history_service::PgHistoryService;
mod server;
pub use server::*;

mod models;
mod schema;
mod types;
10 changes: 10 additions & 0 deletions sable_history/src/models/message.rs
@@ -5,9 +5,19 @@ use super::*;
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(belongs_to(Channel, foreign_key = target_channel))]
#[diesel(belongs_to(HistoricUser, foreign_key = source_user))]
#[derive(Debug)]
pub struct Message {
pub id: Uuid,
pub source_user: i32,
pub target_channel: i64,
pub text: String,
pub message_type: crate::types::MessageType,
/// Timestamp of the *update* introducing the message.
///
/// This is usually the same second as the one in [`id`] (a UUIDv7), but is
/// occasionally 1 second later, because the message id is created before being
/// pushed to the log.
/// It can also be significantly different, because both are based on the
/// system clock, which can change arbitrarily.
pub timestamp: chrono::NaiveDateTime,
}
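
Because the id is a UUIDv7, the second it was generated in can be recovered from the id itself; this is what the history service does when it reports per-target timestamps in list_targets below. A minimal sketch using the same uuid crate calls:

use uuid::Uuid;

/// Returns the UNIX timestamp (in seconds) embedded in a UUIDv7 message id,
/// or None if the id does not carry a timestamp.
fn uuid_unix_seconds(id: Uuid) -> Option<u64> {
    // `get_timestamp` is Some only for timestamp-bearing UUID versions (v1, v6, v7).
    let ts = id.get_timestamp()?;
    let (seconds, _nanos) = ts.to_unix();
    Some(seconds)
}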
326 changes: 326 additions & 0 deletions sable_history/src/pg_history_service.rs
@@ -0,0 +1,326 @@
use std::collections::HashMap;

use anyhow::{bail, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
use diesel::dsl::sql;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures::stream::{StreamExt, TryStreamExt};
use tokio::sync::Mutex;
use uuid::Uuid;

use sable_network::prelude::*;

use crate::schema::{channels, historic_users, messages};

/// Implementation of [`HistoryService`] backed by PostgreSQL
pub struct PgHistoryService<'a> {
database_connection: &'a Mutex<AsyncPgConnection>,
}

impl<'a> PgHistoryService<'a> {
pub fn new(database_connection: &'a Mutex<AsyncPgConnection>) -> Self {
Self {
database_connection,
}
}
}

impl<'a> HistoryService for PgHistoryService<'a> {
async fn list_targets(
&self,
_user: UserId,
_after_ts: Option<i64>,
_before_ts: Option<i64>,
_limit: Option<usize>,
) -> HashMap<TargetId, i64> {
// TODO: access control
// TODO: after_ts, before_ts, limit
match channels::dsl::channels
.select((
channels::dsl::id,
sql::<diesel::pg::sql_types::Uuid>(
"SELECT MAX(id) FROM messages WHERE target_channel=channels.id",
),
))
.load_stream(&mut *self.database_connection.lock().await)
.await
{
Err(e) => {
tracing::error!("Could not get history channels: {e}");
return HashMap::new();
}
Ok(rows) => rows
.map(|row| -> Result<(TargetId, i64)> {
let (channel_id, max_message_id): (i64, Uuid) = row?;
let channel =
TargetId::Channel(ChannelId::from(Snowflake::from(channel_id as u64)));

let Some(ts) = max_message_id.get_timestamp() else {
bail!("messages.id should be a UUID7, not {max_message_id}");
};
let (seconds, _) = ts.to_unix();
let Ok(seconds) = seconds.try_into() else {
bail!("message {max_message_id}'s UNIX timestamp is negative");
};
Ok((channel, seconds))
})
.try_collect()
.await
.unwrap_or_else(|e| {
tracing::error!("Could not read rows: {e}");
HashMap::new()
}),
}
}

async fn get_entries(
&self,
_user: UserId,
target: TargetId,
request: HistoryRequest,
) -> Result<impl IntoIterator<Item = HistoricalEvent>, HistoryError> {
// TODO: access control
let TargetId::Channel(channel_id) = target else {
// TODO: PMs
return Err(HistoryError::InvalidTarget(target));
};

let mut connection_lock = self.database_connection.lock().await;

let db_channel_id = channel_id.as_u64() as i64;
let channel = match channels::dsl::channels
.find(db_channel_id)
.select(crate::models::Channel::as_select())
.first(&mut *connection_lock)
.await
.optional()
{
Ok(Some(channel)) => channel,
Ok(None) => return Err(HistoryError::InvalidTarget(target)),
Err(e) => {
tracing::error!("Could not check if channel exists: {e}");
return Err(HistoryError::InternalError(
"Could not check if channel exists".to_string(),
));
}
};

let base_query = messages::dsl::messages
.inner_join(historic_users::dsl::historic_users)
.select((
messages::dsl::id,
messages::dsl::timestamp,
messages::dsl::message_type,
messages::dsl::text,
historic_users::dsl::nick,
historic_users::dsl::ident,
historic_users::dsl::vhost,
historic_users::dsl::account_name,
))
.filter(messages::dsl::target_channel.eq(db_channel_id));
match request {
HistoryRequest::Latest { to_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
match to_ts {
Some(to_ts) => {
let to_ts = DateTime::from_timestamp(to_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
.filter(messages::dsl::timestamp.gt(to_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
None => {
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
}
}
HistoryRequest::Before { from_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let from_ts = DateTime::from_timestamp(from_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
.filter(messages::dsl::timestamp.lt(from_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
HistoryRequest::After { start_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let start_ts = DateTime::from_timestamp(start_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
false, // don't reverse
base_query
.filter(messages::dsl::timestamp.gt(start_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit),
)
.await
}
HistoryRequest::Around { around_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let around_ts = DateTime::from_timestamp(around_ts, 0)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
collect_query(
connection_lock,
&channel,
false, // don't reverse
CombineDsl::union(
base_query
.filter(messages::dsl::timestamp.le(around_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
base_query
.filter(messages::dsl::timestamp.gt(around_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit),
),
)
.await
.map(|mut events| {
// TODO: make postgresql sort it, it may be able to do it directly from
// the index scan instead of sorting after the union
events.sort_unstable_by_key(|event| match event {
HistoricalEvent::Message { id, timestamp, .. } => (*timestamp, *id),
});
events
})
}
HistoryRequest::Between {
start_ts,
end_ts,
limit,
} => {
if start_ts <= end_ts {
let start_ts = DateTime::from_timestamp(start_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
let end_ts = DateTime::from_timestamp(end_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
collect_query(
connection_lock,
&channel,
false, // don't reverse
base_query
.filter(messages::dsl::timestamp.gt(start_ts))
.filter(messages::dsl::timestamp.lt(end_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit),
)
.await
} else {
let start_ts = DateTime::from_timestamp(start_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
let end_ts = DateTime::from_timestamp(end_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
collect_query(
connection_lock,
&channel,
true, // reverse
base_query
.filter(messages::dsl::timestamp.gt(end_ts))
.filter(messages::dsl::timestamp.lt(start_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit),
)
.await
}
}
}
}
}

type JoinedMessageRow = (
uuid::Uuid,
NaiveDateTime,
crate::types::MessageType,
String,
String,
String,
String,
Option<String>,
);

async fn collect_query<'query>(
mut connection: tokio::sync::MutexGuard<'_, AsyncPgConnection>,
channel: &crate::models::Channel,
reverse: bool,
query: impl diesel_async::RunQueryDsl<AsyncPgConnection>
+ diesel_async::methods::LoadQuery<'query, AsyncPgConnection, JoinedMessageRow>
+ 'query,
) -> Result<Vec<HistoricalEvent>, HistoryError> {
let events = query
.load_stream(&mut *connection)
.await
.map_err(|e| {
tracing::error!("Could not query messages: {e}");
HistoryError::InternalError("Could not query messages".to_string())
})?
.map_ok(|row| make_historical_event(channel, row))
.try_collect::<Vec<_>>()
.await
.map_err(|e| {
tracing::error!("Could not parse messages: {e}");
HistoryError::InternalError("Could not parse message".to_string())
})?;
Ok(if reverse {
events.into_iter().rev().collect()
} else {
events
})
}

fn make_historical_event(
channel: &crate::models::Channel,
(id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): JoinedMessageRow,
) -> HistoricalEvent {
HistoricalEvent::Message {
id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")),
timestamp: timestamp.and_utc().timestamp(),
source: format!("{}!{}@{}", source_nick, source_ident, source_vhost),
source_account,
message_type: message_type.into(),
target: channel.name.clone(), // assumes the channel was not renamed since the message was sent
text,
}
}
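
For context, a rough sketch of how a caller might drive the new service. The database URL and the per-call connection are illustrative only (the history server wires a long-lived connection from its own configuration), and the request shape follows the HistoryRequest variants handled above.

use diesel_async::{AsyncConnection, AsyncPgConnection};
use tokio::sync::Mutex;

use sable_history::PgHistoryService;
use sable_network::prelude::*;

async fn latest_messages(
    db_url: &str,
    user: UserId,
    channel: TargetId,
) -> Result<Vec<HistoricalEvent>, HistoryError> {
    // Illustrative: real code reuses one connection instead of establishing one per call.
    let connection = Mutex::new(
        AsyncPgConnection::establish(db_url)
            .await
            .expect("could not connect to the history database"),
    );
    let service = PgHistoryService::new(&connection);

    // Ask for up to 50 of the most recent messages in the channel.
    let events = service
        .get_entries(
            user,
            channel,
            HistoryRequest::Latest {
                to_ts: None,
                limit: 50,
            },
        )
        .await?;
    Ok(events.into_iter().collect())
}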