Skip to content

Commit

Permalink
[WIP] Query history from history server
Browse files Browse the repository at this point in the history
  • Loading branch information
progval committed Oct 26, 2024
1 parent c9341c6 commit 8c1f8ed
Show file tree
Hide file tree
Showing 35 changed files with 529 additions and 40 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion sable_history/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ built = { version = "0.5", features = [ "git2" ] }
sable_network = { path = "../sable_network" }
sable_server = { path = "../sable_server" }

futures = "0.3"
tokio = { version = "1.14", features = [ "full" ] }
serde = { version = "1", features = [ "derive" ] }
serde_with = "1.11"
Expand All @@ -25,4 +26,4 @@ chrono = "0.4"
uuid = { version = "1.9.1", features = ["v7", "fast-rng", "serde"] }

diesel = { version = "2.2", features = [ "postgres", "chrono", "uuid" ] }
diesel-async = { version = "0.5", features = [ "postgres" ] }
diesel-async = { version = "0.5", features = [ "postgres" ] }
2 changes: 2 additions & 0 deletions sable_history/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod pg_history_service;
pub use pg_history_service::PgHistoryService;
mod server;
pub use server::*;

Expand Down
85 changes: 85 additions & 0 deletions sable_history/src/pg_history_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::borrow::BorrowMut;

Check warning on line 1 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused import: `std::borrow::BorrowMut`
use std::collections::HashMap;

use diesel::dsl::sql;
use diesel::prelude::*;
use diesel_async::{AsyncConnection, AsyncPgConnection};

Check warning on line 6 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused import: `AsyncConnection`
use futures::stream::StreamExt;
use tokio::sync::Mutex;
use uuid::Uuid;

use sable_network::prelude::*;

use crate::schema::channels;

/// Implementation of [`HistoryService`] backed PostgreSQL
pub struct PgHistoryService<'a> {
database_connection: &'a Mutex<AsyncPgConnection>,
}

impl<'a> PgHistoryService<'a> {
pub fn new(database_connection: &'a Mutex<AsyncPgConnection>) -> Self {
Self {
database_connection,
}
}
}

impl<'a> HistoryService for PgHistoryService<'a> {
async fn list_targets(

Check failure on line 29 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

future cannot be shared between threads safely

Check failure on line 29 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

future cannot be shared between threads safely
&self,
user: UserId,

Check warning on line 31 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `user`
after_ts: Option<i64>,

Check warning on line 32 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `after_ts`
before_ts: Option<i64>,

Check warning on line 33 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `before_ts`
limit: Option<usize>,

Check warning on line 34 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `limit`
) -> HashMap<TargetId, i64> {
// TODO: access control
match diesel_async::RunQueryDsl::load_stream(
channels::dsl::channels.select((
channels::dsl::id,
sql::<diesel::pg::sql_types::Uuid>(
"SELECT MAX(id) FROM messages WHERE target_channel=channels.id",
),
)),
&mut *self.database_connection.lock().await,
)
.await
{
Err(e) => {
tracing::error!("Could not get history channels: {e}");
return HashMap::new();
}
Ok(rows) => {
rows.map(|row| row.expect("Could not deserialize row"))
.map(
|(channel_id, max_message_id): (i64, Uuid)| -> (TargetId, i64) {
let (seconds, _) = max_message_id
.get_timestamp()
.expect("messages.id is not a UUID7")
.to_unix();
(
TargetId::Channel(ChannelId::from(Snowflake::from(
u64::try_from(channel_id).expect("channel id is negative"),
))),
seconds
.try_into()
.expect("message's UNIX timestamp is negative"),
)
},
)
.collect()
.await
}
}
}

async fn get_entries(
&self,
user: UserId,

Check warning on line 78 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `user`
target: TargetId,

Check warning on line 79 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `target`
request: HistoryRequest,
) -> Result<impl IntoIterator<Item = HistoryLogEntry>, HistoryError> {
todo!();
Ok(vec![])

Check warning on line 83 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unreachable expression
}
}
64 changes: 60 additions & 4 deletions sable_history/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use sable_network::prelude::*;
use sable_server::ServerType;
use serde::Deserialize;
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
use tracing::instrument;

