From d3ae6b9de85e35b41f86e3d02b8d9793b50ad170 Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Thu, 23 Jan 2025 18:41:24 -0500 Subject: [PATCH] start, fix ranged requests * move the creation of a Range header behind a trait * use a GET request for the HTTP source if a HEAD request fails --- src/storage-operators/src/oneshot_source.rs | 2 + .../src/oneshot_source/aws_source.rs | 8 ++-- .../src/oneshot_source/http_source.rs | 44 ++++++++++++++++--- .../src/oneshot_source/util.rs | 32 ++++++++++++++ 4 files changed, 76 insertions(+), 10 deletions(-) create mode 100644 src/storage-operators/src/oneshot_source/util.rs diff --git a/src/storage-operators/src/oneshot_source.rs b/src/storage-operators/src/oneshot_source.rs index 62bfc2aedf961..723a5a754e636 100644 --- a/src/storage-operators/src/oneshot_source.rs +++ b/src/storage-operators/src/oneshot_source.rs @@ -105,6 +105,8 @@ pub mod csv; pub mod aws_source; pub mod http_source; +mod util; + /// Render a dataflow to do a "oneshot" ingestion. /// /// Roughly the operators we render do the following: diff --git a/src/storage-operators/src/oneshot_source/aws_source.rs b/src/storage-operators/src/oneshot_source/aws_source.rs index e259dae25c880..f57c33c88fd05 100644 --- a/src/storage-operators/src/oneshot_source/aws_source.rs +++ b/src/storage-operators/src/oneshot_source/aws_source.rs @@ -22,6 +22,7 @@ use mz_storage_types::connections::aws::AwsConnection; use mz_storage_types::connections::ConnectionContext; use serde::{Deserialize, Serialize}; +use crate::oneshot_source::util::IntoRangeHeaderValue; use crate::oneshot_source::{ OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, StorageErrorXKind, }; @@ -204,11 +205,8 @@ impl OneshotSource for AwsS3Source { let mut request = client.get_object().bucket(&self.bucket).key(&object.name); if let Some(range) = range { - // See the below link for the specifics of this format. - // - // - let range = format!("byte={}-{}", range.start(), range.end()); - request = request.range(range); + let value = range.into_range_header_value(); + request = request.range(value); } let object = request diff --git a/src/storage-operators/src/oneshot_source/http_source.rs b/src/storage-operators/src/oneshot_source/http_source.rs index 99896376009be..f14e4011587f0 100644 --- a/src/storage-operators/src/oneshot_source/http_source.rs +++ b/src/storage-operators/src/oneshot_source/http_source.rs @@ -17,6 +17,7 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use url::Url; +use crate::oneshot_source::util::IntoRangeHeaderValue; use crate::oneshot_source::{ Encoding, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, }; @@ -78,7 +79,8 @@ impl OneshotSource for HttpOneshotSource { async fn list<'a>(&'a self) -> Result, StorageErrorX> { // TODO(cf3): Support listing files from a directory index. - // Submit a HEAD request so we can discover metadata about the file. + // To get metadata about a file we'll first try issuing a `HEAD` request, which + // canonically is the right thing do. let response = self .client .head(self.origin.clone()) @@ -86,8 +88,30 @@ impl OneshotSource for HttpOneshotSource { .await .context("HEAD request")?; + // Not all servers accept `HEAD` requests though, so we'll fallback to a `GET` + // request and skip fetching the body. + let headers = match response.error_for_status() { + Ok(response) => response.headers().clone(), + Err(err) => { + tracing::warn!(status = ?err.status(), "HEAD request failed"); + + let response = self + .client + .get(self.origin.clone()) + .send() + .await + .context("GET request")?; + let headers = response.headers().clone(); + + // Immediately drop the response so we don't attempt to fetch the body. + drop(response); + + headers + } + }; + let get_header = |name: &reqwest::header::HeaderName| { - let header = response.headers().get(name)?; + let header = headers.get(name)?; match header.to_str() { Err(e) => { tracing::warn!("failed to deserialize header '{name}', err: {e}"); @@ -129,13 +153,23 @@ impl OneshotSource for HttpOneshotSource { &'s self, object: Self::Object, _checksum: Self::Checksum, - _range: Option>, + range: Option>, ) -> BoxStream<'s, Result> { - // TODO(cf1): Support the range param. // TODO(cf1): Validate our checksum. let initial_response = async move { - let response = self.client.get(object.url).send().await.context("get")?; + let mut request = self.client.get(object.url); + + if let Some(range) = &range { + let value = range.into_range_header_value(); + request = request.header(&reqwest::header::RANGE, value); + } + + // TODO(parkmycar): We should probably assert that the response contains + // an appropriate Content-Range header in the response, and maybe that we + // got back an HTTP 206? + + let response = request.send().await.context("get")?; let bytes_stream = response.bytes_stream().err_into(); Ok::<_, StorageErrorX>(bytes_stream) diff --git a/src/storage-operators/src/oneshot_source/util.rs b/src/storage-operators/src/oneshot_source/util.rs new file mode 100644 index 0000000000000..fa5c0fabda475 --- /dev/null +++ b/src/storage-operators/src/oneshot_source/util.rs @@ -0,0 +1,32 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Utility functions for Oneshot sources. + +/// Utility trait for converting various Rust Range types into a header value. +/// according to the MDN Web Docs. +/// +/// See: +pub trait IntoRangeHeaderValue { + fn into_range_header_value(&self) -> String; +} + +impl IntoRangeHeaderValue for std::ops::Range { + fn into_range_header_value(&self) -> String { + let inclusive_end = std::cmp::max(self.start, self.end.saturating_sub(1)); + (self.start..=inclusive_end).into_range_header_value() + } +} + +impl IntoRangeHeaderValue for std::ops::RangeInclusive { + fn into_range_header_value(&self) -> String { + // See . + format!("bytes={}-{}", self.start(), self.end()) + } +}