Skip to content

Commit

Permalink
fixed status update, added sparql link as resource
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Hanke committed May 8, 2024
1 parent e0816d8 commit 68170d8
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 60 deletions.
12 changes: 8 additions & 4 deletions ckanext/fuseki/assets/fuseki.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,28 @@ ckan.module('fuseki', function (jQuery) {
dataType: p.dataType,
success: function (response) {
var html = jQuery(response);
var length = html.text().length
// console.log('html:', html); // log the HTML to the console for debugging
if (html.length) {
if (html.length !== html_length) {
if (length) {
if (length !== html_length) {
self.el.html(html); // update the HTML if there are changes
console.log("Fuseki: status updated");
html_length = html.length;
html_length = length;
}
} else {
console.log('Error: #ajax-status element not found');
}
},
error: function (xhr, status, error) {
console.log('Error:', error);
},
complete: function () {
// call the update function recursively after a delay
setTimeout(update, 2000);
}
});
};
update(); // call the update function immediately after initialization
var updateInterval = setInterval(update, 5000); // set the interval to 20 seconds (20000 milliseconds)
}
};
});
24 changes: 18 additions & 6 deletions ckanext/fuseki/backend.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
# -*- coding: utf-8 -*-

import logging
import os, logging
from ckan.common import config
import requests
from rdflib import Graph
from ckan.plugins.toolkit import asbool


# from io import BytesIO

log = logging.getLogger(__name__)
CHUNK_SIZE = 16 * 1024 # 16kb
SSL_VERIFY = asbool(os.environ.get("FUSEKI_SSL_VERIFY", True))
if not SSL_VERIFY:
requests.packages.urllib3.disable_warnings()