use std::sync::Arc;

use diesel_async::{AsyncConnection, AsyncPgConnection};

mod sync;
mod update_handler;

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -20,7 +22,7 @@ pub struct HistoryServerConfig {
pub struct HistoryServer {
node: Arc<NetworkNode>,
history_receiver: Mutex<UnboundedReceiver<sable_network::rpc::NetworkHistoryUpdate>>,
database_connection: Mutex<AsyncPgConnection>,
database_connection: Mutex<AsyncPgConnection>, // TODO: use a connection pool
}

impl ServerType for HistoryServer {
Expand Down Expand Up @@ -69,6 +71,14 @@ impl ServerType for HistoryServer {
{
let Some(update) = update else { break; };

if let NetworkStateChange::NewServer(new_server) = &update.change
{
if new_server.server == self.node.id()
{
self.burst_to_network().await;
}
}

if let Err(error) = self.handle_history_update(update).await {
tracing::error!(?error, "Error return handling history update");
}
Expand All @@ -94,10 +104,56 @@ impl ServerType for HistoryServer {
unimplemented!("history servers can't hot-upgrade");
}

fn handle_remote_command(
#[instrument(skip_all)]
async fn handle_remote_command(
&self,
_request: sable_network::rpc::RemoteServerRequestType,
req: sable_network::rpc::RemoteServerRequestType,
) -> sable_network::rpc::RemoteServerResponse {
todo!()
tracing::debug!(?req, "Got remote request");

use crate::server::rpc::RemoteServerRequestType::*;
use sable_network::rpc::RemoteServerResponse;

match req {
History(req) => {
use crate::server::rpc::RemoteHistoryServerRequestType::*;
use crate::server::rpc::RemoteHistoryServerResponse::*;

let history_service = crate::PgHistoryService::new(&self.database_connection);
match req {
ListTargets {
user,
after_ts,
before_ts,
limit,
} => TargetList(
history_service
.list_targets(user, after_ts, before_ts, limit)
.await
.into_iter()
.collect(),
),
GetEntries {
user,
target,
request,
} => Entries(
history_service
.get_entries(user, target, request)
.await
.map(|entries| entries.into_iter().collect()),
),
}
.into()
}
Services(_) => {
tracing::warn!(?req, "Got unsupported request (services)");
RemoteServerResponse::NotSupported
}
Ping => {
tracing::warn!(?req, "Got unsupported request (ping)");
RemoteServerResponse::NotSupported
}
}
}
}
12 changes: 12 additions & 0 deletions sable_history/src/server/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use sable_network::network::wrapper::ObjectWrapper;

Check warning on line 1 in sable_history/src/server/sync.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused import: `sable_network::network::wrapper::ObjectWrapper`

use super::*;
use crate::server::event::IntroduceHistoryServer;

impl HistoryServer {
pub(super) async fn burst_to_network(&self) {
// Set ourselves as the active history node
self.node
.submit_event(self.node.id(), IntroduceHistoryServer {});
}
}
1 change: 1 addition & 0 deletions sable_history/src/server/update_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl HistoryServer {
| NetworkStateChange::ServerQuit(_)
| NetworkStateChange::NewAuditLogEntry(_)
| NetworkStateChange::UserLoginChange(_)
| NetworkStateChange::HistoryServerUpdate(_)
| NetworkStateChange::ServicesUpdate(_)
| NetworkStateChange::EventComplete(_) => Ok(()),
}
Expand Down
1 change: 1 addition & 0 deletions sable_ircd/src/capability/account_tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fn account_for_tag(update: &NetworkStateChange, net: &Network) -> Option<String>
NetworkStateChange::NewServer(_) => None,
NetworkStateChange::ServerQuit(_) => None,
NetworkStateChange::NewAuditLogEntry(_) => None,
NetworkStateChange::HistoryServerUpdate(_) => None,
NetworkStateChange::ServicesUpdate(_) => None,
NetworkStateChange::EventComplete(_) => None,
}?;
Expand Down
4 changes: 2 additions & 2 deletions sable_ircd/src/command/handlers/chathistory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async fn handle_chathistory(
}
};

let history_service = LocalHistoryService::new(server.node());
let history_service = server.node().history_service();
match history_service
.get_entries(source.id(), target_id, request)
.await
Expand All @@ -160,7 +160,7 @@ async fn list_targets<'a>(
to_ts: Option<i64>,
limit: Option<usize>,
) {
let history_service = LocalHistoryService::new(server.node());
let history_service = server.node().history_service();

let found_targets = history_service
.list_targets(source.id(), to_ts, from_ts, limit)
Expand Down
2 changes: 1 addition & 1 deletion sable_ircd/src/command/handlers/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn do_register_user(
_email: &str,
password: &str,
) -> CommandResult {
let Some(services_name) = network.current_services_name() else {
let Some(services_name) = network.current_services_server_name() else {
response_to.send(message::Fail::new(
"REGISTER",
"TEMPORARILY_UNAVAILABLE",
Expand Down
2 changes: 1 addition & 1 deletion sable_ircd/src/command/plumbing/argument_wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<'a> AmbientArgument<'a> for ServicesTarget<'a> {
Ok(Self {
name: ctx
.network()
.current_services_name()
.current_services_server_name()
.ok_or(CommandError::ServicesNotAvailable)?,
server: ctx.server(),
})
Expand Down
2 changes: 2 additions & 0 deletions sable_ircd/src/messages/send_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl SendHistoryItem<NetworkHistoryUpdate> for ClientServer {
| NetworkStateChange::ServerQuit(_)
| NetworkStateChange::NewAuditLogEntry(_)
| NetworkStateChange::UserLoginChange(_)
| NetworkStateChange::HistoryServerUpdate(_)
| NetworkStateChange::ServicesUpdate(_)
| NetworkStateChange::EventComplete(_) => Ok(()),
}
Expand Down Expand Up @@ -86,6 +87,7 @@ impl SendHistoryItem<HistoryLogEntry> for ClientServer {
| NetworkStateChange::ServerQuit(_)
| NetworkStateChange::NewAuditLogEntry(_)
| NetworkStateChange::UserLoginChange(_)
| NetworkStateChange::HistoryServerUpdate(_)
| NetworkStateChange::ServicesUpdate(_)
| NetworkStateChange::EventComplete(_) => Ok(()),
}
Expand Down
9 changes: 6 additions & 3 deletions sable_ircd/src/server/server_type.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::*;
use crate::connection_collection::ConnectionCollectionState;
use anyhow::Context;
use tracing::instrument;

use client_listener::SavedListenerCollection;
use sable_server::ServerSaveError;

use super::*;
use crate::connection_collection::ConnectionCollectionState;
use crate::monitor::MonitorSet;

/// Saved state of a [`ClientServer`] for later resumption
Expand Down Expand Up @@ -168,7 +170,8 @@ impl sable_server::ServerType for ClientServer {
}
}

fn handle_remote_command(&self, cmd: RemoteServerRequestType) -> RemoteServerResponse {
#[instrument(skip_all)]
async fn handle_remote_command(&self, cmd: RemoteServerRequestType) -> RemoteServerResponse {
match cmd {
RemoteServerRequestType::Ping => RemoteServerResponse::Success,
_ => RemoteServerResponse::NotSupported,
Expand Down
3 changes: 2 additions & 1 deletion sable_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pretty_assertions = "1.4"
[dependencies]
sable_macros = { path = "../sable_macros" }

async-trait = "0.1.83"
tracing = "0.1"
thiserror = "1"
serde_json = "1"
Expand All @@ -42,7 +43,7 @@ rand = "0.8"
arrayvec = { version = "0.7", features = [ "serde" ] }
hashers = "1"
serde_with = "1.11"
parking_lot = { version = "0.12", features = [ "serde" ] }
parking_lot = { version = "0.12.2", features = [ "serde", "arc_lock" ] }
wildmatch = "2.1"
concurrent_log = { version = "0.2.4", features = [ "serde" ] }
ipnet = { version = "2", features = [ "serde" ] }
Expand Down
Loading

0 comments on commit 8c1f8ed

Please sign in to comment.