tests: Add large-scale mysql/pg/upsert test
and enable in release qualification pipeline (weekly)
def- committed Jan 23, 2025
1 parent d1052e4 commit 3215fbd
Showing 4 changed files with 284 additions and 5 deletions.
36 changes: 36 additions & 0 deletions ci/release-qualification/pipeline.template.yml
@@ -477,3 +477,39 @@ steps:
args: [--find-limit]
timeout_in_minutes: 3600
parallelism: 20

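# Added to the release-qualification pipeline (run weekly, per the commit message)
# rather than to the regular CI pipeline, since each step may run for many hours.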
- group: Large Scale Ingestions
key: large-scale-ingestions
steps:
- id: mysql-cdc-large-scale
label: MySQL CDC large-scale ingestion
depends_on: build-aarch64
timeout_in_minutes: 3600
plugins:
- ./ci/plugins/mzcompose:
composition: mysql-cdc
run: large-scale
agents:
queue: hetzner-x86-64-dedi-48cpu-192gb # 1 TB disk

- id: pg-cdc-large-scale
label: Postgres CDC large-scale ingestion
depends_on: build-aarch64
timeout_in_minutes: 3600
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc
run: large-scale
agents:
queue: hetzner-x86-64-dedi-48cpu-192gb # 1 TB disk

- id: upsert-large-scale
label: Upsert large-scale ingestion
depends_on: build-aarch64
timeout_in_minutes: 3600
plugins:
- ./ci/plugins/mzcompose:
composition: upsert
run: large-scale
agents:
queue: hetzner-x86-64-dedi-48cpu-192gb # 1 TB disk
85 changes: 84 additions & 1 deletion test/mysql-cdc/mzcompose.py
@@ -79,7 +79,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
)
for name in sharded_workflows:
if name == "default":
if name in ("default", "large-scale"):
continue

with c.test_case(name):
@@ -273,3 +273,86 @@ def do_inserts(c: Composition):
"""
),
)


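# Like the load-test workflow in test/upsert/mzcompose.py, this can presumably be
# run locally with `./mzcompose run large-scale` from the composition directory.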
def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None:
"""
Test a large-scale MySQL instance and make sure that we can ingest its data successfully and quickly.
"""
mysql_version = get_targeted_mysql_version(parser)
with c.override(create_mysql(mysql_version)):
c.up("materialized", "mysql")
c.up("testdrive", persistent=True)

# Set up the MySQL server with the initial records, set up the connection to
# the MySQL server in Materialize.
c.testdrive(
dedent(
f"""
$ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
ALTER SYSTEM SET max_mysql_connections = 100
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
> CREATE SECRET IF NOT EXISTS mysqlpass AS '{MySql.DEFAULT_ROOT_PASSWORD}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (HOST mysql, USER root, PASSWORD SECRET mysqlpass)
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
DROP TABLE IF EXISTS products;
CREATE TABLE products (id int NOT NULL, name varchar(255) DEFAULT NULL, merchant_id int NOT NULL, price int DEFAULT NULL, status int DEFAULT NULL, created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP(), recordSizePayload longtext, PRIMARY KEY (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
ALTER TABLE products DISABLE KEYS;
> DROP SOURCE IF EXISTS s1 CASCADE;
"""
)
)

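# Generates batch_num rows server-side: the cross join of mysql.time_zone with
# itself provides enough source rows, the @i user variable produces sequential ids
# starting at `start`, and each row carries a ~1 MB repeat('x', ...) payload.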
def make_inserts(c: Composition, start: int, batch_num: int):
c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
$ mysql-execute name=mysql
SET foreign_key_checks = 0;
USE public;
SET @i:={start};
INSERT INTO products (id, name, merchant_id, price, status, created_at, recordSizePayload) SELECT @i:=@i+1, CONCAT("name", @i), @i % 1000, @i % 1000, @i % 10, '2024-12-12', repeat('x', 1000000) FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {batch_num};
"""
),
)

num_rows = 300_000  # runs out of disk with 400_000 rows
batch_size = 100
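# ~1 MB payload per row, so the full load is roughly 300 GB, inserted 100 rows per
# INSERT ... SELECT statement.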
for i in range(0, num_rows, batch_size):
batch_num = min(batch_size, num_rows - i)
make_inserts(c, i, batch_num)

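# The source and its table are created only after the bulk data exists, so the
# test exercises the initial snapshot of roughly 300 GB; the default index keeps
# the ingested data arranged in memory for the COUNT(*) checks.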
c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
> CREATE SOURCE s1
FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE products FROM SOURCE s1 (REFERENCE public.products);
> CREATE DEFAULT INDEX ON products;
> SELECT COUNT(*) FROM products;
{num_rows}
"""
),
)

make_inserts(c, num_rows, 1)

c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
> SELECT COUNT(*) FROM products;
{num_rows + 1}
"""
),
)
85 changes: 84 additions & 1 deletion test/pg-cdc/mzcompose.py
@@ -13,6 +13,7 @@

import glob
import time
from textwrap import dedent

import psycopg
from psycopg import Connection
@@ -320,6 +321,88 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
)


def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None:
"""
Test a large-scale Postgres instance and make sure that we can ingest its data successfully and quickly.
"""
pg_version = get_targeted_pg_version(parser)
with c.override(
create_postgres(
pg_version=pg_version, extra_command=["-c", "max_replication_slots=3"]
)
):
c.up("materialized", "postgres")
c.up("testdrive", persistent=True)

# Set up the Postgres server with the initial records, set up the connection to
# the Postgres server in Materialize.
c.testdrive(
dedent(
"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
DROP PUBLICATION IF EXISTS mz_source;
CREATE SCHEMA public;
> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'
> CREATE CONNECTION IF NOT EXISTS pg TO POSTGRES (HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass)
$ postgres-execute connection=postgres://postgres:postgres@postgres
DROP TABLE IF EXISTS products;
CREATE TABLE products (id int NOT NULL, name varchar(255) DEFAULT NULL, merchant_id int NOT NULL, price int DEFAULT NULL, status int DEFAULT NULL, created_at timestamp NULL, recordSizePayload text, PRIMARY KEY (id));
ALTER TABLE products REPLICA IDENTITY FULL;
CREATE PUBLICATION mz_source FOR ALL TABLES;
> DROP SOURCE IF EXISTS s1 CASCADE;
"""
)
)

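# Generates batch_num rows with generate_series(); row_number() OVER () offsets
# the ids by `start`, and each row carries a ~1 MB repeat('x', ...) payload.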
def make_inserts(c: Composition, start: int, batch_num: int):
c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
INSERT INTO products (id, name, merchant_id, price, status, created_at, recordSizePayload) SELECT {start} + row_number() OVER (), 'name' || ({start} + row_number() OVER ()), ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 10, '2024-12-12'::DATE, repeat('x', 1000000) FROM generate_series(1, {batch_num});
"""
),
)

num_rows = 300_000  # runs out of disk with 400_000 rows
batch_size = 10_000
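# Same ~1 MB payload per row, roughly 300 GB in total, loaded 10_000 rows per
# INSERT ... SELECT statement.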
for i in range(0, num_rows, batch_size):
batch_num = min(batch_size, num_rows - i)
make_inserts(c, i, batch_num)

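# As in the MySQL workflow, the source is created after the data is loaded, so the
# initial snapshot of the publication covers the full data set.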
c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
> CREATE SOURCE s1
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
> CREATE TABLE products FROM SOURCE s1 (REFERENCE products);
> CREATE DEFAULT INDEX ON products;
> SELECT COUNT(*) FROM products;
{num_rows}
"""
),
)

make_inserts(c, num_rows, 1)

c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
> SELECT COUNT(*) FROM products;
{num_rows + 1}
"""
),
)


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
workflows_with_internal_sharding = ["cdc"]
sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
@@ -330,7 +413,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
)
for name in sharded_workflows:
if name == "default":
if name in ("default", "large-scale"):
continue

# TODO: Flaky, reenable when database-issues#7611 is fixed
83 changes: 80 additions & 3 deletions test/upsert/mzcompose.py
@@ -65,7 +65,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
materialized_environment_extra[0] = "MZ_PERSIST_COMPACTION_DISABLED=true"

for name in c.workflows:
if name in ["default", "load-test"]:
if name in ["default", "load-test", "large-scale"]:
continue
with c.test_case(name):
c.workflow(name)
@@ -535,8 +535,6 @@ def fetch_auto_spill_metric() -> int | None:
# This test is there to compare rehydration metrics with different configs.
# Can be run locally with the command ./mzcompose run load-test
def workflow_load_test(c: Composition, parser: WorkflowArgumentParser) -> None:
from textwrap import dedent

# Following variables can be updated to tweak how much data the kafka
# topic should be populated with and what should be the upsert state size.
pad_len = 1024
@@ -698,3 +696,82 @@ def workflow_load_test(c: Composition, parser: WorkflowArgumentParser) -> None:
)[0]
last_latency = rehydration_latency
print(f"Scenario {scenario_name} took {rehydration_latency} ms")


def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None:
"""
Test a large-scale Kafka upsert source and make sure that we can ingest its data successfully and quickly.
"""
dependencies = ["materialized", "zookeeper", "kafka", "schema-registry"]
c.up(*dependencies)
with c.override(
Testdrive(no_reset=True, consistent_seed=True),
):
c.up("testdrive", persistent=True)

c.testdrive(
dedent(
"""
$ kafka-create-topic topic=topic1
> CREATE CONNECTION IF NOT EXISTS kafka_conn
FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT;
"""
)
)

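# Ingests batch_num ~1 MB messages keyed by ${kafka-ingest.iteration}, which counts
# 0..batch_num-1 within each kafka-ingest command; `start` is not used in the key,
# so every batch reuses the same key range.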
def make_inserts(c: Composition, start: int, batch_num: int):
c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
$ kafka-ingest format=bytes topic=topic1 key-format=bytes key-terminator=: repeat={batch_num}
"${{kafka-ingest.iteration}}":"{'x'*1_000_000}"
"""
),
)

num_rows = 200_000  # runs out of disk with 300_000 rows
batch_size = 10_000
for i in range(0, num_rows, batch_size):
batch_num = min(batch_size, num_rows - i)
make_inserts(c, i, batch_num)

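# Because every batch reuses keys 0..batch_size-1, ENVELOPE UPSERT collapses the
# num_rows ingested messages (~200 GB of Kafka data) down to batch_size distinct
# keys, which is why the expected count below is batch_size rather than num_rows.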
c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
> CREATE SOURCE s1
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${{testdrive.seed}}');
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-topic1-${{testdrive.seed}}")
KEY FORMAT TEXT VALUE FORMAT TEXT
ENVELOPE UPSERT;
> CREATE DEFAULT INDEX ON s1_tbl
> SELECT COUNT(*) FROM s1_tbl;
{batch_size}
"""
),
)

c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
$ kafka-ingest format=bytes topic=topic1 key-format=bytes key-terminator=:
"{batch_size + 1}":"{'x'*1_000_000}"
"""
),
)

c.testdrive(
args=["--no-reset"],
input=dedent(
f"""
> SELECT COUNT(*) FROM s1_tbl;
{batch_size + 1}
"""
),
)
