Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract websocket functionality that is common to both server and future client #2582

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ add_library (seastar
include/seastar/util/closeable.hh
include/seastar/util/source_location-compat.hh
include/seastar/util/short_streams.hh
include/seastar/websocket/common.hh
include/seastar/websocket/server.hh
src/core/alien.cc
src/core/file.cc
Expand Down Expand Up @@ -777,6 +778,8 @@ add_library (seastar
src/util/read_first_line.cc
src/util/tmp_file.cc
src/util/short_streams.cc
src/websocket/parser.cc
src/websocket/common.cc
src/websocket/server.cc
)

Expand Down
173 changes: 173 additions & 0 deletions include/seastar/websocket/common.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2024 ScyllaDB
*/

#pragma once

#include <seastar/core/seastar.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/queue.hh>
#include <seastar/net/api.hh>
#include <seastar/util/log.hh>
#include <seastar/websocket/parser.hh>

namespace seastar::experimental::websocket {

extern sstring magic_key_suffix;

using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;

class server;

/// \defgroup websocket WebSocket
/// \addtogroup websocket
/// @{

/*!
* \brief an error in handling a WebSocket connection
*/
class exception : public std::exception {
std::string _msg;
public:
exception(std::string_view msg) : _msg(msg) {}
virtual const char* what() const noexcept {
return _msg.c_str();
}
};

/*!
* \brief a server WebSocket connection
*/
class connection : public boost::intrusive::list_base_hook<> {
protected:
using buff_t = temporary_buffer<char>;

/*!
* \brief Implementation of connection's data source.
*/
class connection_source_impl final : public data_source_impl {
queue<buff_t>* data;

public:
connection_source_impl(queue<buff_t>* data) : data(data) {}

virtual future<buff_t> get() override {
return data->pop_eventually().then_wrapped([](future<buff_t> f){
try {
return make_ready_future<buff_t>(std::move(f.get()));
} catch(...) {
return current_exception_as_future<buff_t>();
}
});
}

virtual future<> close() override {
data->push(buff_t(0));
return make_ready_future<>();
}
};

/*!
* \brief Implementation of connection's data sink.
*/
class connection_sink_impl final : public data_sink_impl {
queue<buff_t>* data;
public:
connection_sink_impl(queue<buff_t>* data) : data(data) {}

virtual future<> put(net::packet d) override {
net::fragment f = d.frag(0);
return data->push_eventually(temporary_buffer<char>{std::move(f.base), f.size});
}

size_t buffer_size() const noexcept override {
return data->max_size();
}

virtual future<> close() override {
data->push(buff_t(0));
return make_ready_future<>();
}
};

/*!
* \brief This function processess received PING frame.
* https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2
*/
future<> handle_ping();
/*!
* \brief This function processess received PONG frame.
* https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3
*/
future<> handle_pong();

static const size_t PIPE_SIZE = 512;
connected_socket _fd;
input_stream<char> _read_buf;
output_stream<char> _write_buf;
bool _done = false;

websocket_parser _websocket_parser;
queue <temporary_buffer<char>> _input_buffer;
input_stream<char> _input;
queue <temporary_buffer<char>> _output_buffer;
output_stream<char> _output;

sstring _subprotocol;
handler_t _handler;
public:
/*!
* \param fd established socket used for communication
*/
connection(connected_socket&& fd)
: _fd(std::move(fd))
, _read_buf(_fd.input())
, _write_buf(_fd.output())
, _input_buffer{PIPE_SIZE}
, _output_buffer{PIPE_SIZE}
{
_input = input_stream<char>{data_source{
std::make_unique<connection_source_impl>(&_input_buffer)}};
_output = output_stream<char>{data_sink{
std::make_unique<connection_sink_impl>(&_output_buffer)}};
}

/*!
* \brief close the socket
*/
void shutdown_input();
future<> close(bool send_close = true);

protected:
future<> read_one();
future<> response_loop();
/*!
* \brief Packs buff in websocket frame and sends it to the client.
*/
future<> send_data(opcodes opcode, temporary_buffer<char>&& buff);
};

std::string sha1_base64(std::string_view source);
std::string encode_base64(std::string_view source);

extern logger websocket_logger;

/// @}
}
143 changes: 143 additions & 0 deletions include/seastar/websocket/parser.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <seastar/core/seastar.hh>
#include <seastar/core/iostream.hh>

namespace seastar::experimental::websocket {

/// \addtogroup websocket
/// @{

/*!
* \brief Possible type of a websocket frame.
*/
enum opcodes {
CONTINUATION = 0x0,
TEXT = 0x1,
BINARY = 0x2,
CLOSE = 0x8,
PING = 0x9,
PONG = 0xA,
INVALID = 0xFF,
};

struct frame_header {
static constexpr uint8_t FIN = 7;
static constexpr uint8_t RSV1 = 6;
static constexpr uint8_t RSV2 = 5;
static constexpr uint8_t RSV3 = 4;
static constexpr uint8_t MASKED = 7;

uint8_t fin : 1;
uint8_t rsv1 : 1;
uint8_t rsv2 : 1;
uint8_t rsv3 : 1;
uint8_t opcode : 4;
uint8_t masked : 1;
uint8_t length : 7;
frame_header(const char* input) {
this->fin = (input[0] >> FIN) & 1;
this->rsv1 = (input[0] >> RSV1) & 1;
this->rsv2 = (input[0] >> RSV2) & 1;
this->rsv3 = (input[0] >> RSV3) & 1;
this->opcode = input[0] & 0b1111;
this->masked = (input[1] >> MASKED) & 1;
this->length = (input[1] & 0b1111111);
}
// Returns length of the rest of the header.
uint64_t get_rest_of_header_length() {
size_t next_read_length = sizeof(uint32_t); // Masking key
if (length == 126) {
next_read_length += sizeof(uint16_t);
} else if (length == 127) {
next_read_length += sizeof(uint64_t);
}
return next_read_length;
}
uint8_t get_fin() {return fin;}
uint8_t get_rsv1() {return rsv1;}
uint8_t get_rsv2() {return rsv2;}
uint8_t get_rsv3() {return rsv3;}
uint8_t get_opcode() {return opcode;}
uint8_t get_masked() {return masked;}
uint8_t get_length() {return length;}

bool is_opcode_known() {
//https://datatracker.ietf.org/doc/html/rfc6455#section-5.1
return opcode < 0xA && !(opcode < 0x8 && opcode > 0x2);
}
};

class websocket_parser {
enum class parsing_state : uint8_t {
flags_and_payload_data,
payload_length_and_mask,
payload
};
enum class connection_state : uint8_t {
valid,
closed,
error
};
using consumption_result_t = consumption_result<char>;
using buff_t = temporary_buffer<char>;
// What parser is currently doing.
parsing_state _state;
// State of connection - can be valid, closed or should be closed
// due to error.
connection_state _cstate;
sstring _buffer;
std::unique_ptr<frame_header> _header;
uint64_t _payload_length = 0;
uint64_t _consumed_payload_length = 0;
uint32_t _masking_key;
buff_t _result;

static future<consumption_result_t> dont_stop() {
return make_ready_future<consumption_result_t>(continue_consuming{});
}
static future<consumption_result_t> stop(buff_t data) {
return make_ready_future<consumption_result_t>(stop_consuming(std::move(data)));
}
uint64_t remaining_payload_length() const {
return _payload_length - _consumed_payload_length;
}

// Removes mask from payload given in p.
void remove_mask(buff_t& p, size_t n) {
char *payload = p.get_write();
for (uint64_t i = 0, j = 0; i < n; ++i, j = (j + 1) % 4) {
payload[i] ^= static_cast<char>(((_masking_key << (j * 8)) >> 24));
}
}
public:
websocket_parser() : _state(parsing_state::flags_and_payload_data),
_cstate(connection_state::valid),
_masking_key(0) {}
future<consumption_result_t> operator()(temporary_buffer<char> data);
bool is_valid() { return _cstate == connection_state::valid; }
bool eof() { return _cstate == connection_state::closed; }
opcodes opcode() const;
buff_t result();
};

/// @}
}
Loading
Loading