Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/sync): validate deltas when syncing #14127

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
cfb6a92
feat(sync): validate deltas
chobits Jan 10, 2025
f190970
add test case
chobits Jan 13, 2025
7aafdd9
localize table.insert
chobits Jan 13, 2025
5a4aa84
localize kong.db
chobits Jan 13, 2025
1b5011b
ngx.null -> null
chobits Jan 13, 2025
1a63d10
fix lint error of test case
chobits Jan 13, 2025
d31a2b6
fix typo: generate
chobits Jan 13, 2025
a19f5bc
coding style
chobits Jan 13, 2025
bf13811
add comment
chobits Jan 13, 2025
980029c
add comment for expand_foreigns
chobits Jan 13, 2025
089d82e
Update kong/db/schema/others/declarative_config.lua
chobits Jan 14, 2025
4526fd9
Update kong/db/schema/others/declarative_config.lua
chobits Jan 14, 2025
8f8de42
fix comments
chobits Jan 14, 2025
bdb40b8
select return fix
chobits Jan 14, 2025
36e4701
fixed tests: assert -> assert.is_true
chobits Jan 14, 2025
ab513d0
improve loop logic
chobits Jan 14, 2025
e0db810
improve perf of test cases
chobits Jan 14, 2025
e3494c0
generaete full schema of declarative config for sync
chobits Jan 14, 2025
1bca8c8
add blank to fix coding style
chobits Jan 14, 2025
f77ca9b
simplify logic: we dont validate full sync, all run vallidate sync
chobits Jan 14, 2025
9fec3e6
Revert "generaete full schema of declarative config for sync"
chobits Jan 14, 2025
134e2c6
1
chobits Jan 14, 2025
88992ff
fix 09-node-id-persistence_spec.lua
chobits Jan 14, 2025
bf38ec1
rename: declarative_config.validate -> declarative_config.validate_sc…
chobits Jan 14, 2025
c2c58d5
remove DeclarativeConfig.validate_references_full
chobits Jan 14, 2025
ec2e241
fix title of 09-node-id-persistence_spec.lua
chobits Jan 14, 2025
6159464
fix test case 30
chobits Jan 15, 2025
f9de4eb
add wait time
chobits Jan 15, 2025
f05309d
delcarative_config.load() supports sync.v2
chobits Jan 16, 2025
4d76b7c
delcarative_config.load() supports sync.v2 (part 2)
chobits Jan 16, 2025
d3c619b
delcarative_config.load() supports sync.v2 (full schema preload)
chobits Jan 16, 2025
5d56bb0
fix comment
chobits Jan 16, 2025
8d3ed7d
add comment for wrap_db() expand_foreigns param
chobits Jan 17, 2025
d71d10c
Update kong/db/schema/others/declarative_config.lua
chobits Jan 17, 2025
ed677e2
comment
chobits Jan 17, 2025
64015e8
coding style: not not sync -> sync ~= nil
chobits Jan 17, 2025
de9f3a5
coding style
chobits Jan 17, 2025
bfc35c3
fix 06-validate_deltas_spec.lua
chobits Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bin/busted
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ if not os.getenv("KONG_BUSTED_RESPAWNED") then
-- create shared dict
resty_flags = resty_flags .. require("spec.fixtures.shared_dict")

-- create lmdb environment
local lmdb_env = os.tmpname()
resty_flags = resty_flags .. string.format(' --main-conf "lmdb_environment_path %s;" ', lmdb_env)

