Skip to content

Commit

Permalink
Provide a way to override windows memory allocator with jemalloc for …
Browse files Browse the repository at this point in the history
…ZSTD

Summary:
Windows does not have an LD_PRELOAD mechanism to override all memory allocation functions, and ZSTD makes use of the C runtime calloc. During flushes and compactions the default system allocator fragments and the system slows down considerably.

For builds with jemalloc we employ an advanced ZSTD context creation API that re-directs memory allocation to jemalloc. To reduce the cost of context creation on each block we cache ZSTD context within the block based table builder while a new SST file is being built, this will help all platform builds including those w/o jemalloc. This avoids system allocator fragmentation and improves the performance.

The change does not address random reads and currently on Windows reads with ZSTD regress as compared with SNAPPY compression.
Closes facebook/rocksdb#3838

Differential Revision: D8229794

Pulled By: miasantreble

fbshipit-source-id: 719b622ab7bf4109819bc44f45ec66f0dd3ee80d
  • Loading branch information
yuslepukhin authored and facebook-github-bot committed Jun 4, 2018
1 parent 4f297ad commit f4b72d7
Show file tree
Hide file tree
Showing 20 changed files with 660 additions and 202 deletions.
19 changes: 15 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,24 @@ if(DEFINED USE_RTTI)
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI")
else()
message(STATUS "Disabling RTTI")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
if(MSVC)
message(STATUS "Disabling RTTI in Release builds. Always on in Debug.")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-")
else()
message(STATUS "Disabling RTTI in Release builds")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()
else()
message(STATUS "Enabling RTTI in Debug builds only (default)")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
if(MSVC)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-")
else()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()

if(MSVC)
Expand Down Expand Up @@ -574,6 +584,7 @@ set(SOURCES
util/coding.cc
util/compaction_job_stats_impl.cc
util/comparator.cc
util/compression_context_cache.cc
util/concurrent_arena.cc
util/crc32c.cc
util/delete_scheduler.cc
Expand Down
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ cpp_library(
"util/coding.cc",
"util/compaction_job_stats_impl.cc",
"util/comparator.cc",
"util/compression_context_cache.cc",
"util/concurrent_arena.cc",
"util/crc32c.cc",
"util/delete_scheduler.cc",
Expand Down
12 changes: 5 additions & 7 deletions port/win/env_default.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@

#include <rocksdb/env.h>
#include "port/win/env_win.h"
#include "util/compression_context_cache.h"
#include "util/thread_local.h"

namespace rocksdb {
namespace port {

// We choose to create this on the heap and using std::once for the following
// reasons
// 1) Currently available MS compiler does not implement atomic C++11
// initialization of
// function local statics
// 2) We choose not to destroy the env because joining the threads from the
// We choose not to destroy the env because joining the threads from the
// system loader
// which destroys the statics (same as from DLLMain) creates a system loader
// dead-lock.
Expand All @@ -29,11 +26,12 @@ namespace {
std::once_flag winenv_once_flag;
Env* envptr;
};

}

// Returns the default Env for Windows builds.
//
// The same WinEnv pointer is returned on every call: it is created on the
// heap the first time through std::call_once and is not deleted by this
// function.
Env* Env::Default() {
using namespace port;
// These run on every call, ahead of the one-time WinEnv construction below.
ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton();
// winenv_once_flag guards the allocation so envptr is assigned only once.
std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); });
return envptr;
}
Expand Down
19 changes: 19 additions & 0 deletions port/win/win_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@
#include <stdexcept>
#include "jemalloc/jemalloc.h"

