Skip to content

Commit

Permalink
Add configuration for duplicate map keys
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Feb 19, 2025
1 parent 000939f commit 11cb0ba
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 0 deletions.
10 changes: 10 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ class QueryConfig {
static constexpr const char* kSparkLegacyDateFormatter =
"spark.legacy_date_formatter";

/// If a key is found in multiple given maps, by default that key's value in
/// the resulting map comes from the last one of those maps. When true, an
/// exception is thrown when a duplicate map key is found.
static constexpr const char* kSparkThrowExceptionOnDuplicateMapKeys =
"spark.throw_exception_on_duplicate_map_keys";

/// The number of local parallel table writer operators per task.
static constexpr const char* kTaskWriterCount = "task_writer_count";

Expand Down Expand Up @@ -831,6 +837,10 @@ class QueryConfig {
return get<bool>(kSparkLegacyDateFormatter, false);
}

/// Returns whether Spark map-building functions should throw an exception
/// when a duplicate map key is found, instead of keeping the value from the
/// last map that contains the key. Defaults to false (last-wins behavior).
bool sparkThrowExceptionOnDuplicateMapKeys() const {
  static constexpr bool kDefaultValue = false;
  return get<bool>(kSparkThrowExceptionOnDuplicateMapKeys, kDefaultValue);
}

/// Returns whether CPU usage tracking is enabled for expression evaluation.
/// Defaults to false.
bool exprTrackCpuUsage() const {
  return get<bool>(kExprTrackCpuUsage, false);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,11 @@ Spark-specific Configuration
Joda date formatter performs strict checking of its input and uses different pattern string.
For example, the 2015-07-22 10:00:00 timestamp cannot be parsed if pattern is yyyy-MM-dd because the parser does not consume whole input.
Another example is that the 'W' pattern, which means week in month, is not supported. For more differences, see :issue:`10354`.
* - spark.throw_exception_on_duplicate_map_keys
- bool
- false
- By default, if a key is found in multiple given maps, that key's value in the resulting map comes from the last one of those maps.
If true, an exception is thrown when duplicate keys are found. This configuration is used by the Spark functions `CreateMap`, `MapFromArrays`, `MapFromEntries`, `StringToMap`, `MapConcat`, `TransformKeys`.

Tracing
--------
Expand Down
11 changes: 11 additions & 0 deletions velox/docs/functions/spark/map.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ Map Functions

SELECT map(array(1, 2), array(3, 4)); -- {[1, 2] -> [3, 4]}

.. spark:function:: map_concat(map1(K,V), map2(K,V), ..., mapN(K,V)) -> map(K,V)
Returns the union of all the given maps. If a key is found in multiple given maps,
by default that key's value in the resulting map comes from the last one of those maps.
If the configuration `spark.throw_exception_on_duplicate_map_keys` is set to true, throws an
exception for duplicate keys. ::

SELECT map_concat(map(1, 'a', 2, 'b'), map(3, 'c')); -- {1 -> 'a', 2 -> 'b', 3 -> 'c'}
SELECT map_concat(map(1, 'a', 2, 'b'), map(3, 'c', 2, 'd')); -- {1 -> 'a', 2 -> 'd', 3 -> 'c'} (LAST_WIN behavior)
SELECT map_concat(map(1, 'a', 2, 'b'), map(3, 'c', 2, 'd')); -- "Duplicate map key 2 was found" (EXCEPTION behavior)

.. spark:function:: map_entries(map(K,V)) -> array(row(K,V))
Returns an array of all entries in the given map. ::
Expand Down
10 changes: 10 additions & 0 deletions velox/functions/lib/MapConcat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ class MapConcatFunction : public exec::VectorFunction {
// Check for duplicate keys
SelectivityVector uniqueKeys(offset);
vector_size_t duplicateCnt = 0;
const auto throwExceptionOnDuplicateMapKeys =
context.execCtx()
->queryCtx()
->queryConfig()
.sparkThrowExceptionOnDuplicateMapKeys();
rows.applyToSelected([&](vector_size_t row) {
const int mapOffset = rawOffsets[row];
const int mapSize = rawSizes[row];
Expand All @@ -118,6 +123,11 @@ class MapConcatFunction : public exec::VectorFunction {
for (vector_size_t i = 1; i < mapSize; i++) {
if (combinedKeys->equalValueAt(
combinedKeys.get(), mapOffset + i, mapOffset + i - 1)) {
if (throwExceptionOnDuplicateMapKeys) {
const auto duplicateKey = combinedKeys->wrappedVector()->toString(
combinedKeys->wrappedIndex(mapOffset + i));
VELOX_USER_FAIL("Duplicate map key {} was found.", duplicateKey);
}
duplicateCnt++;
// "remove" duplicate entry
uniqueKeys.setValid(mapOffset + i - 1, false);
Expand Down
11 changes: 11 additions & 0 deletions velox/functions/lib/tests/MapConcatTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/functions/lib/MapConcat.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/functions/prestosql/tests/utils/FunctionBaseTest.h"
#include "velox/parse/TypeResolver.h"

Expand Down Expand Up @@ -193,6 +194,8 @@ TEST_F(MapConcatTest, duplicateKeys) {
auto aMap = makeMapVector(size, a);
auto bMap = makeMapVector(size, b);

// By default, if a key is found in multiple given maps, that key's value in
// the resulting map comes from the last one of those maps.
std::map<std::string, int32_t> ab = concat(a, b);
auto expectedMap = makeMapVector(size, ab);

Expand Down Expand Up @@ -225,6 +228,14 @@ TEST_F(MapConcatTest, duplicateKeys) {
<< "at " << i << ": expected " << expectedMap->toString(i) << ", got "
<< result->toString(i);
}

// Throws exception when duplicate keys are found.
queryCtx_->testingOverrideConfigUnsafe({
{core::QueryConfig::kSparkThrowExceptionOnDuplicateMapKeys, "true"},
});
VELOX_ASSERT_THROW(
evaluate<MapVector>("map_concat(c0, c1)", makeRowVector({aMap, bMap})),
"Duplicate map key a2 was found");
}

TEST_F(MapConcatTest, partiallyPopulated) {
Expand Down
3 changes: 3 additions & 0 deletions velox/functions/sparksql/registration/RegisterMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/functions/lib/MapConcat.h"
#include "velox/functions/lib/RegistrationHelpers.h"
#include "velox/functions/sparksql/Size.h"

Expand All @@ -31,6 +32,8 @@ void registerSparkMapFunctions(const std::string& prefix) {
VELOX_REGISTER_VECTOR_FUNCTION(udf_map_keys, prefix + "map_keys");
VELOX_REGISTER_VECTOR_FUNCTION(udf_map_values, prefix + "map_values");
VELOX_REGISTER_VECTOR_FUNCTION(udf_map_zip_with, prefix + "map_zip_with");

registerMapConcatFunction(prefix + "map_concat");
}

namespace sparksql {
Expand Down

0 comments on commit 11cb0ba

Please sign in to comment.