Skip to content

Commit

Permalink
fixed db connection and ajax status updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Hanke committed Jan 17, 2025
1 parent 76ae689 commit 508c8ca
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 245 deletions.
76 changes: 76 additions & 0 deletions ckanext/csvwmapandtransform/assets/script.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
 * CKAN JS module that polls the /status endpoint every 2 seconds and
 * appends new job-log entries to the page as they arrive.
 *
 * Expects its element to contain:
 *   - button[name="delete"] and div[name="status"], initially .invisible,
 *     revealed once the job has logs or result files;
 *   - ul[name="log"], which receives one <li> per log entry.
 */
ckan.module('csvwmapandtransform', function (jQuery) {
  return {
    options: {
      parameters: {
        html: {
          contentType: 'application/json',
          dataType: 'json',
          dataConverter: function (data) { return data; },
          language: 'json'
        }
      }
    },
    initialize: function () {
      var self = this;
      var p = this.options.parameters.html;
      console.log("Initialized csvwmapandtransform for element: ", this.el);
      // Number of log entries already rendered; lets each poll append
      // only entries that are new since the previous poll.
      var log_length = 0;
      var update = function () {
        jQuery.ajax({
          url: "/status",
          type: 'GET',
          contentType: p.contentType,
          dataType: p.dataType,
          data: { get_param: 'value' },
          success: function (data) {
            const haslogs = 'logs' in data.status;
            const hasresults = data.status?.data?.files;
            if (hasresults || haslogs) {
              self.el.find('button[name="delete"]').removeClass("invisible");
              self.el.find('div[name="status"]').removeClass("invisible");
            }
            if (!haslogs) return;
            var length = Object.keys(data.status.logs).length;
            if (length && length !== log_length) {
              // Use self.el (the jQuery-wrapped element CKAN hands the
              // module) — a bare `$` is not defined in module scope.
              var logs_div = self.el.find('ul[name="log"]');
              jQuery.each(data.status.logs, function (key, value) {
                // Skip entries rendered on a previous poll. The previous
                // check (`key + 1 < log_length`) re-appended the last
                // rendered entry on every update, duplicating it.
                if (key < log_length) return;
                // NOTE(review): log fields are interpolated into HTML
                // without escaping — XSS risk if any field can carry
                // user-controlled text; confirm server-side sanitising.
                logs_div.append("<li class='item "
                  + value.class +
                  "'><i class='fa icon fa-"
                  + value.icon +
                  "'></i><div class='alert alert-"
                  + value.alertlevel +
                  " mb-0 mt-3' role='alert'>"
                  + value.message +
                  "</div><span class='date' title='timestamp'>"
                  + value.timestamp +
                  "</span></li>");
                console.log("Appending log:", value);
              });
              console.log("csvwmapandtransform: status updated");
              log_length = length;
            }
          },
          error: function (xhr, status, error) {
            console.log('Error:', error);
          },
          complete: function () {
            // Re-poll after a delay; `complete` fires on both success and
            // error, so polling survives transient server failures.
            setTimeout(update, 2000);
          }
        });
      };
      update(); // start polling immediately after initialization
    }
  };
});
13 changes: 13 additions & 0 deletions ckanext/csvwmapandtransform/assets/webassets.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Webassets bundles for the csvwmapandtransform extension.
# Registered via toolkit.add_resource() and included in templates with
# {% asset 'csvwmapandtransform/<bundle-name>' %}.

# JS bundle: minified script.js, loaded after CKAN's base/main bundle.
js:
  filters: rjsmin
  output: csvwmapandtransform/csvwmapandtransform.js
  contents:
    - script.js
  extra:
    preload:
      - base/main

# CSS bundle for the extension's styling.
style:
  contents:
    - style.css
  output: csvwmapandtransform/csvwmapandtransform.css
