Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Testing-only PR to check maint-19.0.1 status #45401

Open
wants to merge 10 commits into
base: maint-19.0.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/appveyor-cpp-setup.bat
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ set CXX=cl.exe
@rem Download Minio somewhere on PATH, for unit tests
@rem
if "%ARROW_S3%" == "ON" (
appveyor DownloadFile https://dl.min.io/server/minio/release/windows-amd64/archive/minio.RELEASE.2024-09-13T20-26-02Z -FileName C:\Windows\Minio.exe || exit /B
appveyor DownloadFile https://dl.min.io/server/minio/release/windows-amd64/archive/minio.RELEASE.2025-01-20T14-49-07Z -FileName C:\Windows\Minio.exe || exit /B
)

@rem
Expand Down
6 changes: 4 additions & 2 deletions ci/conda_env_python.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
# specific language governing permissions and limitations
# under the License.

# don't add pandas here, because it is not a mandatory test dependency
boto3 # not a direct dependency of s3fs, but needed for our s3fs fixture
# Don't add pandas here, because it is not a mandatory test dependency

# Not a direct dependency of s3fs, but needed for our s3fs fixture
boto3
cffi
cython>=0.29.31
cloudpickle
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/install_minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ if [ "${version}" != "latest" ]; then
fi

# Use specific versions for minio server and client to avoid CI failures on new releases.
minio_version="minio.RELEASE.2024-09-13T20-26-02Z"
minio_version="minio.RELEASE.2025-01-20T14-49-07Z"
mc_version="mc.RELEASE.2024-09-16T17-43-14Z"

