Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make HistoryService methods async #140

Merged
merged 3 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions sable_ircd/src/command/handlers/chathistory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ fn parse_limit(s: &str) -> Result<usize, CommandError> {

#[allow(clippy::too_many_arguments)]
#[command_handler("CHATHISTORY")]
fn handle_chathistory(
async fn handle_chathistory(
ctx: &dyn Command,
source: UserSource,
source: UserSource<'_>,
server: &ClientServer,
response: &dyn CommandResponse,
subcommand: &str,
Expand All @@ -72,7 +72,8 @@ fn handle_chathistory(
Some(min(from_ts, to_ts)),
Some(max(from_ts, to_ts)),
Some(limit),
);
)
.await;
}
normalized_subcommand => {
let target = arg_1;
Expand Down Expand Up @@ -135,8 +136,11 @@ fn handle_chathistory(
}
};

let log = server.node().history();
match log.get_entries(source.id(), target_id, request) {
let history_service = LocalHistoryService::new(server.node());
match history_service
.get_entries(source.id(), target_id, request)
.await
{
Ok(entries) => send_history_entries(server, response, target, entries)?,
Err(HistoryError::InvalidTarget(_)) => Err(invalid_target_error())?,
};
Expand All @@ -148,17 +152,19 @@ fn handle_chathistory(

// For listing targets, we iterate backwards through time; this allows us to just collect the
// first timestamp we see for each target and know that it's the most recent one
fn list_targets(
server: &ClientServer,
into: impl MessageSink,
source: &wrapper::User,
async fn list_targets<'a>(
server: &'a ClientServer,
into: impl MessageSink + 'a,
source: &'a wrapper::User<'_>,
from_ts: Option<i64>,
to_ts: Option<i64>,
limit: Option<usize>,
) {
let log = server.node().history();
let history_service = LocalHistoryService::new(server.node());

let found_targets = log.list_targets(source.id(), to_ts, from_ts, limit);
let found_targets = history_service
.list_targets(source.id(), to_ts, from_ts, limit)
.await;

// The appropriate cap here is Batch - chathistory is enabled because we got here,
// but can be used without batch support.
Expand Down Expand Up @@ -194,7 +200,7 @@ fn send_history_entries<'a>(
server: &ClientServer,
into: impl MessageSink,
target: &str,
entries: impl Iterator<Item = &'a HistoryLogEntry>,
entries: impl IntoIterator<Item = HistoryLogEntry>,
) -> CommandResult {
let batch = into
.batch("chathistory", ClientCapability::Batch)
Expand All @@ -204,7 +210,7 @@ fn send_history_entries<'a>(
for entry in entries {
// Ignore errors here; it's possible that a message has been expired out of network state
// but a reference to it still exists in the history log
let _ = server.send_item(entry, &batch, entry);
let _ = server.send_item(&entry, &batch, &entry);
}

Ok(())
Expand Down
206 changes: 107 additions & 99 deletions sable_network/src/history/local_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,104 @@ fn target_id_for_entry(for_user: UserId, entry: &HistoryLogEntry) -> Option<Targ
}

/// Implementation of [`HistoryService`] backed by [`NetworkNode`]
impl HistoryService for NetworkHistoryLog {
fn list_targets(
pub struct LocalHistoryService<'a> {
node: &'a NetworkNode,
}

impl<'a> LocalHistoryService<'a> {
pub fn new(node: &'a NetworkNode) -> Self {
LocalHistoryService { node }
}

fn get_history_for_target(
&self,
source: UserId,
target: TargetId,
from_ts: Option<i64>,
to_ts: Option<i64>,
backward_limit: usize,
forward_limit: usize,
) -> Result<impl Iterator<Item = HistoryLogEntry>, HistoryError> {
let mut backward_entries = Vec::new();
let mut forward_entries = Vec::new();
let mut target_exists = false;

// Keep the lock on the NetworkHistoryLog between the backward and the forward
// search to make sure both have a consistent state
let log = self.node.history();

if backward_limit != 0 {
let from_ts = if forward_limit == 0 {
from_ts
} else {
// HACK: This is AROUND so we want to capture messages whose timestamp matches exactly
// (it's a message in the middle of the range)
from_ts.map(|from_ts| from_ts + 1)
};

for entry in log.entries_for_user_reverse(source) {
target_exists = true;
if matches!(from_ts, Some(ts) if entry.timestamp >= ts) {
// Skip over until we hit the timestamp window we're interested in
continue;
}
if matches!(to_ts, Some(ts) if entry.timestamp <= ts) {
// If we hit this then we've passed the requested window and should stop
break;
}

if let Some(event_target) = target_id_for_entry(source, entry) {
if event_target == target {
backward_entries.push(entry.clone());
}
}

if backward_limit <= backward_entries.len() {
break;
}
}
}

if forward_limit != 0 {
for entry in log.entries_for_user(source) {
target_exists = true;
if matches!(from_ts, Some(ts) if entry.timestamp <= ts) {
// Skip over until we hit the timestamp window we're interested in
continue;
}
if matches!(to_ts, Some(ts) if entry.timestamp >= ts) {
// If we hit this then we've passed the requested window and should stop
break;
}

if let Some(event_target) = target_id_for_entry(source, entry) {
if event_target == target {
forward_entries.push(entry.clone());
}
}

if forward_limit <= forward_entries.len() {
break;
}
}
}

if target_exists {
// "The order of returned messages within the batch is implementation-defined, but SHOULD be
// ascending time order or some approximation thereof, regardless of the subcommand used."
// -- https://ircv3.net/specs/extensions/chathistory#returned-message-notes
Ok(backward_entries
.into_iter()
.rev()
.chain(forward_entries.into_iter()))
} else {
Err(HistoryError::InvalidTarget(target))
}
}
}

impl<'a> HistoryService for LocalHistoryService<'a> {
async fn list_targets(
&self,
user: UserId,
after_ts: Option<i64>,
Expand All @@ -30,7 +126,7 @@ impl HistoryService for NetworkHistoryLog {
) -> HashMap<TargetId, i64> {
let mut found_targets = HashMap::new();

for entry in self.entries_for_user_reverse(user) {
for entry in self.node.history().entries_for_user_reverse(user) {
if matches!(after_ts, Some(ts) if entry.timestamp >= ts) {
// Skip over until we hit the timestamp window we're interested in
continue;
Expand All @@ -54,16 +150,15 @@ impl HistoryService for NetworkHistoryLog {
found_targets
}

fn get_entries(
async fn get_entries(
&self,
user: UserId,
target: TargetId,
request: HistoryRequest,
) -> Result<impl Iterator<Item = &HistoryLogEntry>, HistoryError> {
) -> Result<impl IntoIterator<Item = HistoryLogEntry>, HistoryError> {
match request {
#[rustfmt::skip]
HistoryRequest::Latest { to_ts, limit } => get_history_for_target(
self,
HistoryRequest::Latest { to_ts, limit } => self.get_history_for_target(
user,
target,
None,
Expand All @@ -73,8 +168,7 @@ impl HistoryService for NetworkHistoryLog {
),

HistoryRequest::Before { from_ts, limit } => {
get_history_for_target(
self,
self.get_history_for_target(
user,
target,
Some(from_ts),
Expand All @@ -83,8 +177,7 @@ impl HistoryService for NetworkHistoryLog {
0, // Forward limit
)
}
HistoryRequest::After { start_ts, limit } => get_history_for_target(
self,
HistoryRequest::After { start_ts, limit } => self.get_history_for_target(
user,
target,
Some(start_ts),
Expand All @@ -93,8 +186,7 @@ impl HistoryService for NetworkHistoryLog {
limit,
),
HistoryRequest::Around { around_ts, limit } => {
get_history_for_target(
self,
self.get_history_for_target(
user,
target,
Some(around_ts),
Expand All @@ -109,8 +201,7 @@ impl HistoryService for NetworkHistoryLog {
limit,
} => {
if start_ts <= end_ts {
get_history_for_target(
self,
self.get_history_for_target(
user,
target,
Some(start_ts),
Expand All @@ -121,8 +212,7 @@ impl HistoryService for NetworkHistoryLog {
} else {
// Search backward from start_ts instead of swapping start_ts and end_ts,
// because we want to match the last messages first in case we reach the limit
get_history_for_target(
self,
self.get_history_for_target(
user,
target,
Some(start_ts),
Expand All @@ -135,85 +225,3 @@ impl HistoryService for NetworkHistoryLog {
}
}
}

fn get_history_for_target(
log: &NetworkHistoryLog,
source: UserId,
target: TargetId,
from_ts: Option<i64>,
to_ts: Option<i64>,
backward_limit: usize,
forward_limit: usize,
) -> Result<impl Iterator<Item = &HistoryLogEntry>, HistoryError> {
let mut backward_entries = Vec::new();
let mut forward_entries = Vec::new();
let mut target_exists = false;

if backward_limit != 0 {
let from_ts = if forward_limit == 0 {
from_ts
} else {
// HACK: This is AROUND so we want to capture messages whose timestamp matches exactly
// (it's a message in the middle of the range)
from_ts.map(|from_ts| from_ts + 1)
};

for entry in log.entries_for_user_reverse(source) {
target_exists = true;
if matches!(from_ts, Some(ts) if entry.timestamp >= ts) {
// Skip over until we hit the timestamp window we're interested in
continue;
}
if matches!(to_ts, Some(ts) if entry.timestamp <= ts) {
// If we hit this then we've passed the requested window and should stop
break;
}

if let Some(event_target) = target_id_for_entry(source, entry) {
if event_target == target {
backward_entries.push(entry);
}
}

if backward_limit <= backward_entries.len() {
break;
}
}
}

if forward_limit != 0 {
for entry in log.entries_for_user(source) {
target_exists = true;
if matches!(from_ts, Some(ts) if entry.timestamp <= ts) {
// Skip over until we hit the timestamp window we're interested in
continue;
}
if matches!(to_ts, Some(ts) if entry.timestamp >= ts) {
// If we hit this then we've passed the requested window and should stop
break;
}

if let Some(event_target) = target_id_for_entry(source, entry) {
if event_target == target {
forward_entries.push(entry);
}
}

if forward_limit <= forward_entries.len() {
break;
}
}
}

if target_exists {
// "The order of returned messages within the batch is implementation-defined, but SHOULD be
// ascending time order or some approximation thereof, regardless of the subcommand used."
// -- https://ircv3.net/specs/extensions/chathistory#returned-message-notes
Ok(backward_entries
.into_iter()
.rev()
.chain(forward_entries.into_iter()))
} else {
Err(HistoryError::InvalidTarget(target))
}
}
2 changes: 1 addition & 1 deletion sable_network/src/history/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use concurrent_log::ConcurrentLog;

pub type LogEntryId = usize;

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoryLogEntry {
pub id: LogEntryId,
pub timestamp: i64,
Expand Down
1 change: 1 addition & 0 deletions sable_network/src/history/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use service::*;
mod local_service;

use crate::network::NetworkStateChange;
pub use local_service::LocalHistoryService;

/// Implemented by types that provide metadata for a historic state change
pub trait HistoryItem {
Expand Down
7 changes: 5 additions & 2 deletions sable_network/src/history/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! History storage and retrieval

use std::collections::HashMap;
use std::future::Future;

use thiserror::Error;

Expand Down Expand Up @@ -90,12 +91,14 @@ pub trait HistoryService {
after_ts: Option<i64>,
before_ts: Option<i64>,
limit: Option<usize>,
) -> HashMap<TargetId, i64>;
) -> impl Future<Output = HashMap<TargetId, i64>> + Send + Sync;

fn get_entries(
&self,
user: UserId,
target: TargetId,
request: HistoryRequest,
) -> Result<impl Iterator<Item = &HistoryLogEntry>, HistoryError>;
) -> impl Future<Output = Result<impl IntoIterator<Item = HistoryLogEntry>, HistoryError>>
+ Send
+ Sync;
}
Loading