Commit

added option declarations and using them instead of envs
Thomas Hanke committed Jan 27, 2025
1 parent 710fbb1 commit d84d2aa
Showing 10 changed files with 274 additions and 326 deletions.
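The headline change is replacing os.environ lookups with declared CKAN config options. The plugin.py diff is not expanded on this page, so as a rough sketch of what such a declaration typically looks like (assuming CKAN 2.10's IConfigDeclaration interface; the db_url option name matches the toolkit.config.get call in the db.py diff below, while the plugin class name and the token option are illustrative):

import ckan.plugins as plugins


class CsvwMapAndTransformPlugin(plugins.SingletonPlugin):
    plugins.implements(plugins.IConfigDeclaration)

    def declare_config_options(self, declaration, key):
        # Namespace every option under ckanext.csvwmapandtransform.
        group = key.ckanext.csvwmapandtransform
        # Backs the toolkit.config.get() fallback added to db.init() below.
        declaration.declare(group.db_url, "sqlite:////tmp/jobs.db")
        # Hypothetical replacement for the CSVWMAPANDTRANSFORM_TOKEN env var.
        declaration.declare(group.token, "")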
32 changes: 16 additions & 16 deletions ckanext/csvwmapandtransform/action.py
@@ -9,39 +9,39 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)


from ckan.lib.jobs import DEFAULT_QUEUE_NAME
from ckan import model
from typing import Any
import datetime
import itertools
import json

from ckanext.csvwmapandtransform import mapper, db
from ckanext.csvwmapandtransform.tasks import transform
import os
from typing import Any

import ckanapi
import itertools
import datetime
from dateutil.parser import parse as parse_date
from dateutil.parser import isoparse as parse_iso_date
import sqlalchemy as sa
import os
from ckan import model
from ckan.lib.jobs import DEFAULT_QUEUE_NAME
from dateutil.parser import isoparse as parse_iso_date
from dateutil.parser import parse as parse_date

from ckanext.csvwmapandtransform import db, mapper
from ckanext.csvwmapandtransform.tasks import transform

log = __import__("logging").getLogger(__name__)
# must be lower case alphanumeric and these symbols: -_
MAPPING_GROUP = "mappings"
METHOD_GROUP = "methods"
JOB_TIMEOUT = 180

CSVWMAPANDTRANSFORM_TOKEN = os.environ.get("CSVWMAPANDTRANSFORM_TOKEN", "")

def find_first_matching_id(dicts: list, key: str, value: str):
return next((d["id"] for d in dicts if d.get(key) == value), None)


def csvwmapandtransform_find_mappings(context: Context, data_dict):
mapping_group_id = find_first_matching_id(
toolkit.get_action("group_list")({}, {"all_fields": True}),
key="name",
value=MAPPING_GROUP,
)
toolkit.get_action("group_list")({}, {"all_fields": True}),
key="name",
value=MAPPING_GROUP,
)
if mapping_group_id:
mapping_group = toolkit.get_action("group_show")(
{"ignore_auth": True}, {"id": mapping_group_id, "include_datasets": True}
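Note that the context above still reads CSVWMAPANDTRANSFORM_TOKEN from the environment; a config-based lookup in the spirit of this commit could look like the following sketch (the option name is an assumption, as the corresponding hunk is not shown here):

from ckan.plugins import toolkit

# Hypothetical option name, declared alongside the other options.
CSVWMAPANDTRANSFORM_TOKEN = toolkit.config.get(
    "ckanext.csvwmapandtransform.token", ""
)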
3 changes: 2 additions & 1 deletion ckanext/csvwmapandtransform/auth.py
@@ -1,6 +1,7 @@
import ckanext.datastore.logic.auth as auth
from ckan.logic.auth.get import task_status_show

import ckanext.datastore.logic.auth as auth


def csvwmapandtransform_transform(context, data_dict):
if "resource" in data_dict and data_dict["resource"].get("package_id"):
210 changes: 72 additions & 138 deletions ckanext/csvwmapandtransform/db.py
@@ -1,27 +1,32 @@
"""
Abstracts a database. Used for storing logging when it csvwmapandtransform a resource into
Abstracts a database. Used for storing logs while loading a resource into
DataStore.
Loosely based on ckan-service-provider's db.py
"""

"""
Abstracts a database. Used for storing logging when it aiembeddings a resource into
DataStore.
Loosely based on ckan-service-provider's db.py
"""

import datetime
import json
import os

import six
import sqlalchemy

from ckan.plugins import toolkit

ENGINE = None
_METADATA = None
JOBS_TABLE = None
METADATA_TABLE = None
LOGS_TABLE = None

DB_URL = os.environ.get("CSVWMAPANDTRANSFORM_SQLALCHEMY_URL", "sqlite:////tmp/jobs.db")


def init(db_uri=DB_URL, echo=False):
def init(db_uri: str = "", echo=False):
"""Initialise the database.
Initialise the sqlalchemy engine, metadata and table objects that we use to
@@ -38,6 +43,8 @@ def init(db_uri=DB_URL, echo=False):
:type echo: bool
"""
if not db_uri:
db_uri = toolkit.config.get("ckanext.csvwmapandtransform.db_url")
global ENGINE, _METADATA, JOBS_TABLE, METADATA_TABLE, LOGS_TABLE
ENGINE = sqlalchemy.create_engine(db_uri, echo=echo, convert_unicode=True)
_METADATA = sqlalchemy.MetaData(ENGINE)
@@ -59,50 +66,35 @@ def drop_all():
_METADATA.drop_all(ENGINE)


def get_job(job_id):
    """Return the job with the given job_id as a dict.
    The dict also includes any metadata or logs associated with the job.
    Returns None instead of a dict if there's no job with the given job_id.
    The keys of a job dict are:
    "job_id": The unique identifier for the job (unicode)
    "job_type": The name of the job function that will be executed for this
        job (unicode)
    "status": The current status of the job, e.g. "pending", "running",
        "running_but_viewable", "complete", or "error" (unicode)
    "data": Any output data returned by the job if it has completed
        successfully. This may be any JSON-serializable type, e.g. None, a
        string, a dict, etc.
    "error": If the job failed with an error this will be a dict with a
        "message" key whose value is a string error message. The dict may also
        have other keys specific to the particular type of error. If the job
        did not fail with an error then "error" will be None.
    "requested_timestamp": The time at which the job was requested (string)
    "finished_timestamp": The time at which the job finished (string)
    "sent_data": The input data for the job, provided by the client site.
        This may be any JSON-serializable type, e.g. None, a string, a dict,
        etc.
    "result_url": The callback URL that CKAN Service Provider will post the
        result to when the job finishes (unicode)
    "metadata": Any custom metadata associated with the job (dict)
    "logs": Any logs associated with the job (list)
    """
    # Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
    # warnings.
def delete_job(job_id):
    """Delete a job from the jobs table by job_id.

    :param job_id: the job_id of the job to be deleted
    :type job_id: unicode
    """
    if job_id:
        job_id = six.text_type(job_id)

    msg = ""
    with ENGINE.connect() as conn:
        trans = conn.begin()
        try:
            result = conn.execute(
                JOBS_TABLE.delete().where(JOBS_TABLE.c.job_id == job_id)
            )
            if result.rowcount == 0:
                msg = f"No job found with id: {job_id}"
            else:
                msg = f"Job with id: {job_id} has been deleted successfully."
            trans.commit()
        except Exception as e:
            trans.rollback()
            msg = f"An error occurred: {e}"
    return msg


def get_job(job_id):
    """Return the job with the given job_id as a dict."""
    if job_id:
        job_id = six.text_type(job_id)

@@ -114,18 +106,14 @@ def get_job(job_id):
if not result:
return None

# Turn the result into a dictionary representation of the job.
result_dict = {}
for field in list(result.keys()):
value = getattr(result, field)
if value is None:
result_dict[field] = value
elif field in ("sent_data", "data", "error"):
result_dict[field] = json.loads(value)
elif isinstance(value, datetime.datetime):
result_dict[field] = value.isoformat()
else:
result_dict[field] = six.text_type(value)
    def _convert(field, value):
        # Decode JSON-encoded columns and render timestamps as ISO strings.
        if value is None:
            return None
        if field in ("sent_data", "data", "error"):
            return json.loads(value)
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        return value

    result_dict = {
        field: _convert(field, getattr(result, field)) for field in result.keys()
    }

result_dict["metadata"] = _get_metadata(job_id)
result_dict["logs"] = _get_logs(job_id)
@@ -134,49 +122,17 @@


def add_pending_job(job_id, job_type, data=None, metadata=None, result_url=None):
"""Add a new job with status "pending" to the jobs table.
All code that adds jobs to the jobs table should go through this function.
Code that adds to the jobs table manually should be refactored to use this
function.
May raise unspecified exceptions from Python core, SQLAlchemy or JSON!
TODO: Document and unit test these!
:param job_id: a unique identifier for the job, used as the primary key in
ckanserviceprovider's "jobs" database table
:type job_id: unicode
:param job_type: the name of the job function that will be executed for
this job
:type job_type: unicode
:param data: The input data for the job (called sent_data elsewhere)
:type data: Any JSON-serializable type
:param metadata: A dict of arbitrary (key, value) metadata pairs to be
stored along with the job. The keys should be strings, the values can
be strings or any JSON-encodable type.
:type metadata: dict
:param result_url: the callback URL that ckanserviceprovider will post the
job result to when the job has finished
:type result_url: unicode
"""
"""Add a new job with status "pending" to the jobs table."""
if not data:
data = {}
data = json.dumps(data)
data = six.text_type(json.dumps(data))

# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
if job_id:
job_id = six.text_type(job_id)
if job_type:
job_type = six.text_type(job_type)
if result_url:
result_url = six.text_type(result_url)
data = six.text_type(data)

if not metadata:
metadata = {}
@@ -195,23 +151,22 @@ def add_pending_job(job_id, job_type, data=None, metadata=None, result_url=None)
)
)

# Insert any (key, value) metadata pairs that the job has into the
# metadata table.
inserts = []
for key, value in list(metadata.items()):
type_ = "string"
if not isinstance(value, six.string_types):
value = json.dumps(value)
type_ = "json"

# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
key = six.text_type(key)
value = six.text_type(value)

inserts.append(
{"job_id": job_id, "key": key, "value": value, "type": type_}
)
inserts = [
{
"job_id": job_id,
"key": six.text_type(key),
"value": six.text_type(
json.dumps(value)
if not isinstance(value, six.string_types)
else value
),
"type": (
"json" if not isinstance(value, six.string_types) else "string"
),
}
for key, value in metadata.items()
]

if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
@@ -265,31 +220,14 @@ def _validate_error(error):


def _update_job(job_id, job_dict):
"""Update the database row for the given job_id with the given job_dict.
All functions that update rows in the jobs table do it by calling this
helper function.
job_dict is a dict with values corresponding to the database columns that
should be updated, e.g.:
{"status": "complete", "data": ...}
"""
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
"""Update the database row for the given job_id with the given job_dict."""
if job_id:
job_id = six.text_type(job_id)

if "error" in job_dict:
job_dict["error"] = _validate_error(job_dict["error"])
job_dict["error"] = json.dumps(job_dict["error"])
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_dict["error"] = json.dumps(_validate_error(job_dict["error"]))
job_dict["error"] = six.text_type(job_dict["error"])

# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
if "data" in job_dict:
job_dict["data"] = six.text_type(job_dict["data"])

@@ -428,28 +366,24 @@ def _init_logs_table():

def _get_metadata(job_id):
"""Return any metadata for the given job_id from the metadata table."""
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_id = six.text_type(job_id)

with ENGINE.connect() as conn:
results = conn.execute(
METADATA_TABLE.select().where(METADATA_TABLE.c.job_id == job_id)
).fetchall()

metadata = {}
for row in results:
value = row["value"]
if row["type"] == "json":
value = json.loads(value)
metadata[row["key"]] = value
metadata = {
row["key"]: json.loads(row["value"]) if row["type"] == "json" else row["value"]
for row in results
}
return metadata


def _get_logs(job_id):
"""Return any logs for the given job_id from the logs table."""
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_id = six.text_type(job_id)

with ENGINE.connect() as conn:
results = conn.execute(
LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)
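With the new signature, callers may omit db_uri and let init() fall back to the declared option; the remaining seven changed files in this commit are not rendered on this page. A minimal usage sketch (the ckan.ini line mirrors the default that the removed CSVWMAPANDTRANSFORM_SQLALCHEMY_URL env var used to carry):

from ckanext.csvwmapandtransform import db

# An explicit URI still takes precedence:
db.init("sqlite:////tmp/jobs.db")

# Or rely on the declared option, set in ckan.ini as e.g.
#   ckanext.csvwmapandtransform.db_url = sqlite:////tmp/jobs.db
db.init()  # falls back to toolkit.config.get("ckanext.csvwmapandtransform.db_url")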