From e6c6f8446a06173436e7292131e9c83074160bbe Mon Sep 17 00:00:00 2001 From: Jim Jones Date: Sat, 2 Mar 2024 00:28:51 +0100 Subject: [PATCH] Add prototype for rdf_fdw_table_clone function This adds a prototype for the function rdf_fdw_table_clone, which enables users to harvest all data (or a subset) from rdf_fdw foreign tables. This function offers the possibility to extract data in small chuncks to avoid large result sets from the target triplestore. This patch also slightly changes the regression tests to a change in the level of a FOREIGN TABLE logging option NOTICE > INFO. --- expected/blazegraph-wikidata.out | 22 +- expected/graphdb-getty.out | 8 +- expected/virtuoso-dbpedia.out | 84 ++-- rdf_fdw--1.0.sql | 16 +- rdf_fdw.c | 701 ++++++++++++++++++++++--------- 5 files changed, 585 insertions(+), 246 deletions(-) diff --git a/expected/blazegraph-wikidata.out b/expected/blazegraph-wikidata.out index 167635a..e06e68c 100644 --- a/expected/blazegraph-wikidata.out +++ b/expected/blazegraph-wikidata.out @@ -37,7 +37,7 @@ SERVER wikidata OPTIONS ( SELECT atmid, bankname, atmwkt FROM atms_munich WHERE bankname = 'BBBank'; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': PREFIX lgdo: @@ -98,7 +98,7 @@ SELECT wikidata_id, label, wkt FROM places_below_sea_level WHERE wikidata_id = 'http://www.wikidata.org/entity/Q61308849' FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT (STR(?place) AS ?placeid) (UCASE(?label) AS ?labelc) ?location @@ -177,7 +177,7 @@ WHERE base_url = 'http://www.wikidata.org/entity/' AND ctlang = 'http://www.wikidata.org/entity/Q32LUXEMBURG' ORDER by language; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?label (STRLEN(?nativename) AS ?1len2) (LANG(?nativename) AS ?language) (STRBEFORE(STR(?country),"Q") AS ?b4se) (STRAFTER(STR(?country),"entity/") AS ?q1d) (CONCAT(STR(?country),UCASE(?nativename)) AS ?ct) ("2002-03-08"^^xsd:date AS ?det) ("2002-03-08T14:33:42"^^xsd:dateTime AS ?ts) (STRSTARTS(STR(?country),"http") AS ?but) (STRENDS(STR(?country),"http") AS ?buf) @@ -222,7 +222,7 @@ WHERE bf IS NOT true AND bt IS true AND bt IS NOT false; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename (STRSTARTS(STR(?country),"http") AS ?but) (STRENDS(STR(?country),"http") AS ?buf) @@ -250,7 +250,7 @@ WHERE NOT bf IS true AND bt IS true AND NOT bt IS false; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename (STRSTARTS(STR(?country),"http") AS ?but) (STRENDS(STR(?country),"http") AS ?buf) @@ -281,7 +281,7 @@ WHERE bf != true AND bt = true AND bt != false; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename (STRSTARTS(STR(?country),"http") AS ?but) (STRENDS(STR(?country),"http") AS ?buf) @@ -305,7 +305,7 @@ BEGIN ORDER BY nativename OFFSET 0 LIMIT 5; END; $$; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename @@ -353,7 +353,7 @@ BEGIN END LOOP; END; $$; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename @@ -365,7 +365,7 @@ SELECT ?country ?nativename ORDER BY ASC (?country) LIMIT 5 -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename @@ -377,7 +377,7 @@ SELECT ?country ?nativename ORDER BY ASC (?country) LIMIT 10 -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename @@ -398,7 +398,7 @@ SELECT uri, nativename LIMIT 15 ) SELECT * FROM local EXCEPT SELECT * FROM j; -NOTICE: SPARQL query sent to 'https://query.wikidata.org/sparql': +INFO: SPARQL query sent to 'https://query.wikidata.org/sparql': SELECT ?country ?nativename diff --git a/expected/graphdb-getty.out b/expected/graphdb-getty.out index f3fcb1b..a15e26d 100644 --- a/expected/graphdb-getty.out +++ b/expected/graphdb-getty.out @@ -36,7 +36,7 @@ SELECT DISTINCT name, lon, lat FROM getty_places ORDER BY lat LIMIT 10; -NOTICE: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': +INFO: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': PREFIX ontogeo: PREFIX geo: @@ -74,7 +74,7 @@ SELECT uri, lon, lat FROM getty_places WHERE name = 'West Flanders, Flanders, Belgium, Europe, World' ORDER BY lon; -NOTICE: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': +INFO: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': PREFIX ontogeo: PREFIX geo: @@ -178,7 +178,7 @@ SELECT * FROM ( FROM getty_places WHERE lat BETWEEN 52.5 AND 53.0) j ORDER BY lat; -NOTICE: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': +INFO: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': PREFIX ontogeo: PREFIX geo: @@ -258,7 +258,7 @@ FROM getty_non_italians WHERE bio ~~* '%artist%' ORDER BY birth LIMIT 10; -NOTICE: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': +INFO: SPARQL query sent to 'http://vocab.getty.edu/sparql.xml': PREFIX ontogeo: diff --git a/expected/virtuoso-dbpedia.out b/expected/virtuoso-dbpedia.out index 8ea3b83..226e73e 100644 --- a/expected/virtuoso-dbpedia.out +++ b/expected/virtuoso-dbpedia.out @@ -40,7 +40,7 @@ SELECT film_id AS id, name, runtime, released AS rel FROM film ORDER BY rel DESC, 1 ASC LIMIT 5; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -75,7 +75,7 @@ SELECT count(runtime),avg(runtime) FROM film ORDER BY count(runtime),avg(runtime) FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -105,7 +105,7 @@ FROM film ORDER BY released DESC, name ASC OFFSET 5 ROWS FETCH FIRST 10 ROW ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -146,7 +146,7 @@ FROM film ORDER BY released DESC, name ASC OFFSET 5 LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -187,7 +187,7 @@ FROM film ORDER BY released DESC, name ASC OFFSET 5 LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -227,7 +227,7 @@ FROM film ORDER BY released DESC, name ASC OFFSET 5 LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -272,7 +272,7 @@ WHERE released < '1930-03-25' ORDER BY name FETCH FIRST 3 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -313,7 +313,7 @@ WHERE released < '1930-03-25' ORDER BY released FETCH FIRST 3 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -352,7 +352,7 @@ WHERE released < '1930-03-25') ORDER BY released ASC, name DESC FETCH FIRST 3 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -386,7 +386,7 @@ FROM film WHERE name = 'The Life of Adam Lindsay Gordon' AND released <> '1930-03-25'; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -454,7 +454,7 @@ FROM politicians WHERE country = 'Germany' ORDER BY birthdate DESC, party ASC LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -501,7 +501,7 @@ FROM politicians WHERE country = 'Germany' ORDER BY birthdate LIMIT 3; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -539,7 +539,7 @@ FROM politicians WHERE country = 'Germany' AND birthdate > '1995-12-31' ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -584,7 +584,7 @@ WHERE lower(country) = 'germany' AND birthdate > '1995-12-31' ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -623,7 +623,7 @@ FROM politicians WHERE country IN ('Germany','France','Portugal') ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -663,7 +663,7 @@ FROM politicians WHERE country NOT IN ('Germany','France','Portugal') ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -703,7 +703,7 @@ FROM politicians WHERE country = ANY(ARRAY['Germany','France','Portugal']) ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -742,7 +742,7 @@ FROM politicians WHERE country <> ANY(ARRAY['Germany','France','Portugal']) ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -779,7 +779,7 @@ FROM politicians WHERE country ~~* ANY(ARRAY['%UsTr%','%TugA%']) ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -816,7 +816,7 @@ FROM politicians WHERE country ~~ ANY(ARRAY['__land%','%GERMAN%']) ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -853,7 +853,7 @@ FROM politicians WHERE NOT country ~~* ANY(ARRAY['%UnItEd%','%land%']) ORDER BY birthdate DESC, party ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -917,7 +917,7 @@ FROM party_members WHERE country ~~* '%isle of man%' ORDER BY nmembers ASC FETCH FIRST 5 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: @@ -978,7 +978,7 @@ SELECT name, party, birthdate FROM chanceler_candidates WHERE party <> '' ORDER BY birthdate DESC; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: @@ -1036,7 +1036,7 @@ SELECT name, wkt FROM german_public_universities ORDER BY lat DESC LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX geo: PREFIX dbp: @@ -1078,7 +1078,7 @@ WHERE lat > 52 AND lon < 9 AND wkt = 'POINT(8.49305534362793 52.03777694702148)'; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX geo: PREFIX dbp: @@ -1112,7 +1112,7 @@ WHERE id <> '' AND lat - 1 > 52 AND -- conditions with expressions in the left side won't be pushed down. lon < 8+1; -- the expression in the right side will be computed before pushdown. -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX geo: PREFIX dbp: @@ -1157,7 +1157,7 @@ SELECT ?uri ?name ?lon ?lat ', log_sparql 'true'); SELECT birthdate FROM person1 WHERE person = 'foo'; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: @@ -1195,7 +1195,7 @@ NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': ', log_sparql 'true'); SELECT birthdate FROM person2 WHERE person = 'foo'; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: @@ -1233,7 +1233,7 @@ NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': SELECT birthdate FROM person3 WHERE person <> '' LIMIT 5; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -1297,7 +1297,7 @@ WHERE released < '1930-03-25' ORDER BY name FETCH FIRST 3 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -1329,7 +1329,7 @@ FROM politicians_germany WHERE birthdate > '1990-12-01' ORDER BY birthdate DESC, party ASC LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -1363,7 +1363,7 @@ WHERE released < '1930-03-25' ORDER BY name FETCH FIRST 3 ROWS ONLY; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: @@ -1394,7 +1394,7 @@ FROM politicians_germany WHERE birthdate > '1990-12-01' ORDER BY birthdate DESC, party ASC LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: @@ -1451,7 +1451,7 @@ SERVER dbpedia OPTIONS ( SELECT name FROM dbpedia_limit LIMIT 2; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -1496,7 +1496,7 @@ SELECT name FROM dbpedia_orderby ORDER BY name DESC LIMIT 2; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -1541,7 +1541,7 @@ SERVER dbpedia OPTIONS ( SELECT DISTINCT name FROM dbpedia_distinct LIMIT 1; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -1584,7 +1584,7 @@ SERVER dbpedia OPTIONS ( SELECT name FROM dbpedia_groupby LIMIT 1; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX dbp: @@ -1623,7 +1623,7 @@ SERVER dbpedia OPTIONS ( SELECT name FROM musical_artists LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbp: PREFIX dbo: @@ -1675,7 +1675,7 @@ SELECT name FROM generic_rdf_table WHERE uri = 'http://dbpedia.org/resource/Isle_of_Man' LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX rdfs: @@ -1720,7 +1720,7 @@ SELECT name FROM generic_rdf_table2 WHERE uri = 'http://dbpedia.org/resource/Brazil' LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX rdfs: @@ -1764,7 +1764,7 @@ SELECT name FROM generic_rdf_table3 WHERE uri = 'http://dbpedia.org/resource/Japan' LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX rdfs: @@ -1805,7 +1805,7 @@ SELECT name FROM generic_rdf_table4 WHERE uri = 'http://dbpedia.org/resource/Japan' LIMIT 10; -NOTICE: SPARQL query sent to 'https://dbpedia.org/sparql': +INFO: SPARQL query sent to 'https://dbpedia.org/sparql': PREFIX dbr: PREFIX rdfs: diff --git a/rdf_fdw--1.0.sql b/rdf_fdw--1.0.sql index 185202e..92e82de 100644 --- a/rdf_fdw--1.0.sql +++ b/rdf_fdw--1.0.sql @@ -11,7 +11,21 @@ RETURNS void AS 'MODULE_PATHNAME' LANGUAGE C STRICT; COMMENT ON FUNCTION rdf_fdw_validator(text[], oid) IS 'RDF Triplestore Foreign-data Wrapper options validator'; - + +CREATE FUNCTION rdf_fdw_clone_table( + foreign_table oid, + target_table text, + begin_offset int DEFAULT 0, + page_size int DEFAULT 0, + max_records int DEFAULT 0, + ordering_column text DEFAULT '', + create_table boolean DEFAULT false, + verbose boolean DEFAULT false) +RETURNS void AS 'MODULE_PATHNAME', 'rdf_fdw_clone_table' +LANGUAGE C IMMUTABLE STRICT PARALLEL UNSAFE; + +COMMENT ON FUNCTION rdf_fdw_clone_table(oid,text,int,int,int,text,boolean,boolean) IS 'materialize rdf_fdw foreign tables into heap tables'; + CREATE FOREIGN DATA WRAPPER rdf_fdw HANDLER rdf_fdw_handler VALIDATOR rdf_fdw_validator; diff --git a/rdf_fdw.c b/rdf_fdw.c index df167cf..696aa88 100644 --- a/rdf_fdw.c +++ b/rdf_fdw.c @@ -67,7 +67,7 @@ #include "access/heapam.h" #endif #include "utils/date.h" - +#include "executor/spi.h" #define REL_ALIAS_PREFIX "r" /* Handy macro to add relation name qualification */ @@ -184,6 +184,9 @@ typedef struct RDFfdwState struct RDFfdwTable *rdfTable;/* All necessary information of the FOREIGN TABLE used in a SQL statement */ Cost startup_cost; /* cost estimate, only needed for planning */ Cost total_cost; /* cost estimate, only needed for planning */ + ForeignServer *server; + ForeignTable *ft; + char* target_table; } RDFfdwState; typedef struct RDFfdwTable @@ -261,10 +264,12 @@ static struct RDFfdwOption valid_options[] = extern Datum rdf_fdw_handler(PG_FUNCTION_ARGS); extern Datum rdf_fdw_validator(PG_FUNCTION_ARGS); extern Datum rdf_fdw_version(PG_FUNCTION_ARGS); +extern Datum rdf_fdw_clone_table(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(rdf_fdw_handler); PG_FUNCTION_INFO_V1(rdf_fdw_validator); PG_FUNCTION_INFO_V1(rdf_fdw_version); +PG_FUNCTION_INFO_V1(rdf_fdw_clone_table); static void rdfGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); static void rdfGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); @@ -277,6 +282,8 @@ static void rdfEndForeignScan(ForeignScanState *node); //static TupleTableSlot *rdfExecForeignInsert(EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); //static TupleTableSlot *rdfExecForeignDelete(EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static void LoadRDFTableInfo(RDFfdwState *state); +static void LoadRDFServerInfo(RDFfdwState *state); static int ExecuteSPARQL(RDFfdwState *state); static void CreateTuple(TupleTableSlot *slot, RDFfdwState *state); static void LoadRDFData(RDFfdwState *state); @@ -290,6 +297,7 @@ static void InitSession(struct RDFfdwState *state, RelOptInfo *baserel, Planner static struct RDFfdwColumn *GetRDFColumn(struct RDFfdwState *state, char *columnname); static int LocateKeyword(char *str, char *start_chars, char *keyword, char *end_chars, int *count, int start_position); static void CreateSPARQL(RDFfdwState *state, PlannerInfo *root); +static char *CreateSQLBulkINSERT(RDFfdwState *state); static void SetUsedColumns(Expr *expr, struct RDFfdwState *state, int foreignrelid); static bool IsSPARQLParsable(struct RDFfdwState *state); static bool IsExpressionPushable(char *expression); @@ -299,6 +307,7 @@ static char *DeparseDate(Datum datum); static char *DeparseTimestamp(Datum datum, bool hasTimezone); static char *DeparseSQLLimit(struct RDFfdwState *state, PlannerInfo *root, RelOptInfo *baserel); static char *DeparseSQLWhereConditions(struct RDFfdwState *state, RelOptInfo *baserel); +static char *DeparseSPARQLWhereGraphPattern(struct RDFfdwState *state); static char *DatumToString(Datum datum, Oid type); static char *DeparseExpr(struct RDFfdwState *state, RelOptInfo *foreignrel, Expr *expr); static char *DeparseSQLOrderBy( struct RDFfdwState *state, PlannerInfo *root, RelOptInfo *baserel); @@ -335,6 +344,291 @@ Datum rdf_fdw_version(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(cstring_to_text(buffer.data)); } +Datum rdf_fdw_clone_table(PG_FUNCTION_ARGS) +{ + struct RDFfdwState *state = (struct RDFfdwState *)palloc0(sizeof(RDFfdwState)); + Oid foreign_table = PG_GETARG_OID(0); + text *target_table = PG_GETARG_TEXT_P(1); + int begin_offset = PG_GETARG_INT32(2); + int page_size = PG_GETARG_INT32(3); + int max_records = PG_GETARG_INT32(4); + text *ordering_pgcolumn = PG_GETARG_TEXT_P(5); + bool create_table = PG_GETARG_BOOL(6); + bool verbose = PG_GETARG_BOOL(7); + int processed_records = 0; + int num_pages_retrieved = 0; + char *orderby_variable = NULL; + + StringInfoData select; + + elog(DEBUG1,"%s called",__func__); + + if(page_size <= 0) + ereport(ERROR, + (errcode(ERRCODE_FDW_ERROR), + errmsg("invalid page_size: %d",page_size), + errhint("the page size corresponds to the number of records that are retrieved after each iteration and therefore must be a positive number"))); + + if(max_records < 0) + ereport(ERROR, + (errcode(ERRCODE_FDW_ERROR), + errmsg("invalid max_records: %d",max_records), + errhint("max_records corresponds to the total number of records that are retrieved from the FOREIGN TABLE and therefore must be a positive number"))); + + state->foreigntableid = foreign_table; + state->ft = GetForeignTable(state->foreigntableid); + state->server = GetForeignServer(state->ft->serverid); + state->target_table = text_to_cstring(target_table); + state->enable_pushdown = false; + state->query_param = RDF_DEFAULT_QUERY_PARAM; + state->format = RDF_DEFAULT_FORMAT; + state->connect_timeout = RDF_DEFAULT_CONNECTTIMEOUT; + state->max_retries = RDF_DEFAULT_MAXRETRY; + + /* + * Load configured SERVER parameters + */ + LoadRDFServerInfo(state); + + /* + * Load configured FOREIGN TABLE parameters + */ + LoadRDFTableInfo(state); + + state->sparql_prefixes = DeparseSPARQLPrefix(state->raw_sparql); + + initStringInfo(&select); + for (int i = 0; i < state->numcols; i++) + { + /* + * Setting ORDER BY column for the SPARQL query. In case no column + * is provided, we pick up the first 'iri' column in the table. + */ + if(strlen(text_to_cstring(ordering_pgcolumn)) == 0 && !orderby_variable) + { + if (state->rdfTable->cols[i]->nodetype && + strcmp(state->rdfTable->cols[i]->nodetype, RDF_COLUMN_OPTION_NODETYPE_IRI) == 0) + orderby_variable = pstrdup(state->rdfTable->cols[i]->sparqlvar); + } + else if (strcmp(state->rdfTable->cols[i]->name, text_to_cstring(ordering_pgcolumn)) == 0) + { + orderby_variable = pstrdup(state->rdfTable->cols[i]->sparqlvar); + } + + if (!state->rdfTable->cols[i]->expression) + appendStringInfo(&select, "%s ", pstrdup(state->rdfTable->cols[i]->sparqlvar)); + else + appendStringInfo(&select, "(%s AS %s) ", + pstrdup(state->rdfTable->cols[i]->expression), + pstrdup(state->rdfTable->cols[i]->sparqlvar) + ); + } + + if(!orderby_variable && strlen(text_to_cstring(ordering_pgcolumn)) !=0) + ereport(ERROR, + (errcode(ERRCODE_FDW_ERROR), + errmsg("invalid ordering_column '%s'",text_to_cstring(ordering_pgcolumn)))); + + /* + * If no 'ordering_column' was provided and the table has no 'iri' column, + * we use the first colum (index 1) for the SPARQL ORDER BY. + */ + if(!orderby_variable) + orderby_variable = pstrdup(state->rdfTable->cols[1]->sparqlvar); + + state->sparql_prefixes = DeparseSPARQLPrefix(state->raw_sparql); + state->sparql_from = DeparseSPARQLFrom(state->raw_sparql); + state->sparql_select = NameStr(select); + state->sparql_where = DeparseSPARQLWhereGraphPattern(state); + + /* + * Here we try to create the target table with the name give in 'target_table'. + * This new table will be a clone of the queried FOREIGN TABLE, of couse without + * the table and column OPTIONS. + */ + if(create_table) + { + StringInfoData ct; + SPI_connect(); + + initStringInfo(&ct); + appendStringInfo(&ct,"CREATE TABLE %s AS SELECT * FROM %s WITH NO DATA;", + state->target_table, + get_rel_name(state->foreigntableid)); + + if(SPI_exec(NameStr(ct), 0) == SPI_OK_UTILITY && verbose) + elog(INFO,"Target TABLE \"%s\" created based on FOREIGN TABLE \"%s\":\n\n %s\n", + state->target_table, get_rel_name(state->foreigntableid), NameStr(ct)); + + SPI_finish(); + } + + SPI_connect(); + + while(true) + { + char *sqlinsert; + int offset = begin_offset + (num_pages_retrieved * page_size); + int limit = page_size; + StringInfoData limit_clause; + + /* stop iteration if the current offset is greater than max_records */ + if(max_records != 0 && offset >= max_records) + { + elog(DEBUG1,"%s: number of retrieved records reached the limit of %d.\n\n records inserted: %d\n pages: %d\n page size: %d\n", + __func__, + max_records, + processed_records, + num_pages_retrieved, + page_size); + break; + } + + /* + * if the current offset + page_size exceed the set limit we change + * the limit. + */ + if(max_records != 0 && offset + page_size > max_records) + limit = max_records - offset; + + /* + * pagesize and rowcount must be reset before every SPARQL query, + * as it contains the total number of records retrieved from the + * triplestore and the number of records processed for each request. + */ + state->pagesize = 0; + state->rowcount = 0; + + /* + * changes the pagination of the query to match the parameters given + * in the function call. If the SPARQL query set in the FOREIGN TABLE + * already contains a OFFSET LIMIT, it will be overwritten by this string + */ + initStringInfo(&limit_clause); + appendStringInfo(&limit_clause,"ORDER BY %s \nOFFSET %d LIMIT %d", + orderby_variable, + num_pages_retrieved == 0 ? 0 : offset, + limit); + + state->sparql_limit = NameStr(limit_clause); + + /* + * create new SPARQL query with the pagination parameters + */ + CreateSPARQL(state, NULL); + + /* + * execute the newly created SPARQL and load it in 'state'. It updates + * state->pagesize! + */ + LoadRDFData(state); + + /* get out in case the SPARQL retrieves nothing */ + if(state->pagesize == 0) + { + elog(DEBUG1,"%s: SPARQL returned nothing.",__func__); + break; + } + + /* + * create SQL INSERT statements based on the records retrieved from + * the SPARQL query. + */ + sqlinsert = CreateSQLBulkINSERT(state); + + if(SPI_exec(sqlinsert, 0) == SPI_OK_INSERT && verbose) + elog(INFO,"page (%d - %d) stored: %ld records successfully inserted ", + offset, offset + page_size, SPI_processed); + else + ereport(ERROR, + (errcode(ERRCODE_FDW_ERROR), + errmsg("unable to insert data in the target table '%s'",state->target_table))); + + num_pages_retrieved++; + processed_records = processed_records + SPI_processed; + + pfree(limit_clause.data); + } + + if(verbose) + elog(INFO,"Total inserted records: %d", processed_records); + + SPI_finish(); + + PG_RETURN_VOID(); +} + +static char *CreateSQLBulkINSERT(RDFfdwState *state) +{ + xmlNodePtr result; + xmlNodePtr value; + xmlNodePtr record; + StringInfoData name; + StringInfoData values; + StringInfoData insert_columns; + StringInfoData insert_statement; + + initStringInfo(&name); + initStringInfo(&values); + initStringInfo(&insert_columns); + initStringInfo(&insert_statement); + + for (int i = 0; i < state->numcols; i++) + { + appendStringInfo(&insert_columns, "%s%s%s ", + i == 0 ? "(" : "", + state->rdfTable->cols[i]->name, + i == (state->numcols - 1) ? ")" : ","); + } + + for (size_t rec = 0; rec < state->pagesize; rec++) + { + record = FetchNextBinding(state); + + for (int i = 0; i < state->numcols; i++) + { + char *sparqlvar = state->rdfTable->cols[i]->sparqlvar; + + for (result = record->children; result != NULL; result = result->next) + { + appendStringInfo(&name, "?%s", + (char *)xmlGetProp(result, (xmlChar *)RDF_XML_NAME_TAG)); + + if (strcmp(sparqlvar, NameStr(name)) == 0) + { + + for (value = result->children; value != NULL; value = value->next) + { + xmlBufferPtr buffer = xmlBufferCreate(); + xmlNodeDump(buffer, state->xmldoc, value->children, 0, 0); + appendStringInfo(&values, "%s$$%s$$%s ", + i == 0 ? "(" : "", + buffer->content, + i == (state->numcols - 1) ? ")" : ","); + + xmlBufferFree(buffer); + } + } + + resetStringInfo(&name); + } + } + + if(rec+1 < state->pagesize) + appendStringInfo(&values,","); + + state->rowcount++; + } + + appendStringInfo(&insert_statement,"INSERT INTO %s %s VALUES %s;", + state->target_table, + NameStr(insert_columns), + NameStr(values)); + + + return NameStr(insert_statement); +} + Datum rdf_fdw_validator(PG_FUNCTION_ARGS) { List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); @@ -674,6 +968,189 @@ static void rdfEndForeignScan(ForeignScanState *node) { } +static void LoadRDFTableInfo(RDFfdwState *state) +{ + ListCell *cell; + +#if PG_VERSION_NUM < 130000 + Relation rel = heap_open(ft->relid, NoLock); +#else + Relation rel = table_open(state->foreigntableid, NoLock); +#endif + + elog(DEBUG1, "%s called", __func__); + + state->numcols = rel->rd_att->natts; + + /* + *Loading FOREIGN TABLE strucuture (columns and their OPTION values) + */ + state->rdfTable = (struct RDFfdwTable *) palloc0(sizeof(struct RDFfdwTable)); + state->rdfTable->cols = (struct RDFfdwColumn **) palloc0(sizeof(struct RDFfdwColumn*) * state->numcols); + + for (int i = 0; i < state->numcols; i++) + { + List *options = GetForeignColumnOptions(state->foreigntableid, i + 1); + ListCell *lc; + + state->rdfTable->cols[i] = (struct RDFfdwColumn *)palloc0(sizeof(struct RDFfdwColumn)); + + /* + * Setting foreign table colmuns's default values. + */ + state->rdfTable->cols[i]->pushable = true; + state->rdfTable->cols[i]->nodetype = RDF_COLUMN_OPTION_NODETYPE_LITERAL; + state->rdfTable->cols[i]->used = false; + + foreach (lc, options) + { + DefElem *def = (DefElem *)lfirst(lc); + + if (strcmp(def->defname, RDF_COLUMN_OPTION_VARIABLE) == 0) + { + elog(DEBUG1," %s: (%d) adding sparql variable > '%s'",__func__,i,defGetString(def)); + state->rdfTable->cols[i]->sparqlvar = pstrdup(defGetString(def)); + } + else if (strcmp(def->defname, RDF_COLUMN_OPTION_EXPRESSION) == 0) + { + elog(DEBUG1," %s: (%d) adding sparql expression > '%s'",__func__,i,defGetString(def)); + state->rdfTable->cols[i]->expression = pstrdup(defGetString(def)); + state->rdfTable->cols[i]->pushable = IsExpressionPushable(defGetString(def)); + elog(DEBUG1," %s: (%d) is expression pushable? > '%s'",__func__,i, + state->rdfTable->cols[i]->pushable ? "true" : "false"); + } + else if (strcmp(def->defname, RDF_COLUMN_OPTION_LITERALTYPE) == 0) + { + StringInfoData literaltype; + initStringInfo(&literaltype); + appendStringInfo(&literaltype, "^^%s", defGetString(def)); + elog(DEBUG1," %s: (%d) adding sparql literal data type > '%s'",__func__,i,defGetString(def)); + state->rdfTable->cols[i]->literaltype = pstrdup(literaltype.data); + } + else if (strcmp(def->defname, RDF_COLUMN_OPTION_NODETYPE) == 0) + { + elog(DEBUG1," %s: (%d) adding sparql node data type > '%s'",__func__,i,defGetString(def)); + state->rdfTable->cols[i]->nodetype = pstrdup(defGetString(def)); + } + else if (strcmp(def->defname, RDF_COLUMN_OPTION_LANGUAGE) == 0) + { + StringInfoData tag; + initStringInfo(&tag); + appendStringInfo(&tag, "@%s", defGetString(def)); + elog(DEBUG1," %s: (%d) adding literal language tag > '%s'",__func__,i,defGetString(def)); + state->rdfTable->cols[i]->language = pstrdup(tag.data); + } + } + + elog(DEBUG1," %s: (%d) adding data type > %u",__func__,i,rel->rd_att->attrs[i].atttypid); + + state->rdfTable->cols[i]->pgtype = rel->rd_att->attrs[i].atttypid; + state->rdfTable->cols[i]->name = pstrdup(NameStr(rel->rd_att->attrs[i].attname)); + state->rdfTable->cols[i]->pgtypmod = rel->rd_att->attrs[i].atttypmod; + state->rdfTable->cols[i]->pgattnum = rel->rd_att->attrs[i].attnum; + + if (!canHandleType(state->rdfTable->cols[i]->pgtype)) + { + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), + errmsg("data type of '%s' not supported: %d\n", state->rdfTable->cols[i]->name, state->rdfTable->cols[i]->pgtype))); + } + } + +#if PG_VERSION_NUM < 130000 + heap_close(rel, NoLock); +#else + table_close(rel, NoLock); +#endif + + + /* + * Loading FOREIGN TABLE OPTIONS + */ + foreach (cell, state->ft->options) + { + DefElem *def = lfirst_node(DefElem, cell); + + if (strcmp(RDF_TABLE_OPTION_SPARQL, def->defname) == 0) + { + state->raw_sparql = defGetString(def); + state->is_sparql_parsable = IsSPARQLParsable(state); + } + else if (strcmp(RDF_TABLE_OPTION_LOG_SPARQL, def->defname) == 0) + state->log_sparql = defGetBoolean(def); + + else if (strcmp(RDF_TABLE_OPTION_ENABLE_PUSHDOWN, def->defname) == 0) + state->enable_pushdown = defGetBoolean(def); + } + +} + +static void LoadRDFServerInfo(RDFfdwState *state) +{ + elog(DEBUG1, "%s called", __func__); + + if (state->server) + { + ListCell *cell; + + foreach (cell, state->server->options) + { + DefElem *def = lfirst_node(DefElem, cell); + + if (strcmp(RDF_SERVER_OPTION_ENDPOINT, def->defname) == 0) + state->endpoint = defGetString(def); + + else if (strcmp(RDF_SERVER_OPTION_FORMAT, def->defname) == 0) + state->format = defGetString(def); + + else if (strcmp(RDF_SERVER_OPTION_CUSTOMPARAM, def->defname) == 0) + state->custom_params = defGetString(def); + + else if (strcmp(RDF_SERVER_OPTION_HTTP_PROXY, def->defname) == 0) + { + state->proxy = defGetString(def); + state->proxy_type = RDF_SERVER_OPTION_HTTP_PROXY; + } + else if (strcmp(RDF_SERVER_OPTION_HTTPS_PROXY, def->defname) == 0) + { + state->proxy = defGetString(def); + state->proxy_type = RDF_SERVER_OPTION_HTTPS_PROXY; + } + else if (strcmp(RDF_SERVER_OPTION_PROXY_USER, def->defname) == 0) + state->proxy_user = defGetString(def); + + else if (strcmp(RDF_SERVER_OPTION_PROXY_USER_PASSWORD, def->defname) == 0) + state->proxy_user_password = defGetString(def); + + else if (strcmp(RDF_SERVER_OPTION_CONNECTRETRY, def->defname) == 0) + { + char *tailpt; + char *maxretry_str = defGetString(def); + state->max_retries = strtol(maxretry_str, &tailpt, 0); + } + else if (strcmp(RDF_SERVER_OPTION_REQUEST_REDIRECT, def->defname) == 0) + state->request_redirect = defGetBoolean(def); + + else if (strcmp(RDF_SERVER_OPTION_REQUEST_MAX_REDIRECT, def->defname) == 0) + { + char *tailpt; + char *maxredirect_str = defGetString(def); + state->request_max_redirect = strtol(maxredirect_str, &tailpt, 0); + } + else if (strcmp(RDF_SERVER_OPTION_CONNECTTIMEOUT, def->defname) == 0) + { + char *tailpt; + char *timeout_str = defGetString(def); + state->connect_timeout = strtol(timeout_str, &tailpt, 0); + } + else if (strcmp(RDF_SERVER_OPTION_ENABLE_PUSHDOWN, def->defname) == 0) + state->enable_pushdown = defGetBoolean(def); + + else if (strcmp(RDF_SERVER_OPTION_QUERY_PARAM, def->defname) == 0) + state->query_param = defGetString(def); + } + } +} /* * CStringToConst * ----------------- @@ -1071,21 +1548,11 @@ static struct RDFfdwColumn *GetRDFColumn(struct RDFfdwState *state, char *column */ static void InitSession(struct RDFfdwState *state, RelOptInfo *baserel, PlannerInfo *root) { - ForeignTable *ft = GetForeignTable(state->foreigntableid); - ForeignServer *server = GetForeignServer(ft->serverid); List *columnlist = baserel->reltarget->exprs; List *conditions = baserel->baserestrictinfo; ListCell *cell; - int where_position = -1; - int where_size = -1; StringInfoData select; -#if PG_VERSION_NUM < 130000 - Relation rel = heap_open(ft->relid, NoLock); -#else - Relation rel = table_open(state->foreigntableid, NoLock); -#endif - elog(DEBUG1,"%s called",__func__); /* @@ -1098,172 +1565,19 @@ static void InitSession(struct RDFfdwState *state, RelOptInfo *baserel, PlannerI state->format = RDF_DEFAULT_FORMAT; state->connect_timeout = RDF_DEFAULT_CONNECTTIMEOUT; state->max_retries = RDF_DEFAULT_MAXRETRY; - state->numcols = rel->rd_att->natts; + state->ft = GetForeignTable(state->foreigntableid); + state->server = GetForeignServer(state->ft->serverid); - /* - *Loading FOREIGN TABLE strucuture (columns and their OPTION values) + /* + * Loading SERVER OPTIONS */ - state->rdfTable = (struct RDFfdwTable *) palloc0(sizeof(struct RDFfdwTable)); - state->rdfTable->cols = (struct RDFfdwColumn **) palloc0(sizeof(struct RDFfdwColumn*) * state->numcols); - - for (int i = 0; i < state->numcols; i++) - { - List *options = GetForeignColumnOptions(state->foreigntableid, i + 1); - ListCell *lc; - - state->rdfTable->cols[i] = (struct RDFfdwColumn *)palloc0(sizeof(struct RDFfdwColumn)); - - /* - * Setting foreign table colmuns's default values. - */ - state->rdfTable->cols[i]->pushable = true; - state->rdfTable->cols[i]->nodetype = RDF_COLUMN_OPTION_NODETYPE_LITERAL; - state->rdfTable->cols[i]->used = false; - - foreach (lc, options) - { - DefElem *def = (DefElem *)lfirst(lc); - - if (strcmp(def->defname, RDF_COLUMN_OPTION_VARIABLE) == 0) - { - elog(DEBUG1," %s: (%d) adding sparql variable > '%s'",__func__,i,defGetString(def)); - state->rdfTable->cols[i]->sparqlvar = pstrdup(defGetString(def)); - } - else if (strcmp(def->defname, RDF_COLUMN_OPTION_EXPRESSION) == 0) - { - elog(DEBUG1," %s: (%d) adding sparql expression > '%s'",__func__,i,defGetString(def)); - state->rdfTable->cols[i]->expression = pstrdup(defGetString(def)); - state->rdfTable->cols[i]->pushable = IsExpressionPushable(defGetString(def)); - elog(DEBUG1," %s: (%d) is expression pushable? > '%s'",__func__,i, - state->rdfTable->cols[i]->pushable ? "true" : "false"); - } - else if (strcmp(def->defname, RDF_COLUMN_OPTION_LITERALTYPE) == 0) - { - StringInfoData literaltype; - initStringInfo(&literaltype); - appendStringInfo(&literaltype, "^^%s", defGetString(def)); - elog(DEBUG1," %s: (%d) adding sparql literal data type > '%s'",__func__,i,defGetString(def)); - state->rdfTable->cols[i]->literaltype = pstrdup(literaltype.data); - } - else if (strcmp(def->defname, RDF_COLUMN_OPTION_NODETYPE) == 0) - { - elog(DEBUG1," %s: (%d) adding sparql node data type > '%s'",__func__,i,defGetString(def)); - state->rdfTable->cols[i]->nodetype = pstrdup(defGetString(def)); - } - else if (strcmp(def->defname, RDF_COLUMN_OPTION_LANGUAGE) == 0) - { - StringInfoData tag; - initStringInfo(&tag); - appendStringInfo(&tag, "@%s", defGetString(def)); - elog(DEBUG1," %s: (%d) adding literal language tag > '%s'",__func__,i,defGetString(def)); - state->rdfTable->cols[i]->language = pstrdup(tag.data); - } - } - - elog(DEBUG1," %s: (%d) adding data type > %u",__func__,i,rel->rd_att->attrs[i].atttypid); + LoadRDFServerInfo(state); - state->rdfTable->cols[i]->pgtype = rel->rd_att->attrs[i].atttypid; - state->rdfTable->cols[i]->name = pstrdup(NameStr(rel->rd_att->attrs[i].attname)); - state->rdfTable->cols[i]->pgtypmod = rel->rd_att->attrs[i].atttypmod; - state->rdfTable->cols[i]->pgattnum = rel->rd_att->attrs[i].attnum; - - if (!canHandleType(state->rdfTable->cols[i]->pgtype)) - { - ereport(ERROR, - (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), - errmsg("data type of '%s' not supported: %d\n", state->rdfTable->cols[i]->name, state->rdfTable->cols[i]->pgtype))); - -#if PG_VERSION_NUM < 130000 - heap_close(rel, NoLock); -#else - table_close(rel, NoLock); -#endif - } - } -#if PG_VERSION_NUM < 130000 - heap_close(rel, NoLock); -#else - table_close(rel, NoLock); -#endif - - /* Loading Foreign Server OPTIONS */ - foreach (cell, server->options) - { - DefElem *def = lfirst_node(DefElem, cell); - - if (strcmp(RDF_SERVER_OPTION_ENDPOINT, def->defname) == 0) - state->endpoint = defGetString(def); - - else if (strcmp(RDF_SERVER_OPTION_FORMAT, def->defname) == 0) - state->format = defGetString(def); - - else if (strcmp(RDF_SERVER_OPTION_CUSTOMPARAM, def->defname) == 0) - state->custom_params = defGetString(def); - - else if (strcmp(RDF_SERVER_OPTION_HTTP_PROXY, def->defname) == 0) - { - state->proxy = defGetString(def); - state->proxy_type = RDF_SERVER_OPTION_HTTP_PROXY; - } - else if (strcmp(RDF_SERVER_OPTION_HTTPS_PROXY, def->defname) == 0) - { - state->proxy = defGetString(def); - state->proxy_type = RDF_SERVER_OPTION_HTTPS_PROXY; - } - else if (strcmp(RDF_SERVER_OPTION_PROXY_USER, def->defname) == 0) - state->proxy_user = defGetString(def); - - else if (strcmp(RDF_SERVER_OPTION_PROXY_USER_PASSWORD, def->defname) == 0) - state->proxy_user_password = defGetString(def); - - else if (strcmp(RDF_SERVER_OPTION_CONNECTRETRY, def->defname) == 0) - { - char *tailpt; - char *maxretry_str = defGetString(def); - state->max_retries = strtol(maxretry_str, &tailpt, 0); - } - else if (strcmp(RDF_SERVER_OPTION_REQUEST_REDIRECT, def->defname) == 0) - state->request_redirect = defGetBoolean(def); - - else if (strcmp(RDF_SERVER_OPTION_REQUEST_MAX_REDIRECT, def->defname) == 0) - { - char *tailpt; - char *maxredirect_str = defGetString(def); - state->request_max_redirect = strtol(maxredirect_str, &tailpt, 0); - } - else if (strcmp(RDF_SERVER_OPTION_CONNECTTIMEOUT, def->defname) == 0) - { - char *tailpt; - char *timeout_str = defGetString(def); - state->connect_timeout = strtol(timeout_str, &tailpt, 0); - } - else if (strcmp(RDF_SERVER_OPTION_ENABLE_PUSHDOWN, def->defname) == 0) - state->enable_pushdown = defGetBoolean(def); - - else if (strcmp(RDF_SERVER_OPTION_QUERY_PARAM, def->defname) == 0) - state->query_param = defGetString(def); - } - - - /* - * Loading Foreign Table OPTIONS + /* + * Loading FOREIGN TABLE structure and OPTIONS */ - foreach (cell, ft->options) - { - DefElem *def = lfirst_node(DefElem, cell); - - if (strcmp(RDF_TABLE_OPTION_SPARQL, def->defname) == 0) - { - state->raw_sparql = defGetString(def); - state->is_sparql_parsable = IsSPARQLParsable(state); - } - else if (strcmp(RDF_TABLE_OPTION_LOG_SPARQL, def->defname) == 0) - state->log_sparql = defGetBoolean(def); - - else if (strcmp(RDF_TABLE_OPTION_ENABLE_PUSHDOWN, def->defname) == 0) - state->enable_pushdown = defGetBoolean(def); - } + LoadRDFTableInfo(state); /* * Marking columns used in the SQL query for SPARQL pushdown @@ -1294,26 +1608,15 @@ static void InitSession(struct RDFfdwState *state, RelOptInfo *baserel, PlannerI else if(state->rdfTable->cols[i]->used && state->rdfTable->cols[i]->expression) appendStringInfo(&select,"(%s AS %s) ",pstrdup(state->rdfTable->cols[i]->expression), - pstrdup(state->rdfTable->cols[i]->sparqlvar)); + pstrdup(state->rdfTable->cols[i]->sparqlvar)); } state->sparql_select = pstrdup(select.data); - /* - * Deparsing SPARQL WHERE clause - * 'where_position = i + 1' to remove the surrounging curly braces {} as we are - * interested only in WHERE clause's containing triples + /* + * Extract the graph patter from the SPARQL WHERE clause */ - for (int i = 0; state->raw_sparql[i] != '\0'; i++) - { - if (state->raw_sparql[i] == '{' && where_position == -1) - where_position = i + 1; - - if (state->raw_sparql[i] == '}') - where_size = i - where_position; - } - - state->sparql_where = pnstrdup(state->raw_sparql + where_position, where_size); + state->sparql_where = DeparseSPARQLWhereGraphPattern(state); /* * Try to deparse SQL WHERE conditions, if any, to create SPARQL FILTER expressions @@ -1403,7 +1706,7 @@ static int ExecuteSPARQL(RDFfdwState *state) appendStringInfo(&accept_header, "Accept: %s", state->format); if(state->log_sparql) - elog(NOTICE,"SPARQL query sent to '%s':\n\n%s\n",state->endpoint,state->sparql); + elog(INFO,"SPARQL query sent to '%s':\n\n%s\n",state->endpoint,state->sparql); initStringInfo(&url_buffer); appendStringInfo(&url_buffer, "%s=%s", state->query_param, curl_easy_escape(curl, state->sparql, 0)); @@ -2835,6 +3138,28 @@ static char *DeparseSQLWhereConditions(struct RDFfdwState *state, RelOptInfo *ba return where_clause.data; } + +static char *DeparseSPARQLWhereGraphPattern(struct RDFfdwState *state) +{ + int where_position = -1; + int where_size = -1; + /* + * Deparsing SPARQL WHERE clause + * 'where_position = i + 1' to remove the surrounging curly braces {} as we are + * interested only in WHERE clause's graph pattern + */ + for (int i = 0; state->raw_sparql[i] != '\0'; i++) + { + if (state->raw_sparql[i] == '{' && where_position == -1) + where_position = i + 1; + + if (state->raw_sparql[i] == '}') + where_size = i - where_position; + } + + return pnstrdup(state->raw_sparql + where_position, where_size); +} + /* * DeparseSQLOrderBy * -----------------