Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Name splits toggle added, target now works with upper case stream names #59

Merged
merged 2 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
| stream_name_splits | True | False | When stream names have dashes in them then use the format of [schema]-[table]. If you do not want schemas to be determined from the stream name then Disable this feature.
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
Expand Down
10 changes: 5 additions & 5 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ def get_sqlalchemy_url(self, config: dict) -> str:

def truncate_table(self, name):
"""Clear table data."""
self.connection.execute(f"TRUNCATE TABLE {name}")
self.connection.execute(f'TRUNCATE TABLE "{name}"')

def drop_table(self, name):
"""Drop table data."""
self.connection.execute(f"DROP TABLE {name}")
self.connection.execute(f'DROP TABLE "{name}"')

def create_temp_table_from_table(self, from_table_name, temp_table_name):
"""Temp table from another table."""
ddl = sqlalchemy.DDL(
"CREATE TEMP TABLE %(temp_table_name)s AS "
"SELECT * FROM %(from_table_name)s LIMIT 0",
'CREATE TEMP TABLE "%(temp_table_name)s" AS '
'SELECT * FROM "%(from_table_name)s" LIMIT 0',
{"temp_table_name": temp_table_name, "from_table_name": from_table_name},
)
self.connection.execute(ddl)
Expand Down Expand Up @@ -166,7 +166,7 @@ def get_column_add_ddl(
column = sqlalchemy.Column(column_name, column_type)

return sqlalchemy.DDL(
"ALTER TABLE %(table_name)s ADD COLUMN %(column_name)s %(column_type)s",
'ALTER TABLE "%(table_name)s" ADD COLUMN "%(column_name)s" %(column_type)s',
{
"table_name": table_name,
"column_name": column.compile(dialect=self._engine.dialect),
Expand Down
28 changes: 21 additions & 7 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ def merge_upsert_from_table(
where_condition = " and ".join([f'target."{key}" is null' for key in join_keys])

insert_sql = f"""
INSERT INTO {to_table_name}
INSERT INTO \"{to_table_name}\"
SELECT
temp.*
FROM {from_table_name} AS temp
LEFT JOIN {to_table_name} AS target ON {join_condition}
FROM \"{from_table_name}\" AS temp
LEFT JOIN \"{to_table_name}\" AS target ON {join_condition}
WHERE {where_condition}
"""
self.connection.execute(insert_sql)
Expand All @@ -141,9 +141,9 @@ def merge_upsert_from_table(
)
where_condition = join_condition
update_sql = f"""
UPDATE {to_table_name} AS target
UPDATE \"{to_table_name}\" AS target
SET {columns}
FROM {from_table_name} AS temp
FROM \"{from_table_name}\" AS temp
WHERE {where_condition}
"""
self.connection.execute(update_sql)
Expand Down Expand Up @@ -223,7 +223,7 @@ def generate_insert_statement(

def conform_name(self, name: str, object_type: Optional[str] = None) -> str:
"""Conforming names of tables, schemas, column names."""
return name
return name.replace("-", "_")

@property
def schema_name(self) -> Optional[str]:
Expand All @@ -246,11 +246,25 @@ def schema_name(self) -> Optional[str]:
if default_target_schema:
return default_target_schema

if len(parts) in {2, 3}:
if self.config["stream_name_splits"] is True and len(parts) in {2, 3}:
# Stream name is a two-part or three-part identifier.
# Use the second-to-last part as the schema name.
stream_schema = self.conform_name(parts[-2], "schema")
return stream_schema

# Schema name not detected.
return None

@property
def table_name(self) -> str:
"""Return the table name, with no schema or database part.

Returns:
The target table name.
"""
parts = self.stream_name.split("-")
if self.config["stream_name_splits"] is True:
table = self.stream_name if len(parts) == 1 else parts[-1]
else:
table = self.stream_name
return self.conform_name(table, "table")
11 changes: 11 additions & 0 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,18 @@ def __init__(
th.StringType,
description="Postgres schema to send data to, example: tap-clickup",
),
th.Property(
"stream_name_splits",
th.BooleanType,
default=True,
description=(
"When stream names have dashes in them then use "
+ "the format of [schema]-[table]. If you do not want schemas "
+ "to be determined from the stream name then Disable this feature."
),
),
).to_dict()

default_sink_class = PostgresSink

@property
Expand Down