Transform PSQL2BigQuery into a CLI #9

Merged: 18 commits, Nov 3, 2021
8 changes: 2 additions & 6 deletions Makefile
@@ -3,7 +3,7 @@ setup:

dependencies:
@make setup
-	@poetry install --no-root
+	@poetry install --no-root --extras sentry

update:
@poetry update
@@ -21,12 +21,8 @@ lint:
@poetry run pylint psql2bigquery
@poetry run black --check .

-unit:
-	@echo "Running unit tests ..."
-	@poetry run nosetests

clean:
-	@rm -rf .coverage coverage.xml dist/ build/ *.egg-info/
+	@rm -rf dist/ build/ *.egg-info/

publish:
@make clean
37 changes: 36 additions & 1 deletion README.md
@@ -1 +1,36 @@
-psql2bigquery
+# PostgreSQL to BigQuery


It's really good to add a README to the project =D
What do you think about adding a how-to for running it locally, to help developers contribute to the project?

@joaodaher (Contributor Author), Oct 26, 2021:
Sure, I added a new commit with a Contributing section.
Please tell me if there's anything I could add.

Contributor:

Hey @joaodaher!

It'd be great to have more usage instructions and examples with the available params. Can you add that as well, please?

It could also be in a different issue/PR, no problem. Thanks!

Contributor:

A note that could be added to the instructions: `--include` and `--exclude` are mutually exclusive, so they can't be used together.

And here's the syntax for multiple tables (worth mentioning in the README or command help):

```
poetry run psql2bigquery run --db-host localhost \
--db-port 5430 \
--db-user whatsgood \
--db-password xx \
--db-name whatsgood \
--gcp-project whatsgood-dev \
--gcp-dataset api \
--include membership_membership \
--include membership_membershipprogram \
--gcp-credential-path /Users/tferreira/project/psql2bigquery/whatsgood-dev.json
```
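As an aside, here is a minimal sketch of how both rules can be expressed with click (illustrative only; the PR's actual option wiring may differ): `multiple=True` makes a flag repeatable, and a plain check inside the command enforces the mutual exclusion.

```
import click


@click.command()
@click.option("--include", multiple=True)  # repeat the flag to pass multiple tables
@click.option("--exclude", multiple=True)
def run(include, exclude):
    # click collects repeated options into tuples, so a plain truthiness
    # check is enough to reject combined use
    if include and exclude:
        raise click.UsageError("--include and --exclude are mutually exclusive")
    click.echo(f"including={include} excluding={exclude}")
```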

Contributor Author:

I added a basic usage example for now.
We can improve it later with proper documentation of each parameter.

Contributor:

Looks good @joaodaher, thanks!


Install with: `pip install psql2bigquery`

Get usage instructions with: `psql2bigquery run --help`

## Sample usage

```
poetry run psql2bigquery run \
--db-host localhost \
--db-port 5432 \
--db-user username \
--db-password secret-password \
--db-name my_api \
--gcp-project my-project \
--gcp-dataset my_api \
--include table_name_a \
--include table_name_b \
--gcp-credential-path /path/to/credential.json
```

## Logging

Optionally, you can use Sentry.io for error logging.

Just set the environment variable `SENTRY_DSN` and psql2bigquery will automatically configure the logger.
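For reference, here is a minimal sketch of what this automatic configuration can look like, adapted from the Sentry setup that this PR removes from `main.py` (the `setup_logging` name follows the `logging.setup_logging()` call in the new `main.py`; the actual contents of `psql2bigquery/tools/logging.py` are not shown in this diff):

```
import logging
import os

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration


def setup_logging():
    sentry_dsn = os.environ.get("SENTRY_DSN")
    if sentry_dsn:
        # Record INFO logs as breadcrumbs, send ERROR logs as Sentry events
        sentry_logging = LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)
        sentry_sdk.init(dsn=sentry_dsn, integrations=[sentry_logging])
```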


## Contributing

- Fork this project
- Install dependencies with `make dependencies`
- Make sure you have Python 3 installed. [pyenv](https://github.com/pyenv/pyenv#installation) is highly recommended
- You can test the client locally (without installing the package) with `poetry run psql2bigquery <command>`
- Make a PR with as much detail as possible
352 changes: 225 additions & 127 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions psql2bigquery/__init__.py
@@ -0,0 +1 @@
from psql2bigquery import run
192 changes: 11 additions & 181 deletions psql2bigquery/main.py
@@ -1,188 +1,18 @@
-# Requires the following environment variables
-# DB_HOST : PostgreSQL IP address
-# DB_USER : PostgreSQL username
-# DB_PASSWORD : PostgreSQL username password
-# DB_PORT : PostgreSQL database port (defaults to 5432)
-# DB_NAME : PostgreSQL database name
-# SCHEMA : PostgreSQL database schema (defaults to public)
-# GCP_PROJECT : Google Cloud project name
-# GCP_DATASET : BigQuery dataset name
-# GCP_CREDENTIAL_PATH : path to Google Cloud credentials (defaults to ./credentials.json)
+import click

-import os
-from pathlib import Path
+from psql2bigquery.tools import logging

-import logging
-import sentry_sdk
-from sentry_sdk.integrations.logging import LoggingIntegration
-from google.api_core.exceptions import BadRequest
-from google.cloud import bigquery
-import psycopg2
-from google.oauth2 import service_account
-
-# Setup Sentry Logging
-SENTRY_DSN = os.environ.get("SENTRY_DSN")
-if SENTRY_DSN:
-    sentry_logging = LoggingIntegration(
-        level=logging.INFO,
-        event_level=logging.ERROR,
-    )
-    sentry_sdk.init(
-        dsn=SENTRY_DSN,
-        integrations=[sentry_logging]
-    )
+logging.setup_logging()

-# Setup Variables
-IGNORE_TABLES = (
-    "order_historicalmasterorder",
-    "order_historicalorder",
-    "order_historicalorderitem",
-    "notification_inappnotification",
-    "watson_searchentry",
-    "spatial_ref_sys",
-)
-IGNORE_PREFIXES = (
-    "django_",
-    "square_",
-    "pg_",
-)
-
-SCHEMA: str = "public"
-GCP_PROJECT = os.environ["GCP_PROJECT"]
-GCP_DATASET = os.environ.get("GCP_DATASET", "api")
-GCP_CREDENTIAL_PATH = os.environ.get("GCP_CREDENTIAL_PATH", "./credentials.json")
+@click.group()
+@click.version_option()
+def cli():
+    """Welcome to PSQL to BigQuery CLI

-DELIMITER = "~"
-QUOTE = '"'
+    Command Line Interface for moving PostgreSQL data to BigQuery

-
-class PSQL:
-    _conn = None
-
-    @classmethod
-    def _connection(cls):
-        if not cls._conn:
-            cls._conn = psycopg2.connect(
-                host=os.environ["DB_HOST"],
-                database=os.environ["DB_NAME"],
-                user=os.environ["DB_USER"],
-                password=os.environ["DB_PASSWORD"],
-                port=os.environ.get("DB_PORT", "5432"),
-            )
-        return cls._conn
-
-    @classmethod
-    def _execute_query(cls, sql: str):
-        cur = cls._connection().cursor()
-        cur.execute(sql)
-        cls._conn.commit()
-        output = cur.fetchall()
-        return output
-
-    @classmethod
-    def dump_table(cls, table_name: str):
-        cur = cls._connection().cursor()
-
-        file = Path(__file__).parent / "dump" / f"{table_name}.csv"
-        file.parent.mkdir(exist_ok=True, parents=True)
-
-        with file.open("w") as f:
-            sql = (
-                f"COPY (SELECT * FROM {table_name}) TO STDOUT "
-                f"WITH (FORMAT CSV, HEADER TRUE, DELIMITER '{DELIMITER}', QUOTE '{QUOTE}', FORCE_QUOTE *);"
-            )
-            cur.copy_expert(sql=sql, file=f)
-        cls._conn.commit()
-
-        return file
-
-    @classmethod
-    def list_tables(cls):
-        def _is_accepted(name):
-            return name.lower() in IGNORE_TABLES or any(name.startswith(prefix) for prefix in IGNORE_PREFIXES)
-
-        def _fetch_names(query):
-            for row in cls._execute_query(sql=query):
-                name = row[0]
-                if not _is_accepted(name=name):
-                    logging.debug(f"Ignoring: {name}")
-                    continue
-                yield name
-
-        sql = f"SELECT tablename FROM pg_tables WHERE schemaname='{SCHEMA}'"
-        yield from _fetch_names(query=sql)
-
-        sql = f"select table_name from INFORMATION_SCHEMA.views WHERE table_schema='{SCHEMA}'"
-        yield from _fetch_names(query=sql)
-
-    @classmethod
-    def close(cls):
-        cls._connection().close()
-
-
-class Parser:
-    @classmethod
-    def parse_output_as_rows(cls, output):
-        return output
-
-
-class BigQuery:
-    _client = None
-
-    @classmethod
-    def client(cls) -> bigquery.Client:
-        if not cls._client:
-            credentials = service_account.Credentials.from_service_account_file(GCP_CREDENTIAL_PATH)
-            cls._client = bigquery.Client(
-                credentials=credentials,
-            )
-        return cls._client
-
-    @classmethod
-    def load_to_bigquery(cls, table_name: str, file_path):
-        table_id = f"{GCP_PROJECT}.{GCP_DATASET}.{table_name}"
-
-        job_config = bigquery.LoadJobConfig(
-            autodetect=True,
-            skip_leading_rows=1,
-            source_format=bigquery.SourceFormat.CSV,
-            field_delimiter=DELIMITER,
-            quote_character=QUOTE,
-            allow_quoted_newlines=True,
-            create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
-            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
-            max_bad_records=10,
-        )
-
-        load_job = cls.client().load_table_from_file(
-            file_obj=file_path.open("rb"),
-            destination=table_id,
-            job_config=job_config,
-        )
-
-        load_job.result()
-
-        return table_id
-
-    @classmethod
-    def check_table(cls, table_id: str):
-        table = cls.client().get_table(table_id)
-        return table.num_rows
-
-
-def main():
-    for table_name in PSQL.list_tables():
-        logging.info(f"[{table_name}]")
-        file_path = PSQL.dump_table(table_name=table_name)
-        try:
-            table_id = BigQuery.load_to_bigquery(table_name=table_name, file_path=file_path)
-            logging.info(f"\tPSQL ---> BIGQUERY [{table_name}]: {BigQuery.check_table(table_id=table_id)} rows")
-            os.remove(file_path)
-        except BadRequest as exc:
-            logging.error(f"\tPSQL ---> BIGQUERY [{table_name}]: {exc.errors}")
-
-    PSQL.close()
-
-
-if __name__ == "__main__":
-    main()
+    You can start with:
+    psql2bigquery run --help
+    """