download()
Expand Down
21 changes: 14 additions & 7 deletions cpp/src/arrow/compute/key_map_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ void SwissTable::early_filter_imp(const int num_keys, const uint32_t* hashes,
// Extract from hash: block index and stamp
//
uint32_t hash = hashes[i];
uint32_t iblock = hash >> (bits_hash_ - bits_stamp_ - log_blocks_);
uint32_t iblock = hash >> bits_shift_for_block_and_stamp_;
uint32_t stamp = iblock & stamp_mask;
iblock >>= bits_stamp_;
iblock >>= bits_shift_for_block_;

uint32_t num_block_bytes = num_groupid_bits + 8;
const uint8_t* blockbase =
Expand Down Expand Up @@ -399,7 +399,7 @@ bool SwissTable::find_next_stamp_match(const uint32_t hash, const uint32_t in_sl
const uint64_t num_groupid_bits = num_groupid_bits_from_log_blocks(log_blocks_);
constexpr uint64_t stamp_mask = 0x7f;
const int stamp =
static_cast<int>((hash >> (bits_hash_ - log_blocks_ - bits_stamp_)) & stamp_mask);
static_cast<int>((hash >> bits_shift_for_block_and_stamp_) & stamp_mask);
uint64_t start_slot_id = wrap_global_slot_id(in_slot_id);
int match_found;
int local_slot;
Expand Down Expand Up @@ -659,6 +659,9 @@ Status SwissTable::grow_double() {
int num_group_id_bits_after = num_groupid_bits_from_log_blocks(log_blocks_ + 1);
uint64_t group_id_mask_before = ~0ULL >> (64 - num_group_id_bits_before);
int log_blocks_after = log_blocks_ + 1;
int bits_shift_for_block_and_stamp_after =
ComputeBitsShiftForBlockAndStamp(log_blocks_after);
int bits_shift_for_block_after = ComputeBitsShiftForBlock(log_blocks_after);
uint64_t block_size_before = (8 + num_group_id_bits_before);
uint64_t block_size_after = (8 + num_group_id_bits_after);
uint64_t block_size_total_after = (block_size_after << log_blocks_after) + padding_;
Expand Down Expand Up @@ -701,8 +704,7 @@ Status SwissTable::grow_double() {
}

int ihalf = block_id_new & 1;
uint8_t stamp_new =
hash >> ((bits_hash_ - log_blocks_after - bits_stamp_)) & stamp_mask;
uint8_t stamp_new = (hash >> bits_shift_for_block_and_stamp_after) & stamp_mask;
uint64_t group_id_bit_offs = j * num_group_id_bits_before;
uint64_t group_id =
(util::SafeLoadAs<uint64_t>(block_base + 8 + (group_id_bit_offs >> 3)) >>
Expand Down Expand Up @@ -744,8 +746,7 @@ Status SwissTable::grow_double() {
(util::SafeLoadAs<uint64_t>(block_base + 8 + (group_id_bit_offs >> 3)) >>
(group_id_bit_offs & 7)) &
group_id_mask_before;
uint8_t stamp_new =
hash >> ((bits_hash_ - log_blocks_after - bits_stamp_)) & stamp_mask;
uint8_t stamp_new = (hash >> bits_shift_for_block_and_stamp_after) & stamp_mask;

uint8_t* block_base_new =
blocks_new->mutable_data() + block_id_new * block_size_after;
Expand Down Expand Up @@ -773,6 +774,8 @@ Status SwissTable::grow_double() {
blocks_ = std::move(blocks_new);
hashes_ = std::move(hashes_new_buffer);
log_blocks_ = log_blocks_after;
bits_shift_for_block_and_stamp_ = bits_shift_for_block_and_stamp_after;
bits_shift_for_block_ = bits_shift_for_block_after;

return Status::OK();
}
Expand All @@ -784,6 +787,8 @@ Status SwissTable::init(int64_t hardware_flags, MemoryPool* pool, int log_blocks
log_minibatch_ = util::MiniBatch::kLogMiniBatchLength;

log_blocks_ = log_blocks;
bits_shift_for_block_and_stamp_ = ComputeBitsShiftForBlockAndStamp(log_blocks_);
bits_shift_for_block_ = ComputeBitsShiftForBlock(log_blocks_);
int num_groupid_bits = num_groupid_bits_from_log_blocks(log_blocks_);
num_inserted_ = 0;

Expand Down Expand Up @@ -820,6 +825,8 @@ void SwissTable::cleanup() {
hashes_ = nullptr;
}
log_blocks_ = 0;
bits_shift_for_block_and_stamp_ = ComputeBitsShiftForBlockAndStamp(log_blocks_);
bits_shift_for_block_ = ComputeBitsShiftForBlock(log_blocks_);
num_inserted_ = 0;
}

Expand Down
25 changes: 23 additions & 2 deletions cpp/src/arrow/compute/key_map_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,23 @@ class ARROW_EXPORT SwissTable {
// Resize large hash tables when 75% full.
Status grow_double();

// When log_blocks is greater than 25, there will be overlapping bits between block id
// and stamp within a 32-bit hash value. So we must check if this is the case when
// right shifting a hash value to retrieve block id and stamp. The following two
// functions derive the number of bits to right shift from the given log_blocks.
static int ComputeBitsShiftForBlockAndStamp(int log_blocks) {
  // Number of bits a hash is right-shifted by before the stamp is masked out of it.
  // If the block id bits and the stamp bits together do not fit in the hash, there
  // is no room left to shift, so the shift collapses to zero.
  const bool stamp_overlaps_block_id = log_blocks + bits_stamp_ > bits_hash_;
  if (ARROW_PREDICT_FALSE(stamp_overlaps_block_id)) {
    return 0;
  }
  const int shift = bits_hash_ - log_blocks - bits_stamp_;
  return shift;
}
static int ComputeBitsShiftForBlock(int log_blocks) {
  // Number of additional bits to shift (after the block-and-stamp shift) in order to
  // leave only the block id. Normally this is the full stamp width; when the block id
  // and stamp overlap inside the hash, only the bits not claimed by the block id
  // remain to be shifted out.
  int shift = bits_stamp_;
  if (ARROW_PREDICT_FALSE(log_blocks + bits_stamp_ > bits_hash_)) {
    shift = bits_hash_ - log_blocks;
  }
  return shift;
}

// Number of hash bits stored in slots in a block.
// The highest bits of hash determine block id.
// The next set of highest bits is a "stamp" stored in a slot in a block.
Expand All @@ -214,6 +231,11 @@ class ARROW_EXPORT SwissTable {
int log_minibatch_;
// Base 2 log of the number of blocks
int log_blocks_ = 0;
// The following two variables are re-derived from log_blocks_ whenever log_blocks_
// changes. They are used in tight loops instead of calling the ComputeXXX functions,
// which would introduce a branch on whether log_blocks_ + bits_stamp_ > bits_hash_.
int bits_shift_for_block_and_stamp_ = ComputeBitsShiftForBlockAndStamp(log_blocks_);
int bits_shift_for_block_ = ComputeBitsShiftForBlock(log_blocks_);
// Number of keys inserted into hash table
uint32_t num_inserted_ = 0;

Expand Down Expand Up @@ -271,8 +293,7 @@ void SwissTable::insert_into_empty_slot(uint32_t slot_id, uint32_t hash,
constexpr uint64_t stamp_mask = 0x7f;

int start_slot = (slot_id & 7);
int stamp =
static_cast<int>((hash >> (bits_hash_ - log_blocks_ - bits_stamp_)) & stamp_mask);
int stamp = static_cast<int>((hash >> bits_shift_for_block_and_stamp_) & stamp_mask);
uint64_t block_id = slot_id >> 3;
uint8_t* blockbase = blocks_->mutable_data() + num_block_bytes * block_id;

Expand Down
55 changes: 32 additions & 23 deletions cpp/src/arrow/compute/key_map_internal_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ int SwissTable::early_filter_imp_avx2_x8(const int num_hashes, const uint32_t* h
// Calculate block index and hash stamp for a byte in a block
//
__m256i vhash = _mm256_loadu_si256(vhash_ptr + i);
__m256i vblock_id = _mm256_srlv_epi32(
vhash, _mm256_set1_epi32(bits_hash_ - bits_stamp_ - log_blocks_));
__m256i vblock_id = _mm256_srli_epi32(vhash, bits_shift_for_block_and_stamp_);
__m256i vstamp = _mm256_and_si256(vblock_id, vstamp_mask);
vblock_id = _mm256_srli_epi32(vblock_id, bits_stamp_);
vblock_id = _mm256_srli_epi32(vblock_id, bits_shift_for_block_);

// We now split inputs and process 4 at a time,
// in order to process 64-bit blocks
Expand Down Expand Up @@ -301,19 +300,15 @@ int SwissTable::early_filter_imp_avx2_x32(const int num_hashes, const uint32_t*
_mm256_and_si256(vhash2, _mm256_set1_epi32(0xffff0000)));
vhash1 = _mm256_or_si256(_mm256_srli_epi32(vhash1, 16),
_mm256_and_si256(vhash3, _mm256_set1_epi32(0xffff0000)));
__m256i vstamp_A = _mm256_and_si256(
_mm256_srlv_epi32(vhash0, _mm256_set1_epi32(16 - log_blocks_ - 7)),
_mm256_set1_epi16(0x7f));
__m256i vstamp_B = _mm256_and_si256(
_mm256_srlv_epi32(vhash1, _mm256_set1_epi32(16 - log_blocks_ - 7)),
_mm256_set1_epi16(0x7f));
__m256i vstamp_A = _mm256_and_si256(_mm256_srli_epi32(vhash0, 16 - log_blocks_ - 7),
_mm256_set1_epi16(0x7f));
__m256i vstamp_B = _mm256_and_si256(_mm256_srli_epi32(vhash1, 16 - log_blocks_ - 7),
_mm256_set1_epi16(0x7f));
__m256i vstamp = _mm256_or_si256(vstamp_A, _mm256_slli_epi16(vstamp_B, 8));
__m256i vblock_id_A =
_mm256_and_si256(_mm256_srlv_epi32(vhash0, _mm256_set1_epi32(16 - log_blocks_)),
_mm256_set1_epi16(block_id_mask));
__m256i vblock_id_B =
_mm256_and_si256(_mm256_srlv_epi32(vhash1, _mm256_set1_epi32(16 - log_blocks_)),
_mm256_set1_epi16(block_id_mask));
__m256i vblock_id_A = _mm256_and_si256(_mm256_srli_epi32(vhash0, 16 - log_blocks_),
_mm256_set1_epi16(block_id_mask));
__m256i vblock_id_B = _mm256_and_si256(_mm256_srli_epi32(vhash1, 16 - log_blocks_),
_mm256_set1_epi16(block_id_mask));
__m256i vblock_id = _mm256_or_si256(vblock_id_A, _mm256_slli_epi16(vblock_id_B, 8));

// Visit all block bytes in reverse order (overwriting data on multiple matches)
Expand Down Expand Up @@ -392,16 +387,30 @@ int SwissTable::extract_group_ids_avx2(const int num_keys, const uint32_t* hashe
} else {
for (int i = 0; i < num_keys / unroll; ++i) {
__m256i hash = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(hashes) + i);
// Extend hash and local_slot to 64-bit to compute 64-bit group id offsets to
// gather from. This is to prevent index overflow issues in GH-44513.
// NB: Use zero-extend conversion for unsigned hash.
__m256i hash_lo = _mm256_cvtepu32_epi64(_mm256_castsi256_si128(hash));
__m256i hash_hi = _mm256_cvtepu32_epi64(_mm256_extracti128_si256(hash, 1));
__m256i local_slot =
_mm256_set1_epi64x(reinterpret_cast<const uint64_t*>(local_slots)[i]);
local_slot = _mm256_shuffle_epi8(
local_slot, _mm256_setr_epi32(0x80808000, 0x80808001, 0x80808002, 0x80808003,
0x80808004, 0x80808005, 0x80808006, 0x80808007));
local_slot = _mm256_mullo_epi32(local_slot, _mm256_set1_epi32(byte_size));
__m256i pos = _mm256_srlv_epi32(hash, _mm256_set1_epi32(bits_hash_ - log_blocks_));
pos = _mm256_mullo_epi32(pos, _mm256_set1_epi32(byte_multiplier));
pos = _mm256_add_epi32(pos, local_slot);
__m256i group_id = _mm256_i32gather_epi32(elements, pos, 1);
__m256i local_slot_lo = _mm256_shuffle_epi8(
local_slot, _mm256_setr_epi32(0x80808000, 0x80808080, 0x80808001, 0x80808080,
0x80808002, 0x80808080, 0x80808003, 0x80808080));
__m256i local_slot_hi = _mm256_shuffle_epi8(
local_slot, _mm256_setr_epi32(0x80808004, 0x80808080, 0x80808005, 0x80808080,
0x80808006, 0x80808080, 0x80808007, 0x80808080));
local_slot_lo = _mm256_mul_epu32(local_slot_lo, _mm256_set1_epi32(byte_size));
local_slot_hi = _mm256_mul_epu32(local_slot_hi, _mm256_set1_epi32(byte_size));
__m256i pos_lo = _mm256_srli_epi64(hash_lo, bits_hash_ - log_blocks_);
__m256i pos_hi = _mm256_srli_epi64(hash_hi, bits_hash_ - log_blocks_);
pos_lo = _mm256_mul_epu32(pos_lo, _mm256_set1_epi32(byte_multiplier));
pos_hi = _mm256_mul_epu32(pos_hi, _mm256_set1_epi32(byte_multiplier));
pos_lo = _mm256_add_epi64(pos_lo, local_slot_lo);
pos_hi = _mm256_add_epi64(pos_hi, local_slot_hi);
__m128i group_id_lo = _mm256_i64gather_epi32(elements, pos_lo, 1);
__m128i group_id_hi = _mm256_i64gather_epi32(elements, pos_hi, 1);
__m256i group_id = _mm256_set_m128i(group_id_hi, group_id_lo);
group_id = _mm256_and_si256(group_id, _mm256_set1_epi32(mask));
_mm256_storeu_si256(reinterpret_cast<__m256i*>(out_group_ids) + i, group_id);
}
Expand Down
29 changes: 17 additions & 12 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1983,27 +1983,33 @@ class ObjectOutputStream final : public io::OutputStream {
const void* data, int64_t nbytes, std::shared_ptr<Buffer> owned_buffer = nullptr) {
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
req.SetContentLength(nbytes);
RETURN_NOT_OK(SetSSECustomerKey(&req, sse_customer_key_));

if (!background_writes_) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
// GH-45304: avoid setting a body stream if length is 0.
// This workaround can be removed once we require AWS SDK 1.11.489 or later.
if (nbytes != 0) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
}

ARROW_ASSIGN_OR_RAISE(auto outcome, TriggerUploadRequest(req, holder_));

RETURN_NOT_OK(sync_result_callback(req, upload_state_, part_number_, outcome));
} else {
// If the data isn't owned, make an immutable copy for the lifetime of the closure
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
memcpy(owned_buffer->mutable_data(), data, nbytes);
} else {
DCHECK_EQ(data, owned_buffer->data());
DCHECK_EQ(nbytes, owned_buffer->size());
// (GH-45304: avoid setting a body stream if length is 0, see above)
if (nbytes != 0) {
// If the data isn't owned, make an immutable copy for the lifetime of the closure
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
memcpy(owned_buffer->mutable_data(), data, nbytes);
} else {
DCHECK_EQ(data, owned_buffer->data());
DCHECK_EQ(nbytes, owned_buffer->size());
}
req.SetBody(std::make_shared<StringViewStream>(owned_buffer->data(),
owned_buffer->size()));
}
req.SetBody(
std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));

{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
Expand Down Expand Up @@ -2345,7 +2351,6 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
req.SetContentType(kAwsDirectoryContentType);
req.SetBody(std::make_shared<std::stringstream>(""));
return OutcomeToStatus(
std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
"PutObject", client_lock.Move()->PutObject(req));
Expand Down
1 change: 0 additions & 1 deletion cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ class TestS3FS : public S3TestMixin {
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(ToAwsString("bucket"));
req.SetKey(ToAwsString("emptydir/"));
req.SetBody(std::make_shared<std::stringstream>(""));
RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
// NOTE: no need to create intermediate "directories" somedir/ and
// somedir/subdir/
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/flight/test_definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,13 @@ class ARROW_FLIGHT_EXPORT AsyncClientTest : public FlightTest {
std::unique_ptr<FlightServerBase> server_;
};

// DISABLED TestListenerLifetime: https://github.com/apache/arrow/issues/45120
#define ARROW_FLIGHT_TEST_ASYNC_CLIENT(FIXTURE) \
static_assert(std::is_base_of<AsyncClientTest, FIXTURE>::value, \
ARROW_STRINGIFY(FIXTURE) " must inherit from AsyncClientTest"); \
TEST_F(FIXTURE, TestGetFlightInfo) { TestGetFlightInfo(); } \
TEST_F(FIXTURE, TestGetFlightInfoFuture) { TestGetFlightInfoFuture(); } \
TEST_F(FIXTURE, TestListenerLifetime) { TestListenerLifetime(); }
TEST_F(FIXTURE, DISABLED_TestListenerLifetime) { TestListenerLifetime(); }

} // namespace flight
} // namespace arrow
16 changes: 16 additions & 0 deletions cpp/src/arrow/util/basic_decimal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ static constexpr uint64_t kInt64Mask = 0xFFFFFFFFFFFFFFFF;
static constexpr uint64_t kInt32Mask = 0xFFFFFFFF;
#endif

BasicDecimal32& BasicDecimal32::Negate() {
  // Negation is delegated to SafeSignedNegate rather than using unary minus directly.
  // NOTE(review): presumably this avoids signed-overflow UB when value_ is the
  // minimum representable value -- confirm against the helper's definition.
  const auto negated = arrow::internal::SafeSignedNegate(value_);
  value_ = negated;
  return *this;
}

DecimalStatus BasicDecimal32::Divide(const BasicDecimal32& divisor,
BasicDecimal32* result,
BasicDecimal32* remainder) const {
Expand Down Expand Up @@ -152,6 +157,11 @@ BasicDecimal32::operator BasicDecimal64() const {
return BasicDecimal64(static_cast<int64_t>(value()));
}

BasicDecimal64& BasicDecimal64::Negate() {
  // Negation is delegated to SafeSignedNegate rather than using unary minus directly.
  // NOTE(review): presumably this avoids signed-overflow UB when value_ is the
  // minimum representable value -- confirm against the helper's definition.
  const auto negated = arrow::internal::SafeSignedNegate(value_);
  value_ = negated;
  return *this;
}

DecimalStatus BasicDecimal64::Divide(const BasicDecimal64& divisor,
BasicDecimal64* result,
BasicDecimal64* remainder) const {
Expand Down Expand Up @@ -253,12 +263,18 @@ const BasicDecimal64& BasicDecimal64::GetHalfScaleMultiplier(int32_t scale) {
bool BasicDecimal32::FitsInPrecision(int32_t precision) const {
  DCHECK_GE(precision, 0);
  DCHECK_LE(precision, kMaxPrecision);
  // INT32_MIN has no positive counterpart in int32_t, so it is rejected up front
  // and never handed to Abs(); the && short-circuit guarantees that ordering.
  const bool has_representable_magnitude = (value_ != INT32_MIN);
  return has_representable_magnitude &&
         Abs(*this) < DecimalTraits<BasicDecimal32>::powers_of_ten()[precision];
}

bool BasicDecimal64::FitsInPrecision(int32_t precision) const {
  DCHECK_GE(precision, 0);
  DCHECK_LE(precision, kMaxPrecision);
  // INT64_MIN has no positive counterpart in int64_t, so it is rejected up front
  // and never handed to Abs(); the && short-circuit guarantees that ordering.
  const bool has_representable_magnitude = (value_ != INT64_MIN);
  return has_representable_magnitude &&
         Abs(*this) < DecimalTraits<BasicDecimal64>::powers_of_ten()[precision];
}

Expand Down
10 changes: 2 additions & 8 deletions cpp/src/arrow/util/basic_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,7 @@ class ARROW_EXPORT BasicDecimal32 : public SmallBasicDecimal<int32_t> {
using ValueType = int32_t;

/// \brief Negate the current value (in-place)
BasicDecimal32& Negate() {
value_ = -value_;
return *this;
}
BasicDecimal32& Negate();

/// \brief Absolute value (in-place)
BasicDecimal32& Abs() { return *this < 0 ? Negate() : *this; }
Expand Down Expand Up @@ -429,10 +426,7 @@ class ARROW_EXPORT BasicDecimal64 : public SmallBasicDecimal<int64_t> {
using ValueType = int64_t;

/// \brief Negate the current value (in-place)
BasicDecimal64& Negate() {
value_ = -value_;
return *this;
}
BasicDecimal64& Negate();

/// \brief Absolute value (in-place)
BasicDecimal64& Abs() { return *this < 0 ? Negate() : *this; }
Expand Down
Loading
Loading