def graph_delete(graph_id: str):
Expand All @@ -21,7 +26,9 @@ def graph_delete(graph_id: str):
graph_id=graph_id
)
jena_dataset_delete_res = requests.delete(
jena_dataset_delete_url, auth=(jena_username, jena_password)
jena_dataset_delete_url,
auth=(jena_username, jena_password),
verify=SSL_VERIFY,
)
jena_dataset_delete_res.raise_for_status()
except Exception as e:
Expand All @@ -41,7 +48,7 @@ def resource_upload(resource, graph_url, api_key=""):
else:
header, key = "Authorization", api_key
headers[header] = key
response = requests.get(resource["url"], headers=headers)
response = requests.get(resource["url"], headers=headers, verify=SSL_VERIFY)
response.raise_for_status()
# file_object = BytesIO(response.content)
file_type = resource["mimetype"]
Expand All @@ -61,7 +68,7 @@ def resource_upload(resource, graph_url, api_key=""):
files = {"file": (resource["name"], file_data, file_type, {"Expires": "0"})}
# files = {"file": (resource["name"], file_object)}
jena_upload_res = requests.post(
graph_url, files=files, auth=(jena_username, jena_password)
graph_url, files=files, auth=(jena_username, jena_password), verify=SSL_VERIFY
)
jena_upload_res.raise_for_status()
return True
Expand All @@ -77,7 +84,9 @@ def resource_exists(id):
resource_id=id
)
jena_dataset_stats_res = requests.get(
jena_dataset_stats_url, auth=(jena_username, jena_password)
jena_dataset_stats_url,
auth=(jena_username, jena_password),
verify=SSL_VERIFY,
)
jena_dataset_stats_res.raise_for_status()
if jena_dataset_stats_res.status_code == requests.codes.ok:
Expand All @@ -97,7 +106,9 @@ def get_graph(graph_id):
graph_id=graph_id
)
jena_dataset_stats_res = requests.get(
jena_dataset_stats_url, auth=(jena_username, jena_password)
jena_dataset_stats_url,
auth=(jena_username, jena_password),
verify=SSL_VERIFY,
)
jena_dataset_stats_res.raise_for_status()
if jena_dataset_stats_res.status_code == requests.codes.ok:
Expand All @@ -120,6 +131,7 @@ def graph_create(graph_id: str):
jena_dataset_create_url,
params={"dbName": graph_id, "dbType": "mem"},
auth=(jena_username, jena_password),
verify=SSL_VERIFY,
)
jena_dataset_create_res.raise_for_status()
return jena_base_url + "{graph_id}".format(graph_id=graph_id)
8 changes: 7 additions & 1 deletion ckanext/fuseki/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@ def fuseki_query_url(pkg_dict):
# fuseki query interface
url = "{}#/dataset/{}/query".format(FUSEKI_URL, pkg_dict["id"])
else:
url = "{}?title={}&endpoint={}/{}".format(
url = "{}?title={}&endpoint={}{}".format(
SPARKLIS_URL, pkg_dict["name"], FUSEKI_URL, pkg_dict["id"]
)
return url


def fuseki_sparql_url(pkg_dict):
    """Return the Fuseki SPARQL endpoint URL for a dataset.

    The endpoint is simply the configured Fuseki base URL with the
    package id appended.
    """
    return "{}{}".format(FUSEKI_URL, pkg_dict["id"])


def get_helpers():
    """Return the template helper functions exposed by this extension.

    Each helper is registered under its own function name, as CKAN's
    ITemplateHelpers interface expects.
    """
    exposed = (
        fuseki_show_tools,
        fuseki_graph_exists,
        fuseki_query_url,
        fuseki_sparql_url,
    )
    return {fn.__name__: fn for fn in exposed}
7 changes: 6 additions & 1 deletion ckanext/fuseki/logic/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
class Context(dict):
    """Plain ``dict`` subclass used to type CKAN action contexts."""

    def __init__(self, **kwargs):
        # Context's MRO is (Context, dict, object), so delegating to
        # dict.__init__ is equivalent to super().__init__ here.
        dict.__init__(self, **kwargs)


from ckan.lib.jobs import DEFAULT_QUEUE_NAME

import datetime, os
Expand All @@ -21,7 +23,7 @@ def __init__(self, **kwargs):
from dateutil.parser import isoparse as parse_iso_date

from ckanext.fuseki import db, backend
from ckanext.fuseki.tasks import update
from ckanext.fuseki.tasks import update, SPARQL_RES_NAME, resource_search

import sqlalchemy as sa

Expand Down Expand Up @@ -68,6 +70,9 @@ def fuseki_delete(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]
existing_task = toolkit.get_action("task_status_show")(
{}, {"entity_id": graph_id, "task_type": "fuseki", "key": "fuseki"}
)
# delete SPARQL link
link_id = resource_search(graph_id, SPARQL_RES_NAME)
toolkit.get_action("resource_delete")({}, link_id)
if existing_task:
toolkit.get_action("task_status_delete")(
context, {"id": existing_task["id"]}
Expand Down
134 changes: 133 additions & 1 deletion ckanext/fuseki/tasks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
import os
from io import BytesIO
from requests_toolbelt.multipart.encoder import MultipartEncoder
from urllib.parse import urlparse, urljoin, urlsplit

import ckanapi
import ckanapi.datapackage
import requests
from ckan.plugins.toolkit import get_action, asbool
from ckan import model
import datetime
from ckanext.fuseki import db, backend
from ckanext.fuseki import db, backend, helpers

try:
from urllib.parse import urlsplit
Expand All @@ -21,6 +24,8 @@
CKAN_URL = os.environ.get("CKAN_SITE_URL", "http://localhost:5000")
FUSEKI_CKAN_TOKEN = os.environ.get("FUSEKI_CKAN_TOKEN", "")
SSL_VERIFY = asbool(os.environ.get("FUSEKI_SSL_VERIFY", True))
SPARQL_RES_NAME = "SPARQL"

if not SSL_VERIFY:
requests.packages.urllib3.disable_warnings()

Expand Down Expand Up @@ -82,6 +87,18 @@ def update(
)
else:
logger.info("Upload {} to graph {} successfull".format(_res["url"], _graph))
# create a link to the sparql endpoint
link_id = resource_search(dataset_id, SPARQL_RES_NAME)
pkg_dict = get_action("package_show")({}, {"id": dataset_id})
sparql_link = upload_link(
dataset_id,
res_id=link_id,
mime_type="application/sparql-results+xml",
format="SPARQL",
authorization=FUSEKI_CKAN_TOKEN,
link_url=helpers.fuseki_sparql_url(pkg_dict),
)
logger.info("SPARQL link added to dataset: {}".format(sparql_link))
logger.info("job completed results at {}".format(_graph))
# all is done update job status
job_dict["status"] = "complete"
Expand Down Expand Up @@ -187,3 +204,118 @@ def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
return json.JSONEncoder.default(self, obj)


# # Upload resource to CKAN as a new/updated resource
# # res=get_resource(res_id)
# metadata_res = resource_search(dataset_id, filename)
# # log.debug(meta_data)
# prefix, suffix = filename.rsplit(".", 1)
# if suffix == "json" and "ld+json" in mime_type:
# filename = prefix + ".jsonld"
# log.debug(
# "{}.{} {} is json-ld:{}".format(
# prefix, suffix, mime_type, "ld+json" in mime_type
# )
# )
# if metadata_res:
# log.debug("Found existing resource {}".format(metadata_res))
# existing_id = metadata_res["id"]
# else:
# existing_id = None

# res = file_upload(
# dataset_id=dataset_id,
# filename=filename,
# filedata=meta_data,
# res_id=existing_id,
# format="json-ld",
# mime_type=mime_type,
# authorization=CSVTOCSVW_TOKEN,
# )


def upload_link(
    dataset_id,
    link_url,
    res_id=None,
    group=None,
    format="",
    mime_type="text/html",
    # application/sparql-query
    authorization=None,
):
    """Create or update a CKAN link-type resource pointing at *link_url*.

    When *res_id* is given, the existing resource is patched; otherwise a
    new resource named ``SPARQL_RES_NAME`` is created on *dataset_id*.
    Returns the decoded JSON response of the CKAN API call.
    Raises ``requests.HTTPError`` on a non-2xx response.
    """
    headers = {}
    if authorization:
        headers["Authorization"] = authorization

    payload = {
        "url": link_url,
        "mimetype": mime_type,
        "format": format,
    }
    if res_id:
        # update the existing link resource in place
        action_path = "/api/action/resource_patch"
        payload["id"] = res_id
    else:
        # create a fresh link resource on the dataset
        action_path = "/api/action/resource_create"
        payload["package_id"] = dataset_id
        payload["name"] = SPARQL_RES_NAME

    response = requests.post(
        expand_url(CKAN_URL, action_path),
        headers=headers,
        json=payload,
        verify=SSL_VERIFY,
    )
    response.raise_for_status()
    return response.json()


def file_upload(
    dataset_id,
    filename,
    filedata,
    res_id=None,
    format="",
    group=None,
    mime_type="text/csv",
    authorization=None,
):
    """Upload raw *filedata* bytes to CKAN as a dataset resource.

    When *res_id* is given the existing resource is patched; otherwise a
    new resource named *filename* is created on *dataset_id*.

    :param dataset_id: id of the target CKAN package (create path only)
    :param filename: name used for both the resource and the upload part
    :param filedata: file content as ``bytes``
    :param res_id: id of an existing resource to replace, or None
    :param format: CKAN resource format string (create path only)
    :param group: unused; kept for backward compatibility
    :param mime_type: MIME type sent with the multipart upload
    :param authorization: CKAN API token placed in the Authorization header
    :returns: decoded JSON response of the CKAN API call
    :raises requests.HTTPError: on a non-2xx response
    """
    data_stream = BytesIO(filedata)
    headers = {}
    if authorization:
        headers["Authorization"] = authorization
    if res_id:
        # update the existing resource in place
        url = expand_url(CKAN_URL, "/api/action/resource_patch")
        fields = {"id": res_id, "upload": (filename, data_stream, mime_type)}
    else:
        # create a fresh resource; do NOT send "id": None here —
        # MultipartEncoder fields must be strings or file tuples, and
        # resource_create does not take an id anyway
        url = expand_url(CKAN_URL, "/api/action/resource_create")
        fields = {
            "package_id": dataset_id,
            "name": filename,
            "format": format,
            "upload": (filename, data_stream, mime_type),
        }
    mp_encoder = MultipartEncoder(fields=fields)
    headers["Content-Type"] = mp_encoder.content_type
    response = requests.post(url, headers=headers, data=mp_encoder, verify=SSL_VERIFY)
    response.raise_for_status()
    r = response.json()
    logger.debug("file {} uploaded at: {}".format(filename, r))
    return r


def expand_url(base, url):
    """Resolve *url* against *base* and return an absolute URL string.

    A *url* without an http/https scheme is treated as a (possibly
    relative) path and joined onto *base*; an already-absolute http/https
    URL is returned unchanged.

    Bug fix: the original returned ``p_url.path.geturl()`` for absolute
    URLs, but ``ParseResult.path`` is a plain ``str`` with no ``geturl``
    method, so every absolute URL raised AttributeError. ``geturl()``
    belongs on the parse result itself.
    """
    p_url = urlparse(url)
    if p_url.scheme not in ("https", "http"):
        # relative url: join its path component onto the base
        return urljoin(base, p_url.path)
    # already absolute: reassemble it unchanged
    return p_url.geturl()
46 changes: 46 additions & 0 deletions ckanext/fuseki/templates/fuseki/logs.html
Original file line number Diff line number Diff line change
@@ -1,3 +1,49 @@
{% import 'macros/form.html' as form %}
<form class="add-to-group" method="post">
{{ h.csrf_input() }}
<button class="btn btn-secondary" name="create/update" type="submit">
<i class="fa fa-refresh"></i> {{ _('Create/Update') }}
</button>
{% if h.fuseki_graph_exists(pkg_dict.id) %}
<button class="btn btn-danger" name="delete" type="submit">
<i class="fa fa-trash"></i> {{ _('Delete') }}
</button>
<a class="btn btn-primary mb-0" href="{{ h.fuseki_query_url(pkg_dict)|safe }}" role="button">
<i class="fa fa-play"></i> {{ _('Query') }}
</a>
{% endif %}
<hr class="mt-0">
<table class="table table-bordered table-sm m-0">
<thead class="">
<tr>
<th>{{ _('upload') }}</th>
<th>{{ _('name') }}</th>
<th>{{ _('format') }}</th>
<th>{{ _('mime type') }}</th>
<th>{{ _('size') }} [B]</th>
</tr>
</thead>
<tbody>
{% for resource in pkg_dict.resources %}
<tr>
<td>
<div class="form-check form-switch">
<input class="form-check-input" type="checkbox" name='resid' id="ckeck-{{resource.id}}"
value={{resource.id}} {% if status.metadata is defined %} {% if resource.id in
status.metadata.res_ids%}checked='true' {% endif %} {% elif h.fuseki_show_tools(resource)
%}checked='true' {% endif %}>
</div>
</td>
<td>{{resource.name}}</td>
<td>{{resource.format}}</td>
<td>{{resource.mimetype}}</td>
<td>{{resource.size}}</td>
</tr>
{% endfor %}
</tbody>
</table>
</form>
<hr class="mt-0">
{% if status %}
{% if status.error and status.error.message %}
{% set show_table = false %}
Expand Down
Loading

0 comments on commit 68170d8

Please sign in to comment.