diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8031085
--- /dev/null
+++ b/README.md
@@ -0,0 +1,105 @@
+# SaGe: A Preemptive SPARQL Server for Online Knowledge Graphs
+
+**Authors:** Julien Aimonier-Davat (LS2N), Hala Skaf-Molli (LS2N), Pascal Molli (LS2N) and Thomas Minier
+
+**Abstract**
+In order to provide stable and responsive SPARQL endpoints to the community, public SPARQL endpoints enforce fair use policies. Unfortunately, long-running SPARQL queries cannot be executed to completion under these fair use restrictions and deliver only partial results. In this paper, we present SaGe, a SPARQL server based on the web preemption principle. Instead of stopping queries after a time quota, SaGe suspends the current query and returns it to the user. The user is then free to continue executing the query from the point where it was stopped, by simply returning the suspended query to the server. We describe the current state of the SaGe server, including the latest advances in the expressiveness of the server and its ability to support updates.
+
+# Experimental results
+
+## Dataset and Queries
+
+In our experiments, we re-use the RDF dataset and the
+SPARQL queries from the [BrTPF](https://doi.org/10.1007/978-3-319-48472-3_48)
+experimental study. The dataset contains 10M triples and we randomly picked 60
+queries such that each query completes within 30 minutes.
+
+## Machine configuration

+
+We run all our experiments on a `MacBook Pro` with a `2.3 GHz Intel Core i7`
+processor and a `1TB SSD`.
+
+## Plots
+
+**Plot 1**: Execution time of the query `?s ?p ?o` using the different backends.
+
+![](figures/spo_execution_times.png?raw=true)
+
+**Plot 2**: Suspend/resume times of the different backends for the different triple pattern shapes.
+
+![](figures/suspend_resume_times.png?raw=true)
+
+**Plot 3**: Execution times of the different backends on the `WatDiv` queries.
+
+![](figures/execution_times.png?raw=true)
+
+# Experimental study
+
+## Dependencies
+
+To run our experiments, the following software and packages must be installed on your system:
+* [Python 3.7](https://www.python.org) with development headers
+* [Virtualenv](https://pypi.org/project/virtualenv)
+* [sage-engine](https://github.com/sage-org/sage-engine)
+* [PostgreSQL](https://www.postgresql.org)
+* [HBase](https://hbase.apache.org)
+
+## Installation
+
+Once all dependencies have been installed, clone this repository and install the project.
+
+```bash
+# clone the project repository
+git clone https://github.com/JulienDavat/sage-backends-experiments.git
+cd sage-backends-experiments
+# create a virtual environment to isolate project dependencies
+virtualenv sage-env
+# activate the virtual environment
+source sage-env/bin/activate
+# install the main dependencies
+pip install -r requirements.txt
+```
+
+## Preparation
+
+```bash
+# download datasets into the graphs directory
+mkdir graphs && cd graphs
+wget nas.jadserver.fr/thesis/projects/sage/datasets/watdiv10M.hdt
+wget nas.jadserver.fr/thesis/projects/sage/datasets/watdiv10M.nt
+cd ..
+# download queries into the workloads directory
+cd workloads
+wget nas.jadserver.fr/thesis/projects/sage/queries/watdiv_workloads.gz
+cd ..
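+# NOTE (assumption): the commands above do not extract the downloaded workloads archive;
+# depending on its actual format, you may need to unpack it before running the experiments, e.g.
+#   tar -xzf workloads/watdiv_workloads.gz -C workloads   # if it is a gzipped tar archive
+#   gzip -d workloads/watdiv_workloads.gz                 # if it is a single gzip-compressed file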
+# insert data into PostgreSQL +sage-postgres-init --no-index configs/sage/backends.yaml sage_psql +sage-postgres-put graphs/watdiv10M.nt configs/sage/backends.yaml sage_psql +sage-postgres-index configs/sage/backends.yaml sage_psql +# insert data into SQLite +sage-sqlite-init --no-index configs/sage/backends.yaml sage_sqlite +sage-sqlite-put graphs/watdiv10M.nt configs/sage/backends.yaml sage_sqlite +sage-sqlite-index configs/sage/backends.yaml sage_sqlite +# insert data into HBase +sage-hbase-init --no-index configs/sage/backends.yaml sage_hbase +sage-hbase-put graphs/watdiv10M.nt configs/sage/backends.yaml sage_hbase +sage-hbase-index configs/sage/backends.yaml sage_hbase +# run the SaGe server +sage configs/sage/backends.yaml -w 1 -p 8080 +``` + +## Running the experiments + +Our experimental study is powered by **Snakemake**. The main commands used in our +experimental study are given below: + +```bash +# Plot backends execution times for the ?s ?p ?o query +snakemake --cores 1 figures/spo_execution_times.png + +# Plot backends suspend/resume times +snakemake --cores 1 figures/suspend_resume_times.png + +# Plot backends execution times for a given WatDiv workload +snakemake --cores 1 figures/[workload directory]/execution_times.png +``` diff --git a/Snakefile b/Snakefile new file mode 100644 index 0000000..74594b1 --- /dev/null +++ b/Snakefile @@ -0,0 +1,2 @@ +include: "rules/exec.smk" +include: "rules/plot.smk" diff --git a/configs/sage/backends.yaml b/configs/sage/backends.yaml new file mode 100644 index 0000000..84f52a9 --- /dev/null +++ b/configs/sage/backends.yaml @@ -0,0 +1,37 @@ +name: SaGe experimental server +quota: 60000 +max_results: 10000 +graphs: +- name: sage_psql + uri: http://localhost:8080/sparql/sage_psql + backend: postgres + dbname: sage + user: sage + password: 'sage' +- + name: sage_psql_catalog + uri: http://localhost:8080/sparql/sage_psql_catalog + backend: postgres-catalog + dbname: sage + user: sage + password: 'sage' +- + name: sage_sqlite + uri: http://localhost:8080/sparql/sage_sqlite + backend: sqlite + database: graphs/sage-sqlite.db +- + name: sage_sqlite_catalog + uri: http://localhost:8080/sparql/sage_sqlite_catalog + backend: sqlite-catalog + database: graphs/sage-sqlite-catalog.db +- + name: sage_hdt + uri: http://localhost:8080/sparql/sage_hdt + backend: hdt-file + file: graphs/watdiv.10M.hdt +- + name: sage_hbase + uri: http://localhost:8080/sparql/sage_hbase + backend: hbase + thrift_host: localhost diff --git a/configs/sage/sage-hdt.yaml b/configs/sage/sage-hdt.yaml new file mode 100644 index 0000000..440ade2 --- /dev/null +++ b/configs/sage/sage-hdt.yaml @@ -0,0 +1,9 @@ +name: SaGe experimental server +quota: 60000 +max_results: 10000 +graphs: +- + name: sage_hdt + uri: http://localhost:8080/sparql/sage_hdt + backend: hdt-file + file: graphs/watdiv.100M.hdt diff --git a/configs/sage/sage-psql.yaml b/configs/sage/sage-psql.yaml new file mode 100644 index 0000000..8187bf4 --- /dev/null +++ b/configs/sage/sage-psql.yaml @@ -0,0 +1,17 @@ +name: SaGe experimental server +quota: 60000 +max_results: 10000 +graphs: +- name: sage_psql + uri: http://localhost:8080/sparql/sage_psql + backend: postgres + dbname: sage + user: sage + password: 'sage' +- + name: sage_psql_catalog + uri: http://localhost:8080/sparql/sage_psql_catalog + backend: postgres-catalog + dbname: sage + user: sage + password: 'sage' diff --git a/configs/sage/sage-sqlite.yaml b/configs/sage/sage-sqlite.yaml new file mode 100644 index 0000000..b234c69 --- /dev/null +++ 
b/configs/sage/sage-sqlite.yaml @@ -0,0 +1,9 @@ +name: SaGe experimental server +quota: 60000 +max_results: 10000 +graphs: +- + name: sage_sqlite_100M + uri: http://localhost:8080/sparql/sage_sqlite + backend: sqlite + database: graphs/sage-sqlite-100M.db diff --git a/figures/execution_times.png b/figures/execution_times.png new file mode 100644 index 0000000..61138d7 Binary files /dev/null and b/figures/execution_times.png differ diff --git a/figures/spo_execution_times.png b/figures/spo_execution_times.png new file mode 100644 index 0000000..1a76536 Binary files /dev/null and b/figures/spo_execution_times.png differ diff --git a/figures/suspend_resume_times.png b/figures/suspend_resume_times.png new file mode 100644 index 0000000..5c1d542 Binary files /dev/null and b/figures/suspend_resume_times.png differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f75431c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +sparqlwrapper +snakemake +seaborn +matplotlib +coloredlogs +click diff --git a/rules/exec.smk b/rules/exec.smk new file mode 100644 index 0000000..d85f8a8 --- /dev/null +++ b/rules/exec.smk @@ -0,0 +1,26 @@ +rule run_sage: + input: + ancient("workloads/{workload}/{query}.rq") + output: + result="output/{workload,[^/]+}/{backend,sage_[^/]+}/{query,[^/]+}.json", + stats="output/{workload,[^/]+}/{backend,sage_[^/]+}/{query,[^/]+}.csv", + params: + endpoint="http://localhost:8080/sparql", + shell: + "python scripts/query_sage.py {input} \ + http://localhost:8080/sparql http://localhost:8080/sparql/{wildcards.backend} \ + --output {output.result} --measures {output.stats}" + + +rule run_virtuoso: + input: + ancient("workloads/{workload}/{query}.rq") + output: + result="output/{workload,[^/]+}/virtuoso/{query,[^/]+}.json", + stats="output/{workload,[^/]+}/virtuoso/{query,[^/]+}.csv", + params: + endpoint="http://localhost:8890/sparql", + shell: + "python scripts/query_virtuoso.py {input} \ + http://localhost:8890/sparql http://example.org/datasets/watdiv10M \ + --output {output.result} --measures {output.stats}" diff --git a/rules/plot.smk b/rules/plot.smk new file mode 100644 index 0000000..dbe4b37 --- /dev/null +++ b/rules/plot.smk @@ -0,0 +1,70 @@ +from scripts.utils import list_files, query_name + +def list_workload_queries(wildcards): + return [ query_name(q) for q in list_files(f"workloads/{wildcards.workload}", "rq") ] + +def list_hbase_queries(wildcards): + return [ query_name(q) for q in list_files(f"output/{wildcards.workload}/sage_hbase", "csv") ] + +rule prepare_backend_data: + input: + "output/{workload}/{backend}/{query}.csv" + output: + "output/{workload,[^/]+}/{backend,[^/]+}/{query,[^/]+}-prepared.csv" + shell: + "touch {output}; " + "echo 'backend,query,execution_time,nb_calls,nb_results,loading_time,resume_time' > {output}; " + "echo -n '{wildcards.backend},{wildcards.query},' >> {output}; " + "cat {input} >> {output}" + + +rule merge_backend_data: + input: + lambda wildcards: expand("output/{{workload}}/{{backend}}/{query}-prepared.csv", query=list_workload_queries(wildcards)) + output: + "output/{workload,[^/]+}/{backend,[^/]+}/execution_times.csv" + shell: + "bash scripts/merge_csv.sh {input} > {output}" + + +rule merge_backends_data: + input: + sage_psql=ancient("output/{workload}/sage_psql/execution_times.csv"), + sage_psql_catalog=ancient("output/{workload}/sage_psql_catalog/execution_times.csv"), + sage_sqlite=ancient("output/{workload}/sage_sqlite/execution_times.csv"), + 
sage_sqlite_catalog=ancient("output/{workload}/sage_sqlite_catalog/execution_times.csv"), + sage_hdt=ancient("output/{workload}/sage_hdt/execution_times.csv"), + sage_hbase=ancient("output/{workload}/sage_hbase/execution_times.csv"), + output: + "output/{workload,[^/]+}/execution_times.csv" + shell: + "bash scripts/merge_csv.sh {input.sage_psql} {input.sage_psql_catalog} \ + {input.sage_sqlite} {input.sage_sqlite_catalog} \ + {input.sage_hdt} {input.sage_hbase} > {output}" + + +rule plot_execution_times: + input: + ancient("output/{workload}/execution_times.csv") + output: + "figures/{workload,[^/]+}/execution_times.png" + shell: + "python scripts/plots.py execution-times {input} {output}" + + +rule plot_suspend_resume_times: + input: + ancient("output/indexes/execution_times.csv") + output: + "figures/suspend_resume_times.png" + shell: + "python scripts/plots.py suspend-resume-times {input} {output}" + + +rule spo_execution_times: + input: + ancient("output/spo/execution_times.csv") + output: + "figures/spo_execution_times.png" + shell: + "python scripts/plots.py spo-execution-times {input} {output}" diff --git a/scripts/merge_csv.sh b/scripts/merge_csv.sh new file mode 100644 index 0000000..e19f2c0 --- /dev/null +++ b/scripts/merge_csv.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +awk 'FNR==1 && NR!=1{next;}{print}' $@ | sed '/^\s*$/d' diff --git a/scripts/plots.py b/scripts/plots.py new file mode 100644 index 0000000..b88b827 --- /dev/null +++ b/scripts/plots.py @@ -0,0 +1,118 @@ +import seaborn as sns +import sys +import pandas +import click +import matplotlib.pyplot as plt + +from pandas import read_csv +from matplotlib.patches import Patch + + +def show_values_on_bars(ax, unit): + for p in ax.patches: + ax.annotate(f"%d{unit}" % p.get_height(), (p.get_x() + p.get_width() / 2., p.get_height()), + ha='center', va='center', fontsize=11, color='gray', xytext=(0, 20), + textcoords='offset points') + + +@click.group() +def cli(): + pass + + +@cli.command() +@click.argument('data', type=click.Path(exists=True, file_okay=True, dir_okay=False)) +@click.argument('output', type=click.Path()) +def execution_times(data, output): + dataframe = read_csv(data, sep=',') + + backends = dataframe["backend"].unique() + colors = sns.color_palette("tab10", n_colors=len(backends)) + + chart = sns.boxplot(x="backend", y="execution_time", data=dataframe, palette=colors) + chart.set_xlabel("Backends") + chart.set_ylabel("Execution time (sec)") + chart.set_yscale("log") + chart.set_xticklabels( + [ "" for b in chart.get_xticklabels() ], + rotation=90, + horizontalalignment="center", + fontweight="light", + fontsize="large" + ) + + cmap = dict(zip(backends, colors)) + patches = [Patch(color=v, label=k) for k, v in cmap.items()] + + plt.legend(handles=patches, loc='upper center', bbox_to_anchor=(0.5, 1.16), fancybox=True, ncol=3) + chart.get_figure().savefig(output) + + +@cli.command() +@click.argument('data', type=click.Path(exists=True, file_okay=True, dir_okay=False)) +@click.argument('output', type=click.Path()) +def suspend_resume_times(data, output): + dataframe = read_csv(data, sep=',') + + figure, axes = plt.subplots(1, 2) + + loading_times = sns.barplot(x="query", y="loading_time", hue="backend", data=dataframe, ax=axes[0]) + loading_times.get_legend().remove() + loading_times.set_ylabel("Time to resume the query (ms)") + loading_times.set_xlabel("") + + resume_times = sns.barplot(x="query", y="resume_time", hue="backend", data=dataframe, ax=axes[1]) + resume_times.get_legend().remove() + 
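+    # note: in scripts/query_sage.py, 'loading_time' stores the server's 'import' statistic
+    # (restoring the saved plan, i.e. resuming the query) and 'resume_time' stores the 'export'
+    # statistic (saving the plan, i.e. suspending the query), hence the axis labels used here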
resume_times.set_ylabel("Time to suspend the query (ms)") + resume_times.set_xlabel("") + + figure.text(0.5, 0.01, 'Indexes used to evaluate the query', ha='center') + + handles, labels = axes[1].get_legend_handles_labels() + figure.legend(handles, labels, loc='upper center', bbox_to_anchor=(0.5, 1), fancybox=True, ncol=3) + + plt.tight_layout() + plt.subplots_adjust(top=0.87) + + plt.show() + figure.savefig(output) + + +@cli.command() +@click.argument('data', type=click.Path(exists=True, file_okay=True, dir_okay=False)) +@click.argument('output', type=click.Path()) +def spo_execution_times(data, output): + dataframe = read_csv(data, sep=',') + + dataframe = dataframe.loc[dataframe["query"] == "SPO"] + + backends = dataframe["backend"].unique() + colors = sns.color_palette("tab10", n_colors=len(backends)) + + chart = sns.barplot(x="backend", y="execution_time", data=dataframe, palette=colors) + chart.set_ylabel("Query execution time (sec)") + chart.set_xlabel("") + + cmap = dict(zip(backends, colors)) + patches = [Patch(color=v, label=k) for k, v in cmap.items()] + + plt.legend(handles=patches, loc='upper center', bbox_to_anchor=(0.5, 1.16), fancybox=True, ncol=3) + chart.set_xticklabels( + [ "" for b in chart.get_xticklabels() ], + rotation=90, + horizontalalignment="center", + fontweight="light", + fontsize="large" + ) + show_values_on_bars(chart, "s") + + plt.ylim(0, 1500) + plt.tight_layout() + plt.subplots_adjust(top=0.87) + + plt.show() + chart.get_figure().savefig(output) + + +if __name__ == "__main__": + cli() diff --git a/scripts/query_sage.py b/scripts/query_sage.py new file mode 100644 index 0000000..50ab304 --- /dev/null +++ b/scripts/query_sage.py @@ -0,0 +1,87 @@ +#!/usr/bin/python3 + +import logging +import coloredlogs +import click +import requests + +from time import time +from json import dumps +from statistics import mean +from utils import list_files, basename + +coloredlogs.install(level='INFO', fmt='%(asctime)s - %(levelname)s %(message)s') +logger = logging.getLogger(__name__) + + +@click.command() +@click.argument('query', type=click.Path(exists=True, dir_okay=False, file_okay=True)) +@click.argument('endpoint', type=str) +@click.argument('default-graph', type=str) +@click.option("--output", type=str, default=None, + help="The file in which the query result will be stored.") +@click.option("--measures", type=str, default=None, + help="The file in which query execution statistics will be stored.") +def execute(query, endpoint, default_graph, output, measures): + headers = { + "accept": "text/html", + "content-type": "application/json", + 'Cache-Control': 'no-cache', + "next": None + } + payload = { + "query": open(query).read(), + "defaultGraph": default_graph, + } + + has_next = True + nb_calls = 0 + results = list() + nb_results = 0 + execution_time = 0 + loading_times = list() + resume_times = list() + + triples_by_obj = dict() + max = 0 + obj = "" + + while has_next: + start_time = time() + response = requests.post(endpoint, headers=headers, data=dumps(payload)) + execution_time += time() - start_time + nb_calls += 1 + + json_response = response.json() + has_next = json_response['next'] + payload["next"] = json_response["next"] + # results.extend(json_response["bindings"]) + nb_results += len(json_response["bindings"]) + loading_times.append(json_response["stats"]["import"]) + resume_times.append(json_response["stats"]["export"]) + + # for bindings in json_response["bindings"]: + # if bindings["?o"] not in triples_by_obj: + # triples_by_obj[bindings["?o"]] = 0 + 
# else: + # triples_by_obj[bindings["?o"]] += 1 + # if triples_by_obj[bindings["?o"]] > max: + # max = triples_by_obj[bindings["?o"]] + # obj = bindings["?o"] + + if output is not None: + with open(output, 'w') as output_file: + output_file.write(dumps(results)) + logger.info(f'\n{results}') + # logger.info(f'{obj} : {max}') + + if measures is not None: + with open(measures, 'w') as measures_file: + avg_loading_time = mean(loading_times) + avg_resume_time = mean(resume_times) + measures_file.write(f'{execution_time},{nb_calls},{nb_results},{avg_loading_time},{avg_resume_time}') + logger.info(f'Query complete in {execution_time}s with {nb_calls} HTTP calls. {nb_results} solution mappings !') + + +if __name__ == "__main__": + execute() diff --git a/scripts/query_virtuoso.py b/scripts/query_virtuoso.py new file mode 100644 index 0000000..994ef54 --- /dev/null +++ b/scripts/query_virtuoso.py @@ -0,0 +1,49 @@ +#!/usr/bin/python3 + +import logging +import coloredlogs +import click + +from json import dumps +from time import time +from SPARQLWrapper import SPARQLWrapper, JSON +from utils import list_files, basename + +coloredlogs.install(level='INFO', fmt='%(asctime)s - %(levelname)s %(message)s') +logger = logging.getLogger(__name__) + + +@click.command() +@click.argument('query', type=click.Path(exists=True, dir_okay=False, file_okay=True)) +@click.argument('endpoint', type=str) +@click.argument('default-graph', type=str) +@click.option("--output", type=str, default=None, + help="The file in which the source selection will be stored.") +@click.option("--measures", type=str, default=None, + help="The file in which query execution statistics will be stored.") +def execute(query, endpoint, default_graph, output, measures): + sparql = SPARQLWrapper(endpoint) + sparql.setQuery(open(query, 'r').read()) + sparql.setReturnFormat(JSON) + + sparql.addParameter("default-graph-uri", default_graph) + + start_time = time() + results = sparql.query() + end_time = time() - start_time + + formatted_results = results.convert() + + if output is not None: + with open(output, 'w') as output_file: + output_file.write(dumps(formatted_results)) + # logger.info(f'\n{formatted_results}') + + if measures is not None: + with open(measures, 'w') as measures_file: + measures_file.write(f'{end_time},1,{len(formatted_results["results"]["bindings"])}') + logger.info(f'Query complete in {end_time}s with 1 HTTP call. {len(formatted_results["results"]["bindings"])} solution mappings !') + + +if __name__ == "__main__": + execute() diff --git a/scripts/utils.py b/scripts/utils.py new file mode 100644 index 0000000..4d293e8 --- /dev/null +++ b/scripts/utils.py @@ -0,0 +1,8 @@ +from os import listdir +from os.path import basename + +def list_files(directory, extension): + return (f for f in listdir(directory) if f.endswith('.' 
+ extension)) + +def query_name(file): + return basename(file).split('.')[0] diff --git a/workloads/indexes/OSP.rq b/workloads/indexes/OSP.rq new file mode 100644 index 0000000..82185f7 --- /dev/null +++ b/workloads/indexes/OSP.rq @@ -0,0 +1 @@ +select * where { ?s ?p } diff --git a/workloads/indexes/POS.rq b/workloads/indexes/POS.rq new file mode 100644 index 0000000..575ce8a --- /dev/null +++ b/workloads/indexes/POS.rq @@ -0,0 +1 @@ +select * where { ?s a ?o } diff --git a/workloads/indexes/SPO.rq b/workloads/indexes/SPO.rq new file mode 100644 index 0000000..36cf545 --- /dev/null +++ b/workloads/indexes/SPO.rq @@ -0,0 +1 @@ +select * where { ?s ?p ?o } diff --git a/workloads/spo/SPO.rq b/workloads/spo/SPO.rq new file mode 100644 index 0000000..36cf545 --- /dev/null +++ b/workloads/spo/SPO.rq @@ -0,0 +1 @@ +select * where { ?s ?p ?o }