Skip to content

Commit

Permalink
start, fix ranged requests
Browse files Browse the repository at this point in the history
* move the creation of a Range header behind a trait
* use a GET request for the HTTP source if a HEAD request fails
  • Loading branch information
ParkMyCar committed Jan 23, 2025
1 parent 2063392 commit d3ae6b9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/storage-operators/src/oneshot_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ pub mod csv;
pub mod aws_source;
pub mod http_source;

mod util;

/// Render a dataflow to do a "oneshot" ingestion.
///
/// Roughly the operators we render do the following:
Expand Down
8 changes: 3 additions & 5 deletions src/storage-operators/src/oneshot_source/aws_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::connections::ConnectionContext;
use serde::{Deserialize, Serialize};

use crate::oneshot_source::util::IntoRangeHeaderValue;
use crate::oneshot_source::{
OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, StorageErrorXKind,
};
Expand Down Expand Up @@ -204,11 +205,8 @@ impl OneshotSource for AwsS3Source {

let mut request = client.get_object().bucket(&self.bucket).key(&object.name);
if let Some(range) = range {
// See the below link for the specifics of this format.
//
// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range>
let range = format!("byte={}-{}", range.start(), range.end());
request = request.range(range);
let value = range.into_range_header_value();
request = request.range(value);
}

let object = request
Expand Down
44 changes: 39 additions & 5 deletions src/storage-operators/src/oneshot_source/http_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::oneshot_source::util::IntoRangeHeaderValue;
use crate::oneshot_source::{
Encoding, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
};
Expand Down Expand Up @@ -78,16 +79,39 @@ impl OneshotSource for HttpOneshotSource {
async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
// TODO(cf3): Support listing files from a directory index.

// Submit a HEAD request so we can discover metadata about the file.
// To get metadata about a file we'll first try issuing a `HEAD` request, which
// canonically is the right thing to do.
let response = self
.client
.head(self.origin.clone())
.send()
.await
.context("HEAD request")?;

// Not all servers accept `HEAD` requests though, so we'll fall back to a `GET`
// request and skip fetching the body.
let headers = match response.error_for_status() {
Ok(response) => response.headers().clone(),
Err(err) => {
tracing::warn!(status = ?err.status(), "HEAD request failed");

let response = self
.client
.get(self.origin.clone())
.send()
.await
.context("GET request")?;
let headers = response.headers().clone();

// Immediately drop the response so we don't attempt to fetch the body.
drop(response);

headers
}
};

let get_header = |name: &reqwest::header::HeaderName| {
let header = response.headers().get(name)?;
let header = headers.get(name)?;
match header.to_str() {
Err(e) => {
tracing::warn!("failed to deserialize header '{name}', err: {e}");
Expand Down Expand Up @@ -129,13 +153,23 @@ impl OneshotSource for HttpOneshotSource {
&'s self,
object: Self::Object,
_checksum: Self::Checksum,
_range: Option<std::ops::RangeInclusive<usize>>,
range: Option<std::ops::RangeInclusive<usize>>,
) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
// TODO(cf1): Support the range param.
// TODO(cf1): Validate our checksum.

let initial_response = async move {
let response = self.client.get(object.url).send().await.context("get")?;
let mut request = self.client.get(object.url);

if let Some(range) = &range {
let value = range.into_range_header_value();
request = request.header(&reqwest::header::RANGE, value);
}

// TODO(parkmycar): We should probably assert that the response contains
// an appropriate Content-Range header, and maybe that we got back an
// HTTP 206?

let response = request.send().await.context("get")?;
let bytes_stream = response.bytes_stream().err_into();

Ok::<_, StorageErrorX>(bytes_stream)
Expand Down
32 changes: 32 additions & 0 deletions src/storage-operators/src/oneshot_source/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utility functions for Oneshot sources.
/// Renders a Rust range of byte offsets as the value of an HTTP `Range` header.
///
/// The produced string has the form `bytes=<first>-<last>`, where both offsets
/// are inclusive.
///
/// See: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range>
pub trait IntoRangeHeaderValue {
    /// Returns the header value describing this range.
    fn into_range_header_value(&self) -> String;
}

impl IntoRangeHeaderValue for std::ops::Range<usize> {
    /// Converts the exclusive upper bound into an inclusive one, then defers to
    /// the [`std::ops::RangeInclusive`] implementation.
    ///
    /// The inclusive end is never smaller than `start`, so a range whose `end`
    /// is less than or equal to its `start` is rendered as the single offset
    /// `start`.
    fn into_range_header_value(&self) -> String {
        let first = self.start;
        // `saturating_sub` keeps `end == 0` from underflowing; `max` clamps the
        // result up to `first`.
        let last = self.end.saturating_sub(1).max(first);
        std::ops::RangeInclusive::new(first, last).into_range_header_value()
    }
}

impl IntoRangeHeaderValue for std::ops::RangeInclusive<usize> {
    /// Formats the range as `bytes=<start>-<end>`.
    ///
    /// See <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range>.
    fn into_range_header_value(&self) -> String {
        let (first, last) = (self.start(), self.end());
        format!("bytes={}-{}", first, last)
    }
}

0 comments on commit d3ae6b9

Please sign in to comment.