if resty_flags then
table.insert(cmd, cmd_prefix_count+1, resty_flags)
end
Expand Down
1 change: 1 addition & 0 deletions kong-3.10.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ build = {
["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua",
["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua",
["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua",
["kong.clustering.services.sync.validate"] = "kong/clustering/services/sync/validate.lua",
["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua",

["kong.cluster_events"] = "kong/cluster_events/init.lua",
Expand Down
9 changes: 8 additions & 1 deletion kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ local isempty = require("table.isempty")
local events = require("kong.runloop.events")


local validate_deltas = require("kong.clustering.services.sync.validate").validate_deltas
local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
Expand Down Expand Up @@ -194,6 +195,7 @@ local function do_sync()
return nil, "default namespace does not exist inside params"
end

local wipe = ns_delta.wipe
local deltas = ns_delta.deltas

if not deltas then
Expand All @@ -219,9 +221,14 @@ local function do_sync()
end
assert(type(kong.default_workspace) == "string")

-- validate deltas
local ok, err = validate_deltas(deltas, wipe)
if not ok then
return nil, err
end

local t = txn.begin(512)

local wipe = ns_delta.wipe
if wipe then
t:db_drop(false)
end
Expand Down
61 changes: 61 additions & 0 deletions kong/clustering/services/sync/validate.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
local declarative = require("kong.db.declarative")
local declarative_config = require("kong.db.schema.others.declarative_config")

chobits marked this conversation as resolved.
Show resolved Hide resolved

local null = ngx.null
local tb_insert = table.insert
local validate_schema = declarative_config.validate_schema
local pk_string = declarative_config.pk_string
local validate_references_sync = declarative_config.validate_references_sync
local pretty_print_error = declarative.pretty_print_error


local function validate_deltas(deltas, is_full_sync)

-- generate deltas table mapping primary key string to entity item
local deltas_map = {}

-- generate declarative config table
local dc_table = { _format_version = "3.0", }

local db = kong.db

for _, delta in ipairs(deltas) do
local delta_type = delta.type
local delta_entity = delta.entity

if delta_entity ~= nil and delta_entity ~= null then
dc_table[delta_type] = dc_table[delta_type] or {}

tb_insert(dc_table[delta_type], delta_entity)

-- table: primary key string -> entity
local schema = db[delta_type].schema
local pk = schema:extract_pk_values(delta_entity)
local pks = pk_string(schema, pk)

deltas_map[pks] = delta_entity
end
end

-- validate schema (same logic as the sync v1 full-sync schema validation)
local dc_schema = db.declarative_config.schema

local ok, err_t = validate_schema(dc_schema, dc_table)
if not ok then
return nil, pretty_print_error(err_t)
end

-- validate references
local ok, err_t = validate_references_sync(deltas, deltas_map, is_full_sync)
if not ok then
return nil, pretty_print_error(err_t)
end

return true
end


return {
validate_deltas = validate_deltas,
}
9 changes: 7 additions & 2 deletions kong/db/declarative/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ local _MT = { __index = _M, }
-- @tparam boolean partial Input is not a full representation
-- of the database (e.g. for db_import)
-- @treturn table A Config schema adjusted for this configuration
function _M.new_config(kong_config, partial)
local schema, err = declarative_config.load(kong_config.loaded_plugins, kong_config.loaded_vaults)
function _M.new_config(kong_config, partial, include_foreign)
chobits marked this conversation as resolved.
Show resolved Hide resolved
local schema, err = declarative_config.load(kong_config.loaded_plugins,
kong_config.loaded_vaults,
include_foreign, kong.sync ~= nil)
if not schema then
return nil, err
end
Expand Down Expand Up @@ -263,5 +265,8 @@ _M.delete_entity_for_txn = declarative_import.delete_entity_for_txn
_M.workspace_id = declarative_import.workspace_id
_M.GLOBAL_WORKSPACE_TAG = declarative_import.GLOBAL_WORKSPACE_TAG

-- helpful function
_M.pretty_print_error = pretty_print_error


return _M
136 changes: 119 additions & 17 deletions kong/db/schema/others/declarative_config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local null = ngx.null
local type = type
local next = next
local pairs = pairs
local fmt = string.format
local yield = require("kong.tools.yield").yield
local ipairs = ipairs
local insert = table.insert
Expand Down Expand Up @@ -331,7 +332,7 @@ end


local function ws_id_for(item)
if item.ws_id == nil or item.ws_id == ngx.null then
if item.ws_id == nil or item.ws_id == null then
return "*"
end
return item.ws_id
Expand Down Expand Up @@ -413,7 +414,7 @@ local function populate_references(input, known_entities, by_id, by_key, expecte
local key = use_key and item[endpoint_key]

local failed = false
if key and key ~= ngx.null then
if key and key ~= null then
local ok = add_to_by_key(by_key, entity_schema, item, entity, key)
if not ok then
add_error(errs, parent_entity, parent_idx, entity, i,
Expand Down Expand Up @@ -506,6 +507,68 @@ local function validate_references(self, input)
end


function DeclarativeConfig.validate_references_sync(deltas, deltas_map, is_full_sync)
local errs = {}

for _, delta in ipairs(deltas) do
local item_type = delta.type
local item = delta.entity
local ws_id = delta.ws_id or kong.default_workspace

local foreign_refs = foreign_references[item_type]

if not item or item == null or not foreign_refs then
goto continue
end

for k, v in pairs(item) do

-- Try to check if item's some foreign key exists in the deltas or LMDB.
-- For example, `item[k]` could be `<router_entity>["service"]`, we need
-- to find the referenced foreign service entity for this router entity.

local foreign_entity = foreign_refs[k]

if foreign_entity and v ~= null then -- k is foreign key

local dao = kong.db[foreign_entity]

-- try to find it in deltas
local pks = DeclarativeConfig.pk_string(dao.schema, v)
local fvalue = deltas_map[pks]

-- try to find it in DB (LMDB)
if not fvalue and not is_full_sync then
fvalue = dao:select(v, { workspace = ws_id })
end

-- record an error if not finding its foreign reference
if not fvalue then
errs[item_type] = errs[item_type] or {}
errs[item_type][foreign_entity] = errs[item_type][foreign_entity] or {}

local msg = fmt("could not find %s's foreign refrences %s (%s)",
item_type, foreign_entity,
type(v) == "string" and v or cjson_encode(v))

insert(errs[item_type][foreign_entity], msg)
end
end -- if foreign_entity and v ~= null

end -- for k, v in pairs(item)

::continue::
end -- for _, delta in ipairs(deltas)


if next(errs) then
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
return nil, errs
end

return true
end


-- This is a best-effort generation of a cache-key-like identifier
-- to feed the hash when generating deterministic UUIDs.
-- We do not use the actual `cache_key` function from the DAO because
Expand Down Expand Up @@ -781,15 +844,7 @@ local function get_unique_key(schema, entity, field, value)
end


local function flatten(self, input)
-- manually set transform here
-- we can't do this in the schema with a `default` because validate
-- needs to happen before process_auto_fields, which
-- is the one in charge of filling out default values
if input._transform == nil then
input._transform = true
end

function DeclarativeConfig.validate_schema(self, input)
local ok, err = self:validate(input)
if not ok then
yield()
Expand All @@ -812,6 +867,24 @@ local function flatten(self, input)
yield()
end

return true
end


local function flatten(self, input)
-- manually set transform here
-- we can't do this in the schema with a `default` because validate
-- needs to happen before process_auto_fields, which
-- is the one in charge of filling out default values
if input._transform == nil then
input._transform = true
end

local ok, err = DeclarativeConfig.validate_schema(self, input)
if not ok then
return nil, err
end

generate_ids(input, self.known_entities)

yield()
Expand Down Expand Up @@ -860,7 +933,7 @@ local function flatten(self, input)

if field.unique then
local flat_value = flat_entry[name]
if flat_value and flat_value ~= ngx.null then
if flat_value and flat_value ~= null then
local unique_key = get_unique_key(schema, entry, field, flat_value)
uniques[name] = uniques[name] or {}
if uniques[name][unique_key] then
Expand Down Expand Up @@ -907,7 +980,9 @@ local function load_entity_subschemas(entity_name, entity)
end


function DeclarativeConfig.load(plugin_set, vault_set, include_foreign)
-- @tparam sync_v2_enabled It generates full schema and foreign references to
-- validate schema and references for sync.v2
function DeclarativeConfig.load(plugin_set, vault_set, include_foreign, sync_v2_enabled)
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
all_schemas = {}
local schemas_array = {}
for _, entity in ipairs(constants.CORE_ENTITIES) do
Expand Down Expand Up @@ -942,7 +1017,7 @@ function DeclarativeConfig.load(plugin_set, vault_set, include_foreign)
known_entities[i] = schema.name
end

local fields, records = build_fields(known_entities, include_foreign)
local fields, records = build_fields(known_entities, include_foreign or sync_v2_enabled)
-- assert(no_foreign(fields))

local ok, err = load_plugin_subschemas(fields, plugin_set)
Expand All @@ -955,10 +1030,33 @@ function DeclarativeConfig.load(plugin_set, vault_set, include_foreign)
return nil, err
end

-- we replace the "foreign"-type fields at the top-level
-- with "string"-type fields only after the subschemas have been loaded,
-- otherwise they will detect the mismatch.
-- Pre-load the full schema to validate the schema for sync.v2. Lazy loading
-- the full schema with DeclarativeConfig.load() will consume a lot of time
-- due to load_plugin_subschemas().
local full_schema

if sync_v2_enabled then
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
local def = {
name = "declarative_config",
primary_key = {},
-- copy fields to avoid its "foreign"-type fields from being cleared by
-- reference_foreign_by_name()
fields = kong_table.cycle_aware_deep_copy(fields, true),
}

full_schema = Schema.new(def)

full_schema.known_entities = known_entities
full_schema.flatten = flatten
full_schema.insert_default_workspace_if_not_given = insert_default_workspace_if_not_given
full_schema.plugin_set = plugin_set
full_schema.vault_set = vault_set
end

if not include_foreign then
-- we replace the "foreign"-type fields at the top-level
-- with "string"-type fields only after the subschemas have been loaded,
-- otherwise they will detect the mismatch.
reference_foreign_by_name(known_entities, records)
end

Expand All @@ -976,6 +1074,10 @@ function DeclarativeConfig.load(plugin_set, vault_set, include_foreign)
schema.plugin_set = plugin_set
schema.vault_set = vault_set

if sync_v2_enabled then
schema.full_schema = full_schema
end

return schema, nil, def
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ local write_node_id = [[
local function get_http_node_id()
local client = helpers.proxy_client(nil, 9002)
finally(function() client:close() end)

helpers.wait_until(function()
local res = client:get("/request", {
headers = { host = "http.node-id.test" },
Expand Down Expand Up @@ -87,7 +88,7 @@ for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do
local rpc, rpc_sync = v[1], v[2]

for _, strategy in helpers.each_strategy() do
describe("node id persistence " .. " rpc_sync=" .. rpc_sync, function()
describe("node id persistence rpc_sync = " .. rpc_sync, function()
chobits marked this conversation as resolved.
Show resolved Hide resolved

local control_plane_config = {
role = "control_plane",
Expand Down
Loading
Loading