Skip to content

Commit

Permalink
Use parameter binding in TDEngine target insertions (#536)
Browse files Browse the repository at this point in the history
* Small changes

* Set pytest marker at the module level

https://docs.pytest.org/en/stable/example/markers.html#marking-whole-classes-or-modules

* Add type hints

* Upgrade pytest

* Use parameter binding in TDEngine INSERT

* Upgrade pytest-benchmark

* Fix `pytest.skip` usage

https://docs.pytest.org/en/stable/how-to/skipping.html

* Revert "Fix `pytest.skip` usage"

This reverts commit b6cbb80.

* Revert "Upgrade pytest-benchmark"

This reverts commit c7894bb.

* Revert "Upgrade pytest"

This reverts commit 5576ab8.

* Suppress type hint for old pytest

* Remove a redundant parameter

* Move error class to dtypes

* Rename fun -> func

* val_names -> regular_column_names

* Add `_TDEngineField` named tuple

* Add `_to_tag` and `_to_column` mappings

* Rename `_TDEngineField` -> `_TDEngineFieldData`

* Get TDEngine schema from table/super-table

* format

* Improve type hints

* empty

* Call `DESCRIBE` once in the `_init` method

* Rename "field data" to "field"

* Check first instead of try-except

* Validate DB and table names

* Improve ms comment

* Rewrite the test into `test_get_table_schema`
  • Loading branch information
jond01 authored Sep 17, 2024
1 parent 43e8f4b commit dac6f3d
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 60 deletions.
80 changes: 63 additions & 17 deletions integration/test_tdengine.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
import os
from datetime import datetime
from collections.abc import Iterator
from datetime import datetime, timezone
from typing import Optional

import pytest
import pytz
import taosws

from storey import SyncEmitSource, build_flow
from storey.targets import TDEngineTarget

url = os.getenv("TDENGINE_URL")
url = os.getenv("TDENGINE_URL") # e.g.: taosws://root:taosdata@localhost:6041
user = os.getenv("TDENGINE_USER")
password = os.getenv("TDENGINE_PASSWORD")
has_tdengine_credentials = all([url, user, password]) or (url and url.startswith("taosws"))
has_tdengine_credentials = all([url, user, password]) or (url and url.startswith("taosws://"))

pytestmark = pytest.mark.skipif(not has_tdengine_credentials, reason="Missing TDEngine URL, user, and/or password")

@pytest.fixture()
def tdengine():
TDEngineData = tuple[taosws.Connection, str, Optional[str], Optional[str], str, str]


@pytest.fixture(params=[10])
def tdengine(request: "pytest.FixtureRequest") -> Iterator[TDEngineData]:
db_name = "storey"
supertable_name = "test_supertable"

if url.startswith("taosws"):
if url.startswith("taosws://"):
connection = taosws.connect(url)
else:

connection = taosws.connect(
url=url,
user=user,
password=password,
)
connection = taosws.connect(url=url, user=user, password=password)

try:
connection.execute(f"DROP DATABASE {db_name};")
Expand All @@ -44,7 +44,9 @@ def tdengine():
if "STable not exist" not in str(err):
raise err

connection.execute(f"CREATE STABLE {supertable_name} (time TIMESTAMP, my_string NCHAR(10)) TAGS (my_int INT);")
connection.execute(
f"CREATE STABLE {supertable_name} (time TIMESTAMP, my_string NCHAR({request.param})) TAGS (my_int INT);"
)

# Test runs
yield connection, url, user, password, db_name, supertable_name
Expand All @@ -55,8 +57,7 @@ def tdengine():


@pytest.mark.parametrize("table_col", [None, "$key", "table"])
@pytest.mark.skipif(not has_tdengine_credentials, reason="Missing TDEngine URL, user, and/or password")
def test_tdengine_target(tdengine, table_col):
def test_tdengine_target(tdengine: TDEngineData, table_col: Optional[str]) -> None:
connection, url, user, password, db_name, supertable_name = tdengine
time_format = "%d/%m/%y %H:%M:%S UTC%z"

Expand Down Expand Up @@ -116,7 +117,7 @@ def test_tdengine_target(tdengine, table_col):
if typ == "TIMESTAMP":
t = datetime.fromisoformat(row[field_index])
# websocket returns a timestamp with the local time zone
t = t.astimezone(pytz.UTC).replace(tzinfo=None)
t = t.astimezone(timezone.utc).replace(tzinfo=None)
row[field_index] = t
result_list.append(row)
if table_col:
Expand All @@ -133,3 +134,48 @@ def test_tdengine_target(tdengine, table_col):
[datetime(2019, 9, 18, 1, 55, 14), "hello4", 4],
]
assert result_list == expected_result


