diff --git a/bin/busted b/bin/busted index e59760322faf..5be3493b0568 100755 --- a/bin/busted +++ b/bin/busted @@ -64,6 +64,10 @@ if not os.getenv("KONG_BUSTED_RESPAWNED") then -- create shared dict resty_flags = resty_flags .. require("spec.fixtures.shared_dict") + -- create lmdb environment + local lmdb_env = os.tmpname() + resty_flags = resty_flags .. string.format(' --main-conf "lmdb_environment_path %s;" ', lmdb_env) + if resty_flags then table.insert(cmd, cmd_prefix_count+1, resty_flags) end diff --git a/kong-3.10.0-0.rockspec b/kong-3.10.0-0.rockspec index 31f68741f8cd..b189153bd6d9 100644 --- a/kong-3.10.0-0.rockspec +++ b/kong-3.10.0-0.rockspec @@ -101,6 +101,7 @@ build = { ["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua", ["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua", ["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua", + ["kong.clustering.services.sync.validate"] = "kong/clustering/services/sync/validate.lua", ["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua", ["kong.cluster_events"] = "kong/cluster_events/init.lua", diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 56e2a9f1d271..5a6d297b420e 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -10,6 +10,7 @@ local isempty = require("table.isempty") local events = require("kong.runloop.events") +local validate_deltas = require("kong.clustering.services.sync.validate").validate_deltas local insert_entity_for_txn = declarative.insert_entity_for_txn local delete_entity_for_txn = declarative.delete_entity_for_txn local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY @@ -194,6 +195,7 @@ local function do_sync() return nil, "default namespace does not exist inside params" end + local wipe = ns_delta.wipe local deltas = ns_delta.deltas if not deltas then @@ -219,9 +221,14 @@ local function do_sync() end assert(type(kong.default_workspace) == "string") + -- validate deltas + local ok, err = validate_deltas(deltas, wipe) + if not ok then + return nil, err + end + local t = txn.begin(512) - local wipe = ns_delta.wipe if wipe then t:db_drop(false) end diff --git a/kong/clustering/services/sync/validate.lua b/kong/clustering/services/sync/validate.lua new file mode 100644 index 000000000000..1bdc57528b6c --- /dev/null +++ b/kong/clustering/services/sync/validate.lua @@ -0,0 +1,61 @@ +local declarative = require("kong.db.declarative") +local declarative_config = require("kong.db.schema.others.declarative_config") + + +local null = ngx.null +local tb_insert = table.insert +local validate_schema = declarative_config.validate_schema +local pk_string = declarative_config.pk_string +local validate_references_sync = declarative_config.validate_references_sync +local pretty_print_error = declarative.pretty_print_error + + +local function validate_deltas(deltas, is_full_sync) + + -- generate deltas table mapping primary key string to entity item + local deltas_map = {} + + -- generate declarative config table + local dc_table = { _format_version = "3.0", } + + local db = kong.db + + for _, delta in ipairs(deltas) do + local delta_type = delta.type + local delta_entity = delta.entity + + if delta_entity ~= nil and delta_entity ~= null then + dc_table[delta_type] = dc_table[delta_type] or {} + + tb_insert(dc_table[delta_type], delta_entity) + + -- table: primary key string -> entity + local schema = db[delta_type].schema + local pk = schema:extract_pk_values(delta_entity) + local pks = pk_string(schema, pk) + + deltas_map[pks] = delta_entity + end + end + + -- validate schema (same logic as the sync v1 full-sync schema validation) + local dc_schema = db.declarative_config.schema + + local ok, err_t = validate_schema(dc_schema, dc_table) + if not ok then + return nil, pretty_print_error(err_t) + end + + -- validate references + local ok, err_t = validate_references_sync(deltas, deltas_map, is_full_sync) + if not ok then + return nil, pretty_print_error(err_t) + end + + return true +end + + +return { + validate_deltas = validate_deltas, +} diff --git a/kong/db/declarative/init.lua b/kong/db/declarative/init.lua index 1f209657f0e1..293b2829f0a3 100644 --- a/kong/db/declarative/init.lua +++ b/kong/db/declarative/init.lua @@ -33,8 +33,10 @@ local _MT = { __index = _M, } -- @tparam boolean partial Input is not a full representation -- of the database (e.g. for db_import) -- @treturn table A Config schema adjusted for this configuration -function _M.new_config(kong_config, partial) - local schema, err = declarative_config.load(kong_config.loaded_plugins, kong_config.loaded_vaults) +function _M.new_config(kong_config, partial, include_foreign) + local schema, err = declarative_config.load(kong_config.loaded_plugins, + kong_config.loaded_vaults, + include_foreign, kong.sync ~= nil) if not schema then return nil, err end @@ -263,5 +265,8 @@ _M.delete_entity_for_txn = declarative_import.delete_entity_for_txn _M.workspace_id = declarative_import.workspace_id _M.GLOBAL_WORKSPACE_TAG = declarative_import.GLOBAL_WORKSPACE_TAG +-- helpful function +_M.pretty_print_error = pretty_print_error + return _M diff --git a/kong/db/schema/others/declarative_config.lua b/kong/db/schema/others/declarative_config.lua index 5e29ffda3181..2f67a03bdda7 100644 --- a/kong/db/schema/others/declarative_config.lua +++ b/kong/db/schema/others/declarative_config.lua @@ -14,6 +14,7 @@ local null = ngx.null local type = type local next = next local pairs = pairs +local fmt = string.format local yield = require("kong.tools.yield").yield local ipairs = ipairs local insert = table.insert @@ -331,7 +332,7 @@ end local function ws_id_for(item) - if item.ws_id == nil or item.ws_id == ngx.null then + if item.ws_id == nil or item.ws_id == null then return "*" end return item.ws_id @@ -413,7 +414,7 @@ local function populate_references(input, known_entities, by_id, by_key, expecte local key = use_key and item[endpoint_key] local failed = false - if key and key ~= ngx.null then + if key and key ~= null then local ok = add_to_by_key(by_key, entity_schema, item, entity, key) if not ok then add_error(errs, parent_entity, parent_idx, entity, i, @@ -506,6 +507,68 @@ local function validate_references(self, input) end +function DeclarativeConfig.validate_references_sync(deltas, deltas_map, is_full_sync) + local errs = {} + + for _, delta in ipairs(deltas) do + local item_type = delta.type + local item = delta.entity + local ws_id = delta.ws_id or kong.default_workspace + + local foreign_refs = foreign_references[item_type] + + if not item or item == null or not foreign_refs then + goto continue + end + + for k, v in pairs(item) do + + -- Try to check if item's some foreign key exists in the deltas or LMDB. + -- For example, `item[k]` could be `["service"]`, we need + -- to find the referenced foreign service entity for this router entity. + + local foreign_entity = foreign_refs[k] + + if foreign_entity and v ~= null then -- k is foreign key + + local dao = kong.db[foreign_entity] + + -- try to find it in deltas + local pks = DeclarativeConfig.pk_string(dao.schema, v) + local fvalue = deltas_map[pks] + + -- try to find it in DB (LMDB) + if not fvalue and not is_full_sync then + fvalue = dao:select(v, { workspace = ws_id }) + end + + -- record an error if not finding its foreign reference + if not fvalue then + errs[item_type] = errs[item_type] or {} + errs[item_type][foreign_entity] = errs[item_type][foreign_entity] or {} + + local msg = fmt("could not find %s's foreign refrences %s (%s)", + item_type, foreign_entity, + type(v) == "string" and v or cjson_encode(v)) + + insert(errs[item_type][foreign_entity], msg) + end + end -- if foreign_entity and v ~= null + + end -- for k, v in pairs(item) + + ::continue:: + end -- for _, delta in ipairs(deltas) + + + if next(errs) then + return nil, errs + end + + return true +end + + -- This is a best-effort generation of a cache-key-like identifier -- to feed the hash when generating deterministic UUIDs. -- We do not use the actual `cache_key` function from the DAO because @@ -781,15 +844,7 @@ local function get_unique_key(schema, entity, field, value) end -local function flatten(self, input) - -- manually set transform here - -- we can't do this in the schema with a `default` because validate - -- needs to happen before process_auto_fields, which - -- is the one in charge of filling out default values - if input._transform == nil then - input._transform = true - end - +function DeclarativeConfig.validate_schema(self, input) local ok, err = self:validate(input) if not ok then yield() @@ -812,6 +867,24 @@ local function flatten(self, input) yield() end + return true +end + + +local function flatten(self, input) + -- manually set transform here + -- we can't do this in the schema with a `default` because validate + -- needs to happen before process_auto_fields, which + -- is the one in charge of filling out default values + if input._transform == nil then + input._transform = true + end + + local ok, err = DeclarativeConfig.validate_schema(self, input) + if not ok then + return nil, err + end + generate_ids(input, self.known_entities) yield() @@ -860,7 +933,7 @@ local function flatten(self, input) if field.unique then local flat_value = flat_entry[name] - if flat_value and flat_value ~= ngx.null then + if flat_value and flat_value ~= null then local unique_key = get_unique_key(schema, entry, field, flat_value) uniques[name] = uniques[name] or {} if uniques[name][unique_key] then @@ -907,7 +980,9 @@ local function load_entity_subschemas(entity_name, entity) end -function DeclarativeConfig.load(plugin_set, vault_set, include_foreign) +-- @tparam sync_v2_enabled It generates full schema and foreign references to +-- validate schema and references for sync.v2 +function DeclarativeConfig.load(plugin_set, vault_set, include_foreign, sync_v2_enabled) all_schemas = {} local schemas_array = {} for _, entity in ipairs(constants.CORE_ENTITIES) do @@ -942,7 +1017,7 @@ function DeclarativeConfig.load(plugin_set, vault_set, include_foreign) known_entities[i] = schema.name end - local fields, records = build_fields(known_entities, include_foreign) + local fields, records = build_fields(known_entities, include_foreign or sync_v2_enabled) -- assert(no_foreign(fields)) local ok, err = load_plugin_subschemas(fields, plugin_set) @@ -955,10 +1030,33 @@ function DeclarativeConfig.load(plugin_set, vault_set, include_foreign) return nil, err end - -- we replace the "foreign"-type fields at the top-level - -- with "string"-type fields only after the subschemas have been loaded, - -- otherwise they will detect the mismatch. + -- Pre-load the full schema to validate the schema for sync.v2. Lazy loading + -- the full schema with DeclarativeConfig.load() will consume a lot of time + -- due to load_plugin_subschemas(). + local full_schema + + if sync_v2_enabled then + local def = { + name = "declarative_config", + primary_key = {}, + -- copy fields to avoid its "foreign"-type fields from being cleared by + -- reference_foreign_by_name() + fields = kong_table.cycle_aware_deep_copy(fields, true), + } + + full_schema = Schema.new(def) + + full_schema.known_entities = known_entities + full_schema.flatten = flatten + full_schema.insert_default_workspace_if_not_given = insert_default_workspace_if_not_given + full_schema.plugin_set = plugin_set + full_schema.vault_set = vault_set + end + if not include_foreign then + -- we replace the "foreign"-type fields at the top-level + -- with "string"-type fields only after the subschemas have been loaded, + -- otherwise they will detect the mismatch. reference_foreign_by_name(known_entities, records) end @@ -976,6 +1074,10 @@ function DeclarativeConfig.load(plugin_set, vault_set, include_foreign) schema.plugin_set = plugin_set schema.vault_set = vault_set + if sync_v2_enabled then + schema.full_schema = full_schema + end + return schema, nil, def end diff --git a/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua b/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua index 7b358f629202..5ef1f552d8c7 100644 --- a/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua @@ -20,6 +20,7 @@ local write_node_id = [[ local function get_http_node_id() local client = helpers.proxy_client(nil, 9002) finally(function() client:close() end) + helpers.wait_until(function() local res = client:get("/request", { headers = { host = "http.node-id.test" }, @@ -87,7 +88,7 @@ for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do local rpc, rpc_sync = v[1], v[2] for _, strategy in helpers.each_strategy() do - describe("node id persistence " .. " rpc_sync=" .. rpc_sync, function() + describe("node id persistence rpc_sync = " .. rpc_sync, function() local control_plane_config = { role = "control_plane", diff --git a/spec/02-integration/18-hybrid_rpc/06-validate_deltas_spec.lua b/spec/02-integration/18-hybrid_rpc/06-validate_deltas_spec.lua new file mode 100644 index 000000000000..0653a775cf19 --- /dev/null +++ b/spec/02-integration/18-hybrid_rpc/06-validate_deltas_spec.lua @@ -0,0 +1,201 @@ +local helpers = require "spec.helpers" +local txn = require "resty.lmdb.transaction" +local declarative = require "kong.db.declarative" + + +local insert_entity_for_txn = declarative.insert_entity_for_txn +local validate_deltas = require("kong.clustering.services.sync.validate").validate_deltas + + +local function lmdb_drop() + local t = txn.begin(512) + t:db_drop(false) + t:commit() +end + + +local function lmdb_insert(name, entity) + local t = txn.begin(512) + local res, err = insert_entity_for_txn(t, name, entity, nil) + if not res then + error("lmdb insert failed: " .. err) + end + + local ok, err = t:commit() + if not ok then + error("lmdb t:commit() failed: " .. err) + end +end + + +-- insert into LMDB +local function db_insert(bp, name, entity) + -- insert into dc blueprints + entity = bp[name]:insert(entity) + + -- insert into LMDB + lmdb_insert(name, entity) + + assert(kong.db[name]:select({id = entity.id})) + + return entity +end + + +-- Cache the declarative_config to avoid the overhead of repeatedly executing +-- the time-consuming chain: +-- declarative.new_config -> declarative_config.load -> load_plugin_subschemas +local cached_dc + +local function setup_bp() + -- reset lmdb + lmdb_drop() + + -- init bp / db ( true for expand_foreigns) + local bp, db = helpers.get_db_utils("off", nil, nil, nil, nil, true) + + -- init workspaces + local workspaces = require "kong.workspaces" + workspaces.upsert_default(db) + + -- init declarative config + if not cached_dc then + kong.sync = "fake sync to generate dc with sync_v2_enabled" + local err + cached_dc, err = declarative.new_config(kong.configuration) + assert(cached_dc, err) + end + + kong.db.declarative_config = cached_dc + + return bp, db +end + + +describe("[delta validations]",function() + + it("workspace id", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + local service = db_insert(bp, "services", { name = "service-001", }) + db_insert(bp, "routes", { + name = "route-001", + paths = { "/mock" }, + service = { id = service.id }, + }) + + local deltas = declarative.export_config_sync() + + for _, delta in ipairs(deltas) do + local ws_id = delta.ws_id + assert(ws_id and ws_id ~= ngx.null) + end + end) + + it("route has foreign service", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + local service = db_insert(bp, "services", { name = "service-001", }) + db_insert(bp, "routes", { + name = "route-001", + paths = { "/mock" }, + service = { id = service.id }, + }) + + local deltas = declarative.export_config_sync() + + local ok, err = validate_deltas(deltas) + assert.is_true(ok, "validate should not fail: " .. tostring(err)) + end) + + it("route has unmatched foreign service", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + db_insert(bp, "routes", { + name = "route-001", + paths = { "/mock" }, + -- unmatched service + service = { id = "00000000-0000-0000-0000-000000000000" }, + }) + + local deltas = declarative.export_config_sync() + local _, err = validate_deltas(deltas, false) + assert.matches( + "entry 1 of 'services': could not find routes's foreign refrences services", + err) + end) + + it("100 routes -> 1 services: matched foreign keys", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + local service = db_insert(bp, "services", { name = "service-001", }) + + for i = 1, 100 do + db_insert(bp, "routes", { + name = "route-001", + paths = { "/mock" }, + -- unmatched service + service = { id = service.id }, + }) + end + + local deltas = declarative.export_config_sync() + local ok, err = validate_deltas(deltas, false) + assert.is_true(ok, "validate should not fail: " .. tostring(err)) + end) + + it("100 routes -> 100 services: matched foreign keys", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + + for i = 1, 100 do + local service = db_insert(bp, "services", { name = "service-001", }) + + db_insert(bp, "routes", { + name = "route-001", + paths = { "/mock" }, + -- unmatched service + service = { id = service.id }, + }) + end + + local deltas = declarative.export_config_sync() + local ok, err = validate_deltas(deltas, false) + assert.is_true(ok, "validate should not fail: " .. tostring(err)) + end) + + it("100 routes: unmatched foreign service", function() + local bp = setup_bp() + + -- add entities + db_insert(bp, "workspaces", { name = "ws-001" }) + + for i = 1, 100 do + db_insert(bp, "routes", { + name = "route-001", + paths = { "/mock" }, + -- unmatched service + service = { id = "00000000-0000-0000-0000-000000000000" }, + }) + end + + local deltas = declarative.export_config_sync() + local _, err = validate_deltas(deltas, false) + for i = 1, 100 do + assert.matches( + "entry " .. i .. " of 'services': " .. + "could not find routes's foreign refrences services", + err) + end + end) +end) diff --git a/spec/fixtures/dc_blueprints.lua b/spec/fixtures/dc_blueprints.lua index 139bd9c51411..26301d8c0e0f 100644 --- a/spec/fixtures/dc_blueprints.lua +++ b/spec/fixtures/dc_blueprints.lua @@ -28,7 +28,9 @@ local function remove_nulls(tbl) end -local function wrap_db(db) +-- tparam boolean expand_foreigns expand the complete "foreign"-type fields, not +-- replacing it with "string"-type fields +local function wrap_db(db, expand_foreigns) local dc_as_db = {} local config = new_config() @@ -43,7 +45,7 @@ local function wrap_db(db) local schema = db.daos[name].schema tbl = schema:process_auto_fields(tbl, "insert") for fname, field in schema:each_field() do - if field.type == "foreign" then + if not expand_foreigns and field.type == "foreign" then tbl[fname] = type(tbl[fname]) == "table" and tbl[fname].id or nil @@ -110,8 +112,8 @@ local function wrap_db(db) end -function dc_blueprints.new(db) - local dc_as_db = wrap_db(db) +function dc_blueprints.new(db, expand_foreigns) + local dc_as_db = wrap_db(db, expand_foreigns) local save_dc = new_config() diff --git a/spec/internal/db.lua b/spec/internal/db.lua index 5659cdf72ef2..4dff46aa8d62 100644 --- a/spec/internal/db.lua +++ b/spec/internal/db.lua @@ -261,6 +261,10 @@ end -- custom plugins as loaded. -- @param vaults (optional) vault configuration to use. -- @param skip_migrations (optional) if true, migrations will not be run. +-- @param expand_foreigns (optional) If true, it will prevent converting foreign +-- keys from primary key value pairs to strings. For example, it will keep the +-- foreign key of the router entity as `service = { id = "" }` instead of +-- converting it to `service = ""`. -- @return BluePrint, DB -- @usage -- local PLUGIN_NAME = "my_fancy_plugin" @@ -277,7 +281,7 @@ end -- route = { id = route1.id }, -- config = {}, -- } -local function get_db_utils(strategy, tables, plugins, vaults, skip_migrations) +local function get_db_utils(strategy, tables, plugins, vaults, skip_migrations, expand_foreigns) strategy = strategy or conf.database conf.database = strategy -- overwrite kong.configuration.database @@ -343,7 +347,7 @@ local function get_db_utils(strategy, tables, plugins, vaults, skip_migrations) bp = assert(Blueprints.new(db)) dcbp = nil else - bp = assert(dc_blueprints.new(db)) + bp = assert(dc_blueprints.new(db, expand_foreigns)) dcbp = bp end