#if defined(ZSTD) && defined(ZSTD_STATIC_LINKING_ONLY)
#include <zstd.h>
#if (ZSTD_VERSION_NUMBER >= 500)
namespace rocksdb {
namespace port {
// Allocation callback for ZSTD that forwards the request to jemalloc.
// The first (opaque) argument is ignored; returns whatever je_malloc(size)
// returns.
void* JemallocAllocateForZSTD(void* /* opaque */, size_t size) {
return je_malloc(size);
}
// Deallocation callback for ZSTD that forwards the address to jemalloc's
// je_free. The first (opaque) argument is ignored.
void JemallocDeallocateForZSTD(void* /* opaque */, void* address) {
je_free(address);
}
// Returns a ZSTD_customMem initialized with JemallocAllocateForZSTD and
// JemallocDeallocateForZSTD, in that order, followed by nullptr.
// NOTE(review): the trailing nullptr is presumably the opaque state pointer
// passed back to the callbacks -- confirm against the ZSTD_customMem
// definition in zstd.h.
ZSTD_customMem GetJeZstdAllocationOverrides() {
return { JemallocAllocateForZSTD, JemallocDeallocateForZSTD, nullptr };
}
} // namespace port
} // namespace rocksdb
#endif // (ZSTD_VERSION_NUMBER >= 500)
#endif // defined(ZSTD) && defined(ZSTD_STATIC_LINKING_ONLY)

