Skip to content

Commit

Permalink
fetch time range
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk committed Feb 19, 2025
1 parent 45694cb commit b21b95d
Show file tree
Hide file tree
Showing 16 changed files with 344 additions and 41 deletions.
26 changes: 26 additions & 0 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,12 @@ impl DataSource {
/// Will do minimal checks (e.g. that the file exists), for synchronous errors,
/// but the loading is done in a background task.
///
/// `on_cmd` is used to respond to UI commands.
///
/// `on_msg` can be used to wake up the UI thread on Wasm.
pub fn stream(
self,
on_cmd: Box<dyn Fn(DataSourceCommand) + Send + Sync>,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> anyhow::Result<StreamSource> {
re_tracing::profile_function!();
Expand Down Expand Up @@ -259,10 +262,23 @@ impl DataSource {

let address = url.as_str().try_into()?;

let on_cmd = Box::new(move |cmd: re_grpc_client::redap::Command| match cmd {
redap::Command::SetLoopSelection {
recording_id,
timeline,
time_range,
} => on_cmd(DataSourceCommand::SetLoopSelection {
recording_id,
timeline,
time_range,
}),
});

match address {
RedapAddress::Recording {
origin,
recording_id,
time_range,
} => {
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::RerunGrpcStream {
Expand All @@ -277,6 +293,8 @@ impl DataSource {
tx,
origin,
recording_id,
time_range,
on_cmd,
on_msg,
)
.await
Expand All @@ -300,6 +318,14 @@ impl DataSource {
}
}

pub enum DataSourceCommand {
SetLoopSelection {
recording_id: String,
timeline: re_log_types::Timeline,
time_range: re_log_types::ResolvedTimeRange,
},
}

#[cfg(not(target_arch = "wasm32"))]
#[test]
fn test_data_source_from_uri() {
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod data_source;
#[cfg(not(target_arch = "wasm32"))]
mod load_stdin;

pub use self::data_source::{DataSource, StreamSource};
pub use self::data_source::{DataSource, DataSourceCommand, StreamSource};

// ----------------------------------------------------------------------------

Expand Down
121 changes: 120 additions & 1 deletion crates/store/re_grpc_client/src/redap/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum ConnectionError {
#[error(transparent)]
InvalidScheme(#[from] InvalidScheme),

#[error(transparent)]
InvalidTimeRange(#[from] InvalidTimeRange),

#[error("unexpected endpoint: {0}")]
UnexpectedEndpoint(String),

Expand All @@ -36,6 +39,76 @@ pub enum ConnectionError {
CannotLoadUrlAsRecording { url: String },
}

#[derive(thiserror::Error, Debug)]
#[error("invalid time range (expected `?time_range=timeline@int_seconds..int_seconds`)")]
pub struct InvalidTimeRange;

#[derive(Debug, PartialEq, Eq)]
pub struct TimeRange {
pub timeline: re_log_types::Timeline,
pub time_range: re_log_types::ResolvedTimeRange,
}

impl std::fmt::Display for TimeRange {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
timeline,
time_range,
} = self;
write!(
f,
"{}@{}..{}",
timeline.name(),
time_range.min().as_f64(),
time_range.max().as_f64(),
)
}
}

impl TryFrom<&str> for TimeRange {
type Error = InvalidTimeRange;

fn try_from(value: &str) -> Result<Self, Self::Error> {
let (timeline, time_range) = value.split_once('@').ok_or(InvalidTimeRange)?;

// TODO: don't assume type

let timeline = match timeline {
"log_time" => re_log_types::Timeline::new_temporal(timeline),
"log_tick" => re_log_types::Timeline::new_sequence(timeline),
"frame" => re_log_types::Timeline::new_sequence(timeline),
"frame_nr" => re_log_types::Timeline::new_sequence(timeline),
_ => re_log_types::Timeline::new_temporal(timeline),
};

let time_range = {
let (min, max) = time_range.split_once("..").ok_or(InvalidTimeRange)?;

re_log_types::ResolvedTimeRange::new(
re_log_types::TimeInt::from_seconds(
min.parse::<i64>()
.map_err(|_| InvalidTimeRange)?
.try_into()
.ok()
.unwrap_or_default(),
),
re_log_types::TimeInt::from_seconds(
max.parse::<i64>()
.map_err(|_| InvalidTimeRange)?
.try_into()
.ok()
.unwrap_or_default(),
),
)
};

Ok(Self {
timeline,
time_range,
})
}
}

#[derive(thiserror::Error, Debug)]
#[error("invalid or missing scheme (expected `rerun(+http|+https)://`)")]
pub struct InvalidScheme;
Expand Down Expand Up @@ -218,6 +291,7 @@ pub enum RedapAddress {
Recording {
origin: Origin,
recording_id: String,
time_range: Option<TimeRange>,
},
Catalog {
origin: Origin,
Expand All @@ -230,7 +304,14 @@ impl std::fmt::Display for RedapAddress {
Self::Recording {
origin,
recording_id,
} => write!(f, "{origin}/recording/{recording_id}",),
time_range,
} => match time_range {
None => write!(f, "{origin}/recording/{recording_id}"),
Some(time_range) => write!(
f,
"{origin}/recording/{recording_id}?time_range={time_range}"
),
},
Self::Catalog { origin } => write!(f, "{origin}/catalog",),
}
}
Expand All @@ -250,10 +331,19 @@ impl TryFrom<&str> for RedapAddress {
.take(2)
.collect::<Vec<_>>();

let time_range = canonical_url
.query_pairs()
.find(|(key, _)| key == "time_range")
.map(|(_, value)| TimeRange::try_from(value.as_ref()));

match segments.as_slice() {
["recording", recording_id] => Ok(Self::Recording {
origin,
recording_id: (*recording_id).to_owned(),
time_range: match time_range {
Some(time_range) => Some(time_range?),
None => None,
},
}),
["catalog" | ""] | [] => Ok(Self::Catalog { origin }),
[unknown, ..] => Err(ConnectionError::UnexpectedEndpoint(format!("{unknown}/"))),
Expand Down Expand Up @@ -306,6 +396,28 @@ mod tests {
let RedapAddress::Recording {
origin,
recording_id,
time_range,
} = address
else {
panic!("Expected recording");
};

assert_eq!(origin.scheme, Scheme::Rerun);
assert_eq!(origin.host, url::Host::<String>::Ipv4(Ipv4Addr::LOCALHOST));
assert_eq!(origin.port, 1234);
assert_eq!(recording_id, "12345");
assert_eq!(time_range, None);
}

#[test]
fn test_recording_url_time_range_to_address() {
let url = "rerun://127.0.0.1:1234/recording/[email protected]";
let address: RedapAddress = url.try_into().unwrap();

let RedapAddress::Recording {
origin,
recording_id,
time_range,
} = address
else {
panic!("Expected recording");
Expand All @@ -315,6 +427,13 @@ mod tests {
assert_eq!(origin.host, url::Host::<String>::Ipv4(Ipv4Addr::LOCALHOST));
assert_eq!(origin.port, 1234);
assert_eq!(recording_id, "12345");
assert_eq!(
time_range,
Some(TimeRange {
timeline: re_log_types::Timeline::new_temporal("timeline"),
time_range: re_log_types::ResolvedTimeRange::new(10, 20)
})
);
}

#[test]
Expand Down
Loading

0 comments on commit b21b95d

Please sign in to comment.