188 changes: 99 additions & 89 deletions ckanext/csvwmapandtransform/db.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'''
"""
Abstracts a database. Used for storing logging when it csvwmapandtransform a resource into
DataStore.
Loosely based on ckan-service-provider's db.py
'''
"""

import datetime
import json
Expand Down Expand Up @@ -106,8 +106,10 @@ def get_job(job_id):
if job_id:
job_id = six.text_type(job_id)

result = ENGINE.execute(
JOBS_TABLE.select().where(JOBS_TABLE.c.job_id == job_id)).first()
with ENGINE.connect() as conn:
result = conn.execute(
JOBS_TABLE.select().where(JOBS_TABLE.c.job_id == job_id)
).first()

if not result:
return None
Expand All @@ -118,15 +120,15 @@ def get_job(job_id):
value = getattr(result, field)
if value is None:
result_dict[field] = value
elif field in ('sent_data', 'data', 'error'):
elif field in ("sent_data", "data", "error"):
result_dict[field] = json.loads(value)
elif isinstance(value, datetime.datetime):
result_dict[field] = value.isoformat()
else:
result_dict[field] = six.text_type(value)

result_dict['metadata'] = _get_metadata(job_id)
result_dict['logs'] = _get_logs(job_id)
result_dict["metadata"] = _get_metadata(job_id)
result_dict["logs"] = _get_logs(job_id)

return result_dict

Expand Down Expand Up @@ -179,45 +181,43 @@ def add_pending_job(job_id, job_type, data=None, metadata=None, result_url=None)
if not metadata:
metadata = {}

conn = ENGINE.connect()
trans = conn.begin()
try:
conn.execute(JOBS_TABLE.insert().values(
job_id=job_id,
job_type=job_type,
status='pending',
requested_timestamp=datetime.datetime.utcnow(),
sent_data=data,
result_url=result_url))

# Insert any (key, value) metadata pairs that the job has into the
# metadata table.
inserts = []
for key, value in list(metadata.items()):
type_ = 'string'
if not isinstance(value, six.string_types):
value = json.dumps(value)
type_ = 'json'

# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
key = six.text_type(key)
value = six.text_type(value)

inserts.append(
{"job_id": job_id,
"key": key,
"value": value,
"type": type_}
with ENGINE.connect() as conn:
trans = conn.begin()
try:
conn.execute(
JOBS_TABLE.insert().values(
job_id=job_id,
job_type=job_type,
status="pending",
requested_timestamp=datetime.datetime.utcnow(),
sent_data=data,
result_url=result_url,
)
)
if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
except Exception:
trans.rollback()
raise
finally:
conn.close()

# Insert any (key, value) metadata pairs that the job has into the
# metadata table.
inserts = []
for key, value in list(metadata.items()):
type_ = "string"
if not isinstance(value, six.string_types):
value = json.dumps(value)
type_ = "json"

# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
key = six.text_type(key)
value = six.text_type(value)

inserts.append(
{"job_id": job_id, "key": key, "value": value, "type": type_}
)
if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
except Exception:
trans.rollback()
raise


class InvalidErrorObjectError(Exception):
Expand Down Expand Up @@ -257,11 +257,11 @@ def _validate_error(error):
if isinstance(message, six.string_types):
return error
else:
raise InvalidErrorObjectError(
"error['message'] must be a string")
raise InvalidErrorObjectError("error['message'] must be a string")
except (TypeError, KeyError):
raise InvalidErrorObjectError(
"error must be either a string or a dict with a message key")
"error must be either a string or a dict with a message key"
)


def _update_job(job_id, job_dict):
Expand Down Expand Up @@ -293,10 +293,10 @@ def _update_job(job_id, job_dict):
if "data" in job_dict:
job_dict["data"] = six.text_type(job_dict["data"])

ENGINE.execute(
JOBS_TABLE.update()
.where(JOBS_TABLE.c.job_id == job_id)
.values(**job_dict))
with ENGINE.connect() as conn:
conn.execute(
JOBS_TABLE.update().where(JOBS_TABLE.c.job_id == job_id).values(**job_dict)
)


def mark_job_as_completed(job_id, data=None):
Expand Down Expand Up @@ -364,56 +364,64 @@ def mark_job_as_failed_to_post_result(job_id):
"""
update_dict = {
"error":
"Process completed but unable to post to result_url",
"error": "Process completed but unable to post to result_url",
}
_update_job(job_id, update_dict)


def _init_jobs_table():
    """Initialise the "jobs" table in the db.

    Defines the table against the module-level ``_METADATA`` and returns
    the :class:`sqlalchemy.Table` object; the caller is responsible for
    actually creating it in the database.
    """
    _jobs_table = sqlalchemy.Table(
        "jobs",
        _METADATA,
        sqlalchemy.Column("job_id", sqlalchemy.UnicodeText, primary_key=True),
        sqlalchemy.Column("job_type", sqlalchemy.UnicodeText),
        # Indexed: jobs are commonly looked up by status (e.g. 'pending').
        sqlalchemy.Column("status", sqlalchemy.UnicodeText, index=True),
        sqlalchemy.Column("data", sqlalchemy.UnicodeText),
        sqlalchemy.Column("error", sqlalchemy.UnicodeText),
        sqlalchemy.Column("requested_timestamp", sqlalchemy.DateTime),
        sqlalchemy.Column("finished_timestamp", sqlalchemy.DateTime),
        sqlalchemy.Column("sent_data", sqlalchemy.UnicodeText),
        # Callback URL:
        sqlalchemy.Column("result_url", sqlalchemy.UnicodeText),
    )
    return _jobs_table


