diff --git a/README.md b/README.md index eca58914..ffd86506 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target | database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | | sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | | dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | +| stream_name_splits | True | False | When stream names have dashes in them then use the format of [schema]-[table]. If you do not want schemas to be determined from the stream name then Disable this feature. | stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | stream_map_config | False | None | User-defined config values to be used within map expressions. | | flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 4c74694c..36c911b1 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -54,17 +54,17 @@ def get_sqlalchemy_url(self, config: dict) -> str: def truncate_table(self, name): """Clear table data.""" - self.connection.execute(f"TRUNCATE TABLE {name}") + self.connection.execute(f'TRUNCATE TABLE "{name}"') def drop_table(self, name): """Drop table data.""" - self.connection.execute(f"DROP TABLE {name}") + self.connection.execute(f'DROP TABLE "{name}"') def create_temp_table_from_table(self, from_table_name, temp_table_name): """Temp table from another table.""" ddl = sqlalchemy.DDL( - "CREATE TEMP TABLE %(temp_table_name)s AS " - "SELECT * FROM %(from_table_name)s LIMIT 0", + 'CREATE TEMP TABLE "%(temp_table_name)s" AS ' + 'SELECT * FROM "%(from_table_name)s" LIMIT 0', {"temp_table_name": temp_table_name, "from_table_name": from_table_name}, ) self.connection.execute(ddl) @@ -166,7 +166,7 @@ def get_column_add_ddl( column = sqlalchemy.Column(column_name, column_type) return sqlalchemy.DDL( - "ALTER TABLE %(table_name)s ADD COLUMN %(column_name)s %(column_type)s", + 'ALTER TABLE "%(table_name)s" ADD COLUMN "%(column_name)s" %(column_type)s', { "table_name": table_name, "column_name": column.compile(dialect=self._engine.dialect), diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 779e8d99..92882901 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -123,11 +123,11 @@ def merge_upsert_from_table( where_condition = " and ".join([f'target."{key}" is null' for key in join_keys]) insert_sql = f""" - INSERT INTO {to_table_name} + INSERT INTO \"{to_table_name}\" SELECT temp.* - FROM {from_table_name} AS temp - LEFT JOIN {to_table_name} AS target ON {join_condition} + FROM \"{from_table_name}\" AS temp + LEFT JOIN \"{to_table_name}\" AS target ON {join_condition} WHERE {where_condition} """ self.connection.execute(insert_sql) @@ -141,9 +141,9 @@ def merge_upsert_from_table( ) where_condition = join_condition update_sql = f""" - UPDATE {to_table_name} AS target + UPDATE \"{to_table_name}\" AS target SET {columns} - FROM {from_table_name} AS temp + FROM \"{from_table_name}\" AS temp WHERE {where_condition} """ self.connection.execute(update_sql) @@ -223,7 +223,7 @@ def generate_insert_statement( def conform_name(self, name: str, object_type: Optional[str] = None) -> str: """Conforming names of tables, schemas, column names.""" - return name + return name.replace("-", "_") @property def schema_name(self) -> Optional[str]: @@ -246,7 +246,7 @@ def schema_name(self) -> Optional[str]: if default_target_schema: return default_target_schema - if len(parts) in {2, 3}: + if self.config["stream_name_splits"] is True and len(parts) in {2, 3}: # Stream name is a two-part or three-part identifier. # Use the second-to-last part as the schema name. stream_schema = self.conform_name(parts[-2], "schema") @@ -254,3 +254,17 @@ def schema_name(self) -> Optional[str]: # Schema name not detected. return None + + @property + def table_name(self) -> str: + """Return the table name, with no schema or database part. + + Returns: + The target table name. + """ + parts = self.stream_name.split("-") + if self.config["stream_name_splits"] is True: + table = self.stream_name if len(parts) == 1 else parts[-1] + else: + table = self.stream_name + return self.conform_name(table, "table") diff --git a/target_postgres/target.py b/target_postgres/target.py index 0f2add8e..880d0826 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -116,7 +116,18 @@ def __init__( th.StringType, description="Postgres schema to send data to, example: tap-clickup", ), + th.Property( + "stream_name_splits", + th.BooleanType, + default=True, + description=( + "When stream names have dashes in them then use " + + "the format of [schema]-[table]. If you do not want schemas " + + "to be determined from the stream name then Disable this feature." + ), + ), ).to_dict() + default_sink_class = PostgresSink @property