@pytest.mark.parametrize("tdengine", [100], indirect=["tdengine"])
def test_sql_injection(tdengine: TDEngineData) -> None:
    """Check that a string value carrying a `DROP TABLE` statement drops nothing.

    The `tdengine` fixture is parametrized indirectly with 100. An extra table
    is created under the supertable, five events whose `my_string` value embeds
    a `DROP TABLE` statement for that table are written through
    `TDEngineTarget`, and the test then asserts the extra table still exists.
    """
    connection, url, user, password, db_name, supertable_name = tdengine

    # Bystander table that the injected statement tries to drop
    tb_name = "dont_drop_me"
    connection.execute(f"CREATE TABLE IF NOT EXISTS {tb_name} USING {supertable_name} TAGS (101);")
    extra_table_query = f"SHOW TABLES LIKE '{tb_name}';"
    assert list(connection.query(extra_table_query)), "The extra table was not created"

    # Flow that writes every emitted event into a per-event sub-table
    table_name = "test_table"
    table_col = "table"
    target = TDEngineTarget(
        url=url,
        time_col="time",
        columns=["my_string"],
        user=user,
        password=password,
        database=db_name,
        table_col=table_col,
        supertable=supertable_name,
        tag_cols=["my_int"],
        time_format="%d/%m/%y %H:%M:%S UTC%z",
        max_events=10,
    )
    controller = build_flow([SyncEmitSource(), target]).run()

    # Each event carries the malicious payload in its string column
    date_time_str = "18/09/19 01:55:1"
    for i in range(5):
        controller.emit(
            {
                "time": f"{date_time_str}{i} UTC-0000",
                "my_int": i,
                "my_string": f"s); DROP TABLE {tb_name};",
                table_col: f"{table_name}{i}",
            }
        )

    controller.terminate()
    controller.await_termination()

    # The bystander table must have survived the writes
    assert list(connection.query(extra_table_query)), "The extra table was dropped"
23 changes: 21 additions & 2 deletions storey/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from datetime import datetime, timezone
from enum import Enum
from typing import Callable, List, Optional, Union
from typing import Callable, List, Literal, NamedTuple, Optional, Union

import numpy

Expand Down Expand Up @@ -103,6 +103,14 @@ class FlowError(Exception):
pass


class TDEngineTypeError(TypeError):
    """A `TypeError` subclass dedicated to TDEngine-related failures."""

    # NOTE(review): the raise sites are outside this view; confirm the exact
    # conditions under which this error is raised before documenting them.


class TDEngineValueError(ValueError):
    """A `ValueError` subclass dedicated to TDEngine-related failures."""

    # NOTE(review): the raise sites are outside this view; confirm the exact
    # conditions under which this error is raised before documenting them.


class WindowBase:
def __init__(self, window, period, window_str):
self.window_millis = window
Expand Down Expand Up @@ -446,3 +454,14 @@ def should_aggregate(self, element):
class FixedWindowType(Enum):
    """Enumeration with two members: `CurrentOpenWindow` and `LastClosedWindow`."""

    # NOTE(review): presumably selects which fixed window an aggregation reads
    # from (the one still open vs. the most recently closed) - confirm against
    # the code that consumes this enum.
    CurrentOpenWindow = 1
    LastClosedWindow = 2


class _TDEngineField(NamedTuple):
field: str
# https://docs.tdengine.com/reference/taos-sql/data-type/
type: Literal["TIMESTAMP", "INT", "FLOAT", "DOUBLE", "BINARY", "BOOL", "NCHAR", "JSON", "VARCHAR"]
length: int
note: Literal["", "TAG"]
encode: str
compress: str
level: str
Loading

0 comments on commit dac6f3d

Please sign in to comment.