Skip to content

Commit

Permalink
[copy_from] Refactor ranged requests, fix HTTP source (#31174)
Browse files Browse the repository at this point in the history
_Stacked on top of_
#31144

This PR refactors ranged `GET` requests so the logic for forming the
`Range` header value is shared by both the HTTP and S3 sources. It also
fixes a bug where some servers do not support `HEAD` requests (which we
use to get metadata for a file); in that case we now fall back to a `GET`
request and immediately drop the response without reading the body.

### Motivation

Fix a bug in ranged HTTP requests I found in the demo today

### Tips for reviewer

review only the final commit, the one titled "start, fix ranged
requests"

### Checklist

- [x] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [x] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [x] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [x] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [x] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.
  • Loading branch information
ParkMyCar authored Jan 30, 2025
1 parent 437ed13 commit 59763d1
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/storage-operators/src/oneshot_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ pub mod csv;
pub mod aws_source;
pub mod http_source;

mod util;

/// Render a dataflow to do a "oneshot" ingestion.
///
/// Roughly the operators we render do the following:
Expand Down
8 changes: 3 additions & 5 deletions src/storage-operators/src/oneshot_source/aws_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::connections::ConnectionContext;
use serde::{Deserialize, Serialize};

use crate::oneshot_source::util::IntoRangeHeaderValue;
use crate::oneshot_source::{
OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, StorageErrorXKind,
};
Expand Down Expand Up @@ -197,11 +198,8 @@ impl OneshotSource for AwsS3Source {

let mut request = client.get_object().bucket(&self.bucket).key(&object.name);
if let Some(range) = range {
// See the below link for the specifics of this format.
//
// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range>
let range = format!("byte={}-{}", range.start(), range.end());
request = request.range(range);
let value = range.into_range_header_value();
request = request.range(value);
}

let object = request
Expand Down
44 changes: 39 additions & 5 deletions src/storage-operators/src/oneshot_source/http_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::oneshot_source::util::IntoRangeHeaderValue;
use crate::oneshot_source::{
Encoding, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
};
Expand Down Expand Up @@ -74,16 +75,39 @@ impl OneshotSource for HttpOneshotSource {
async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
// TODO(cf3): Support listing files from a directory index.

// Submit a HEAD request so we can discover metadata about the file.
// To get metadata about a file we'll first try issuing a `HEAD` request, which
// canonically is the right thing to do.
let response = self
.client
.head(self.origin.clone())
.send()
.await
.context("HEAD request")?;

// Not all servers accept `HEAD` requests though, so we'll fall back to a `GET`
// request and skip fetching the body.
let headers = match response.error_for_status() {
Ok(response) => response.headers().clone(),
Err(err) => {
tracing::warn!(status = ?err.status(), "HEAD request failed");

let response = self
.client
.get(self.origin.clone())
.send()
.await
.context("GET request")?;
let headers = response.headers().clone();

// Immediately drop the response so we don't attempt to fetch the body.
drop(response);

headers
}
};

let get_header = |name: &reqwest::header::HeaderName| {
let header = response.headers().get(name)?;
let header = headers.get(name)?;
match header.to_str() {
Err(e) => {
tracing::warn!("failed to deserialize header '{name}', err: {e}");
Expand Down Expand Up @@ -125,13 +149,23 @@ impl OneshotSource for HttpOneshotSource {
&'s self,
object: Self::Object,
_checksum: Self::Checksum,
_range: Option<std::ops::RangeInclusive<usize>>,
range: Option<std::ops::RangeInclusive<usize>>,
) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
// TODO(cf1): Support the range param.
// TODO(cf1): Validate our checksum.

let initial_response = async move {
let response = self.client.get(object.url).send().await.context("get")?;
let mut request = self.client.get(object.url);

if let Some(range) = &range {
let value = range.into_range_header_value();
request = request.header(&reqwest::header::RANGE, value);
}

// TODO(parkmycar): We should probably assert that the response contains
// an appropriate Content-Range header, and maybe that we got back an
// HTTP 206?

let response = request.send().await.context("get")?;
let bytes_stream = response.bytes_stream().err_into();

Ok::<_, StorageErrorX>(bytes_stream)
Expand Down
32 changes: 32 additions & 0 deletions src/storage-operators/src/oneshot_source/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utility functions for Oneshot sources.
/// Renders a Rust range type as the value of an HTTP `Range` request header.
///
/// The produced string uses the `bytes=<first>-<last>` form, where both
/// positions are inclusive byte offsets.
///
/// See: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range>
pub trait IntoRangeHeaderValue {
    /// Returns the `Range` header value describing `self`.
    fn into_range_header_value(&self) -> String;
}

impl IntoRangeHeaderValue for std::ops::Range<usize> {
    /// Converts the half-open range into its inclusive equivalent and renders it.
    ///
    /// The last byte is `end - 1` (saturating at zero), but never less than
    /// `start`, so an empty or inverted range is rendered as the single byte at
    /// `start`.
    fn into_range_header_value(&self) -> String {
        let first = self.start;
        // Step back from the exclusive bound, then clamp so that `last >= first`.
        let last = self.end.saturating_sub(1).max(first);
        std::ops::RangeInclusive::new(first, last).into_range_header_value()
    }
}

impl IntoRangeHeaderValue for std::ops::RangeInclusive<usize> {
    /// Renders the range as `bytes=<start>-<end>`; both bounds are inclusive.
    fn into_range_header_value(&self) -> String {
        // Format documented at
        // <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range>.
        let (first, last) = (self.start(), self.end());
        format!("bytes={first}-{last}")
    }
}

0 comments on commit 59763d1

Please sign in to comment.