// Global operators to be replaced by a linker when this file is
// a part of the build

Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ LIB_SOURCES = \
util/coding.cc \
util/compaction_job_stats_impl.cc \
util/comparator.cc \
util/compression_context_cache.cc \
util/concurrent_arena.cc \
util/crc32c.cc \
util/delete_scheduler.cc \
Expand Down
76 changes: 48 additions & 28 deletions table/block_based_table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,36 +104,36 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {

// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options,
const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version,
const Slice& compression_dict,
std::string* compressed_output) {
if (*type == kNoCompression) {
*type = compression_ctx.type();
if (compression_ctx.type() == kNoCompression) {
return raw;
}

// Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough".
switch (*type) {
switch (compression_ctx.type()) {
case kSnappyCompression:
if (Snappy_Compress(compression_options, raw.data(), raw.size(),
if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kZlibCompression:
if (Zlib_Compress(
compression_options,
compression_ctx,
GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output, compression_dict) &&
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kBZip2Compression:
if (BZip2_Compress(
compression_options,
compression_ctx,
GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
Expand All @@ -142,18 +142,18 @@ Slice CompressBlock(const Slice& raw,
break; // fall back to no compression.
case kLZ4Compression:
if (LZ4_Compress(
compression_options,
compression_ctx,
GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output, compression_dict) &&
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kLZ4HCCompression:
if (LZ4HC_Compress(
compression_options,
compression_ctx,
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output, compression_dict) &&
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
Expand All @@ -167,8 +167,8 @@ Slice CompressBlock(const Slice& raw,
break;
case kZSTD:
case kZSTDNotFinalCompression:
if (ZSTD_Compress(compression_options, raw.data(), raw.size(),
compressed_output, compression_dict) &&
if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
Expand Down Expand Up @@ -261,10 +261,10 @@ struct BlockBasedTableBuilder::Rep {
PartitionedIndexBuilder* p_index_builder_ = nullptr;

std::string last_key;
const CompressionType compression_type;
const CompressionOptions compression_opts;
// Data for presetting the compression library's dictionary, or nullptr.
const std::string* compression_dict;
// Compression dictionary or nullptr
const std::string* compression_dict;
CompressionContext compression_ctx;
std::unique_ptr<UncompressionContext> verify_ctx;
TableProperties props;

bool closed = false; // Either Finish() or Abandon() has been called.
Expand Down Expand Up @@ -306,9 +306,8 @@ struct BlockBasedTableBuilder::Rep {
table_options.use_delta_encoding),
range_del_block(1 /* block_restart_interval */),
internal_prefix_transform(_moptions.prefix_extractor.get()),
compression_type(_compression_type),
compression_opts(_compression_opts),
compression_dict(_compression_dict),
compression_ctx(_compression_type, _compression_opts),
compressed_cache_key_prefix_size(0),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
Expand Down Expand Up @@ -342,6 +341,16 @@ struct BlockBasedTableBuilder::Rep {
new BlockBasedTablePropertiesCollector(
table_options.index_type, table_options.whole_key_filtering,
_moptions.prefix_extractor != nullptr));
if (table_options.verify_compression) {
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
compression_ctx.type()));
}
}

Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete;

~Rep() {
}
};

Expand Down Expand Up @@ -480,7 +489,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
assert(ok());
Rep* r = rep_;

auto type = r->compression_type;
auto type = r->compression_ctx.type();
Slice block_contents;
bool abort_compression = false;

Expand All @@ -490,22 +499,33 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (raw_block_contents.size() < kCompressionSizeLimit) {
Slice compression_dict;
if (is_data_block && r->compression_dict && r->compression_dict->size()) {
compression_dict = *r->compression_dict;
r->compression_ctx.dict() = *r->compression_dict;
if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = *r->compression_dict;
}
} else {
// Clear dictionary
r->compression_ctx.dict() = Slice();
if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = Slice();
}
}

block_contents = CompressBlock(raw_block_contents, r->compression_opts,
&type, r->table_options.format_version,
compression_dict, &r->compressed_output);
block_contents = CompressBlock(raw_block_contents, r->compression_ctx,
&type, r->table_options.format_version,
&r->compressed_output);

// Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the
// compressed data and compare to the input.
if (type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
Status stat = UncompressBlockContentsForCompressionType(
Status stat = UncompressBlockContentsForCompressionType(*r->verify_ctx,
block_contents.data(), block_contents.size(), &contents,
r->table_options.format_version, compression_dict, type,
r->table_options.format_version,
r->ioptions);

if (stat.ok()) {
Expand Down Expand Up @@ -739,7 +759,7 @@ Status BlockBasedTableBuilder::Finish() {
r->props.merge_operator_name = r->ioptions.merge_operator != nullptr
? r->ioptions.merge_operator->Name()
: "nullptr";
r->props.compression_name = CompressionTypeToString(r->compression_type);
r->props.compression_name = CompressionTypeToString(r->compression_ctx.type());
r->props.prefix_extractor_name =
r->moptions.prefix_extractor != nullptr
? r->moptions.prefix_extractor->Name()
Expand Down
12 changes: 6 additions & 6 deletions table/block_based_table_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "table/table_builder.h"
#include "util/compression.h"

namespace rocksdb {

Expand Down Expand Up @@ -53,6 +54,10 @@ class BlockBasedTableBuilder : public TableBuilder {
// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();

// No copying allowed
BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete;
BlockBasedTableBuilder& operator=(const BlockBasedTableBuilder&) = delete;

// Add key,value to the table being constructed.
// REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: Finish(), Abandon() have not been called
Expand Down Expand Up @@ -115,16 +120,11 @@ class BlockBasedTableBuilder : public TableBuilder {
// Some compression libraries fail when the raw size is bigger than int. If
// uncompressed size is bigger than kCompressionSizeLimit, don't compress it
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();

// No copying allowed
BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete;
void operator=(const BlockBasedTableBuilder&) = delete;
};

Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options,
const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version,
const Slice& compression_dict,
std::string* compressed_output);

} // namespace rocksdb
13 changes: 9 additions & 4 deletions table/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1104,9 +1104,12 @@ Status BlockBasedTable::GetDataBlockFromCache(

// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
s = UncompressBlockContents(compressed_block->data(),
UncompressionContext uncompresssion_ctx(compressed_block->compression_type(),
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx,
compressed_block->data(),
compressed_block->size(), &contents,
format_version, compression_dict,
format_version,
ioptions);

// Insert uncompressed block into block cache
Expand Down Expand Up @@ -1182,8 +1185,10 @@ Status BlockBasedTable::PutDataBlockToCache(
BlockContents contents;
Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) {
s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents,
format_version, compression_dict, ioptions);
UncompressionContext uncompression_ctx(raw_block->compression_type(), compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents,
format_version, ioptions);
}
if (!s.ok()) {
delete raw_block;
Expand Down
6 changes: 4 additions & 2 deletions table/block_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,10 @@ Status BlockFetcher::ReadBlockContents() {

if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache
status_ = UncompressBlockContents(slice_.data(), block_size_, contents_,
footer_.version(), compression_dict_,
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
status_ = UncompressBlockContents(uncompression_ctx,
slice_.data(), block_size_, contents_,
footer_.version(),
ioptions_);
} else {
GetBlockContents();
Expand Down
Loading

0 comments on commit f4b72d7

Please sign in to comment.