Skip to content

Commit

Permalink
http: new style WebSockets, where headers and data are processed by t…
Browse files Browse the repository at this point in the history
…he filter chain. (envoyproxy#3776)

This is the complete HTTP/1.1 implementation of envoyproxy#3301, new style websockets.

It should preserve existing behavior for "old style" websockets except for handling transfer-encoding requests (we all agree shouldn't happen) and responses (actually could happen and have been requested) better.

Risk Level: High (should be self contained but still lots of core code changes)
Testing: Thorough integration tests. unit tests for http1 codec
Docs Changes: added websocket FAQ
Release Notes: added

Fixes envoyproxy#3301 (modulo timeouts not working, which will be addressed by envoyproxy#3654 or envoyproxy#1778)

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Jul 12, 2018
1 parent 6a57863 commit 95c3e13
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ message HttpConnectionManager {
// control.
bool represent_ipv4_remote_address_as_ipv4_mapped_ipv6 = 20;

// [#not-implemented-hide:]
// The configuration for HTTP upgrades.
// For each upgrade type desired, an UpgradeConfig must be added.
//
Expand All @@ -275,6 +274,10 @@ message HttpConnectionManager {
// The current implementation of upgrade headers does not handle
// multi-valued upgrade headers. Support for multi-valued headers may be
// added in the future if needed.
//
// .. warning::
// The current implementation of upgrade headers does not work with HTTP/2
// upstreams.
message UpgradeConfig {
// The case-insensitive name of this upgrade, e.g. "websocket".
// For each upgrade type present in upgrade_configs, requests with
Expand All @@ -286,7 +289,6 @@ message HttpConnectionManager {
// HTTP connections will be used for this upgrade type.
repeated HttpFilter filters = 2;
};
// [#not-implemented-hide:]
repeated UpgradeConfig upgrade_configs = 23;
}

Expand Down
28 changes: 24 additions & 4 deletions docs/root/intro/arch_overview/websocket.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
.. _arch_overview_websocket:

WebSocket support
=================
Envoy currently supports two modes of Upgrade behavior, the new generic upgrade mode, and
the old WebSocket-only TCP proxy mode.


New style Upgrade support
=========================

The new style Upgrade support is intended mainly for WebSocket but may be used for non-WebSocket
upgrades as well. The new style of upgrades pass both the HTTP headers and the upgrade payload
through an HTTP filter chain. One may configure the
:ref:`upgrade_configs <envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.upgrade_configs>`
in one of two ways. If only the
`upgrade_type <envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.UpgradeConfigs.upgrade_type>`
is specified, both the upgrade headers, any request and response body, and WebSocket payload will
pass through the default HTTP filter chain. To avoid the use of HTTP-only filters for upgrade payload,
one can set up custom
`filters <envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.UpgradeConfigs.upgrade_type>`
for the given upgrade type, up to and including only using the router filter to send the WebSocket
data upstream.

Old style WebSocket support
===========================

Envoy supports upgrading a HTTP/1.1 connection to a WebSocket connection.
Connection upgrade will be allowed only if the downstream client
Expand All @@ -18,8 +38,8 @@ retries, rate limits and shadowing are not supported for WebSocket routes.
However, prefix rewriting, explicit and automatic host rewriting, traffic
shifting and splitting are supported.

Connection semantics
--------------------
Old style Connection semantics
------------------------------

Even though WebSocket upgrades occur over HTTP/1.1 connections, WebSockets
proxying works similarly to plain TCP proxy, i.e., Envoy does not interpret
Expand Down
2 changes: 2 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Version history
* health_check: added support for :ref:`health check event logging <arch_overview_health_check_logging>`.
* http: better handling of HEAD requests. Now sending transfer-encoding: chunked rather than content-length: 0.
* http: response filters not applied to early error paths such as http_parser generated 400s.
* http: added generic +:ref:`Upgrade support
<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.upgrade_configs>`
* lua: added :ref:`connection() <config_http_filters_lua_connection_wrapper>` wrapper and *ssl()* API.
* lua: added :ref:`requestInfo() <config_http_filters_lua_request_info_wrapper>` wrapper and *protocol()* API.
* ratelimit: added support for :repo:`api/envoy/service/ratelimit/v2/rls.proto`.
Expand Down
27 changes: 23 additions & 4 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ const Network::Connection* ConnectionManagerImpl::ActiveStream::connection() {

void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
request_headers_ = std::move(headers);
createFilterChain();
const bool upgrade_rejected = createFilterChain() == false;

maybeEndDecode(end_stream);

Expand Down Expand Up @@ -569,7 +569,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
connection_manager_.stats_.named_.downstream_cx_http1_active_.dec();
connection_manager_.stats_.named_.downstream_cx_websocket_total_.inc();
return;
} else if (websocket_requested) {
} else if (upgrade_rejected) {
// Do not allow WebSocket upgrades if the route does not support it.
connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::Forbidden, "",
Expand Down Expand Up @@ -942,7 +942,12 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte

if (connection_manager_.drain_state_ == DrainState::Closing &&
connection_manager_.codec_->protocol() != Protocol::Http2) {
headers.insertConnection().value().setReference(Headers::get().ConnectionValues.Close);
// If the connection manager is draining send "Connection: Close" on HTTP/1.1 connections.
// Do not do this for H2 (which drains via GOAWA) or Upgrade (as the upgrade
// payload is no longer HTTP/1.1)
if (headers.Connection() == nullptr || headers.Connection()->value() != "Upgrade") {
headers.insertConnection().value().setReference(Headers::get().ConnectionValues.Close);
}
}

if (connection_manager_.config_.tracingConfig()) {
Expand Down Expand Up @@ -1118,8 +1123,22 @@ void ConnectionManagerImpl::ActiveStream::setBufferLimit(uint32_t new_limit) {
}
}

void ConnectionManagerImpl::ActiveStream::createFilterChain() {
bool ConnectionManagerImpl::ActiveStream::createFilterChain() {
bool upgrade_rejected = false;
auto upgrade = request_headers_->Upgrade();
if (upgrade != nullptr) {
if (connection_manager_.config_.filterFactory().createUpgradeFilterChain(
upgrade->value().c_str(), *this)) {
return true;
} else {
upgrade_rejected = true;
// Fall through to the default filter chain. The function calling this
// will send a local reply indicating that the upgrade failed.
}
}

connection_manager_.config_.filterFactory().createFilterChain(*this);
return !upgrade_rejected;
}

void ConnectionManagerImpl::ActiveStreamFilterBase::commonContinue() {
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// Possibly increases buffer_limit_ to the value of limit.
void setBufferLimit(uint32_t limit);
// Set up the Encoder/Decoder filter chain.
void createFilterChain();
bool createFilterChain();

ConnectionManagerImpl& connection_manager_;
Router::ConfigConstSharedPtr snapped_route_config_;
Expand Down
1 change: 0 additions & 1 deletion source/common/http/conn_manager_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ Network::Address::InstanceConstSharedPtr ConnectionManagerUtility::mutateRequest
request_headers.removeEnvoyInternalRequest();
request_headers.removeKeepAlive();
request_headers.removeProxyConnection();
// TODO(alyssawilk) handle this with current and new websocket here and below.
request_headers.removeTransferEncoding();

// If we are "using remote address" this means that we create/append to XFF with our immediate
Expand Down
61 changes: 55 additions & 6 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ void StreamEncoderImpl::encodeHeaders(const HeaderMap& headers, bool end_stream)
Headers::get().TransferEncoding.get().size(),
Headers::get().TransferEncodingValues.Chunked.c_str(),
Headers::get().TransferEncodingValues.Chunked.size());
// We do not aply chunk encoding for HTTP upgrades.
// If there is a body in a WebSocket Upgrade response, the chunks will be
// passed through via maybeDirectDispatch so we need to avoid appending
// extra chunk boundaries.
//
// When sending a response to a HEAD request Envoy may send an informational
// "Transfer-Encoding: chunked" header, but should not send a chunk encoded body.
chunk_encoding_ = !is_response_to_head_request_;
chunk_encoding_ = !Utility::isUpgrade(headers) && !is_response_to_head_request_;
}
}

Expand Down Expand Up @@ -279,7 +284,7 @@ http_parser_settings ConnectionImpl::settings_{
return 0;
},
[](http_parser* parser) -> int {
static_cast<ConnectionImpl*>(parser->data)->onMessageComplete();
static_cast<ConnectionImpl*>(parser->data)->onMessageCompleteBase();
return 0;
},
nullptr, // on_chunk_header
Expand Down Expand Up @@ -313,9 +318,32 @@ void ConnectionImpl::completeLastHeader() {
ASSERT(current_header_value_.empty());
}

bool ConnectionImpl::maybeDirectDispatch(Buffer::Instance& data) {
if (!handling_upgrade_) {
// Only direct dispatch for Upgrade requests.
return false;
}

ssize_t total_parsed = 0;
uint64_t num_slices = data.getRawSlices(nullptr, 0);
Buffer::RawSlice slices[num_slices];
data.getRawSlices(slices, num_slices);
for (Buffer::RawSlice& slice : slices) {
total_parsed += slice.len_;
onBody(static_cast<const char*>(slice.mem_), slice.len_);
}
ENVOY_CONN_LOG(trace, "direct-dispatched {} bytes", connection_, total_parsed);
data.drain(total_parsed);
return true;
}

void ConnectionImpl::dispatch(Buffer::Instance& data) {
ENVOY_CONN_LOG(trace, "parsing {} bytes", connection_, data.length());

if (maybeDirectDispatch(data)) {
return;
}

// Always unpause before dispatch.
http_parser_pause(&parser_, 0);

Expand All @@ -333,6 +361,10 @@ void ConnectionImpl::dispatch(Buffer::Instance& data) {

ENVOY_CONN_LOG(trace, "parsed {} bytes", connection_, total_parsed);
data.drain(total_parsed);

// If an upgrade has been handled and there is body data or early upgrade
// payload to send on, send it on.
maybeDirectDispatch(data);
}

size_t ConnectionImpl::dispatchSlice(const char* slice, size_t len) {
Expand Down Expand Up @@ -377,11 +409,30 @@ int ConnectionImpl::onHeadersCompleteBase() {
// HTTP/1.1 or not.
protocol_ = Protocol::Http10;
}
if (Utility::isUpgrade(*current_header_map_)) {
ENVOY_CONN_LOG(trace, "codec entering upgrade mode.", connection_);
handling_upgrade_ = true;
}

int rc = onHeadersComplete(std::move(current_header_map_));
current_header_map_.reset();
header_parsing_state_ = HeaderParsingState::Done;
return rc;

// Returning 2 informs http_parser to not expect a body or further data on this connection.
return handling_upgrade_ ? 2 : rc;
}

void ConnectionImpl::onMessageCompleteBase() {
ENVOY_CONN_LOG(trace, "message complete", connection_);
if (handling_upgrade_) {
// If this is an upgrade request, swallow the onMessageComplete. The
// upgrade payload will be treated as stream body.
ASSERT(!deferred_end_stream_headers_);
ENVOY_CONN_LOG(trace, "Pausing parser due to upgrade.", connection_);
http_parser_pause(&parser_, 1);
return;
}
onMessageComplete();
}

void ConnectionImpl::onMessageBeginBase() {
Expand Down Expand Up @@ -514,7 +565,7 @@ int ServerConnectionImpl::onHeadersComplete(HeaderMapImplPtr&& headers) {
// scenario where the higher layers stream through and implicitly switch to chunked transfer
// encoding because end stream with zero body length has not yet been indicated.
if (parser_.flags & F_CHUNKED ||
(parser_.content_length > 0 && parser_.content_length != ULLONG_MAX)) {
(parser_.content_length > 0 && parser_.content_length != ULLONG_MAX) || handling_upgrade_) {
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

// If the connection has been closed (or is closing) after decoding headers, pause the parser
Expand Down Expand Up @@ -556,7 +607,6 @@ void ServerConnectionImpl::onBody(const char* data, size_t length) {

void ServerConnectionImpl::onMessageComplete() {
if (active_request_) {
ENVOY_CONN_LOG(trace, "message complete", connection_);
Buffer::OwnedImpl buffer;
active_request_->remote_complete_ = true;

Expand Down Expand Up @@ -675,7 +725,6 @@ void ClientConnectionImpl::onBody(const char* data, size_t length) {
}

void ClientConnectionImpl::onMessageComplete() {
ENVOY_CONN_LOG(trace, "message complete", connection_);
if (ignore_message_complete_for_100_continue_) {
ignore_message_complete_for_100_continue_ = false;
return;
Expand Down
4 changes: 4 additions & 0 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
uint32_t bufferLimit() { return connection_.bufferLimit(); }
virtual bool supports_http_10() { return false; }

bool maybeDirectDispatch(Buffer::Instance& data);

protected:
ConnectionImpl(Network::Connection& connection, http_parser_type type);

Expand All @@ -164,6 +166,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
http_parser parser_;
HeaderMapPtr deferred_end_stream_headers_;
Http::Code error_code_{Http::Code::BadRequest};
bool handling_upgrade_{};

private:
enum class HeaderParsingState { Field, Value, Done };
Expand Down Expand Up @@ -226,6 +229,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
/**
* Called when the request/response is complete.
*/
void onMessageCompleteBase();
virtual void onMessageComplete() PURE;

/**
Expand Down
Loading

0 comments on commit 95c3e13

Please sign in to comment.