def _init_metadata_table():
    """Initialise the "metadata" table in the db.

    Stores (key, value) metadata pairs per job; rows are removed
    automatically when the owning job is deleted (ON DELETE CASCADE).
    Returns the :class:`sqlalchemy.Table` object.
    """
    _metadata_table = sqlalchemy.Table(
        "metadata",
        _METADATA,
        sqlalchemy.Column(
            "job_id",
            sqlalchemy.ForeignKey("jobs.job_id", ondelete="CASCADE"),
            nullable=False,
            primary_key=True,
        ),
        # Composite primary key (job_id, key): one value per key per job.
        sqlalchemy.Column("key", sqlalchemy.UnicodeText, primary_key=True),
        sqlalchemy.Column("value", sqlalchemy.UnicodeText, index=True),
        # 'string' or 'json' — tells readers whether to json.loads the value.
        sqlalchemy.Column("type", sqlalchemy.UnicodeText),
    )
    return _metadata_table


def _init_logs_table():
    """Initialise the "logs" table in the db.

    One row per log record emitted while processing a job; rows are
    removed automatically when the owning job is deleted (ON DELETE
    CASCADE). Returns the :class:`sqlalchemy.Table` object.
    """
    _logs_table = sqlalchemy.Table(
        "logs",
        _METADATA,
        sqlalchemy.Column(
            "job_id",
            sqlalchemy.ForeignKey("jobs.job_id", ondelete="CASCADE"),
            nullable=False,
        ),
        sqlalchemy.Column("timestamp", sqlalchemy.DateTime),
        sqlalchemy.Column("message", sqlalchemy.UnicodeText),
        sqlalchemy.Column("level", sqlalchemy.UnicodeText),
        # Mirrors Python logging record attributes (module/funcName/lineno).
        sqlalchemy.Column("module", sqlalchemy.UnicodeText),
        sqlalchemy.Column("funcName", sqlalchemy.UnicodeText),
        sqlalchemy.Column("lineno", sqlalchemy.Integer),
    )
    return _logs_table

Expand All @@ -423,16 +431,17 @@ def _get_metadata(job_id):
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_id = six.text_type(job_id)
with ENGINE.connect() as conn:
results = conn.execute(
METADATA_TABLE.select().where(METADATA_TABLE.c.job_id == job_id)
).fetchall()

results = ENGINE.execute(
METADATA_TABLE.select().where(
METADATA_TABLE.c.job_id == job_id)).fetchall()
metadata = {}
for row in results:
value = row['value']
if row['type'] == 'json':
value = row["value"]
if row["type"] == "json":
value = json.loads(value)
metadata[row['key']] = value
metadata[row["key"]] = value
return metadata


Expand All @@ -441,13 +450,14 @@ def _get_logs(job_id):
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_id = six.text_type(job_id)

results = ENGINE.execute(
LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)).fetchall()
with ENGINE.connect() as conn:
results = conn.execute(
LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)
).fetchall()

results = [dict(result) for result in results]

for result in results:
result.pop("job_id")

return results
return results
2 changes: 1 addition & 1 deletion ckanext/csvwmapandtransform/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CsvwMapAndTransformPlugin(plugins.SingletonPlugin, DefaultTranslation):
def update_config(self, config_):
    # IConfigurer hook: register this extension's template, public, and
    # webassets directories with CKAN.
    toolkit.add_template_directory(config_, "templates")
    toolkit.add_public_directory(config_, "public")
    # The asset bundle must be registered under this extension's own name
    # ('csvwmapandtransform'), not 'aiextract' (a copy-paste from another
    # extension) — templates reference it as
    # {% asset 'csvwmapandtransform/...' %} and webassets.yml emits its
    # outputs under csvwmapandtransform/.
    toolkit.add_resource("assets", "csvwmapandtransform")

# IResourceUrlChange

Expand Down
Loading

0 comments on commit 508c8ca

Please sign in to comment.