From d6a14f92f9e1feb7a821a3f5a15bd07f806e020b Mon Sep 17 00:00:00 2001 From: Ryan-Git Date: Thu, 21 Nov 2019 10:56:00 +0800 Subject: [PATCH] fix bidirectional drop (#230) * fix bidirectional drop --- integration_test/mysql_mysql_test.go | 14 +++- pkg/outputs/mysql/mysql.go | 64 ++++++++++++++++--- .../batch_table_scheduler.go | 11 +++- 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/integration_test/mysql_mysql_test.go b/integration_test/mysql_mysql_test.go index d01b6638..1cfd15a4 100644 --- a/integration_test/mysql_mysql_test.go +++ b/integration_test/mysql_mysql_test.go @@ -1291,20 +1291,30 @@ func TestMySQLTagDDL(t *testing.T) { r.NoError(server.Start()) tbl := "abc" + tbl2 := "abcd" _, err = sourceDB.Exec(fmt.Sprintf("%screate table `%s`.`%s`(`id` int(11), PRIMARY KEY (`id`)) ENGINE=InnoDB", consts.DDLTag, sourceDBName, tbl)) r.NoError(err) + _, err = sourceDB.Exec(fmt.Sprintf("create table `%s`.`%s`(`id` int(11), PRIMARY KEY (`id`)) ENGINE=InnoDB", sourceDBName, tbl2)) + r.NoError(err) + _, err = sourceDB.Exec(fmt.Sprintf("%sdrop table `%s`.`%s`;", consts.DDLTag, sourceDBName, tbl)) + r.NoError(err) + _, err = sourceDB.Exec(fmt.Sprintf("%sdrop table `%s`.`%s`;", consts.DDLTag, sourceDBName, tbl2)) + r.NoError(err) err = mysql_test.SendDeadSignal(sourceDB, pipelineConfig.PipelineName) r.NoError(err) <-server.Input.Done() - server.Close() row := targetDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = '%s'", targetDBName, tbl)) var tblName string err = row.Scan(&tblName) - r.Equal(sql.ErrNoRows, err) + r.Equal(sql.ErrNoRows, err) // create ignored by gravity + + row = targetDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = '%s'", targetDBName, tbl2)) + err = row.Scan(&tblName) + r.Equal(sql.ErrNoRows, err) // mysql ignores annotation in drop stmt, it will be executed } func TestMySQLDDL(t *testing.T) { diff --git a/pkg/outputs/mysql/mysql.go b/pkg/outputs/mysql/mysql.go index bb3b129b..f1043a76 100644 --- a/pkg/outputs/mysql/mysql.go +++ b/pkg/outputs/mysql/mysql.go @@ -4,6 +4,8 @@ import ( "database/sql" "fmt" "strings" + "sync" + "time" mysqldriver "github.com/go-sql-driver/mysql" "github.com/juju/errors" @@ -46,8 +48,14 @@ type MySQLOutput struct { sqlExecutor sql_execution_engine.EngineExecutor tableConfigs []config.TableConfig isTiDB bool + + // MySQL ignores comment in drop table stmt in some versions, see https://bugs.mysql.com/bug.php?id=87852 + // to prevent endless bidirectional drop tables, we keep the recent dropped table names + droppedTable sync.Map } +const keepDropTableSeconds = 30 + func init() { registry.RegisterPlugin(registry.OutputPlugin, Name, &MySQLOutput{}, false) } @@ -155,6 +163,36 @@ func (output *MySQLOutput) route0(s, t string) (schema, table string) { return } +func (output *MySQLOutput) markTableDropped(schema, table string) { + output.droppedTable.Store(utils.TableIdentity(schema, table), time.Now()) + output.cleanupDroppedTable() +} + +func (output *MySQLOutput) markTableCreated(schema, table string) { + output.droppedTable.Delete(utils.TableIdentity(schema, table)) + output.cleanupDroppedTable() +} + +func (output *MySQLOutput) hasDropped(schema, table string) bool { + _, ok := output.droppedTable.Load(utils.TableIdentity(schema, table)) + return ok +} + +func (output *MySQLOutput) cleanupDroppedTable() { + now := time.Now() + var toDelete []string + output.droppedTable.Range(func(key, value interface{}) bool { + if now.Sub(value.(time.Time)).Seconds() > keepDropTableSeconds { + toDelete = append(toDelete, key.(string)) + } + return true + }) + + for _, k := range toDelete { + output.droppedTable.Delete(k) + } +} + func toTableName(s, t string) *ast.TableName { return &ast.TableName{ Schema: model.CIStr{ @@ -247,19 +285,25 @@ func (output *MySQLOutput) Execute(msgs []*core.Msg) error { log.Info("[output-mysql] executed ddl: ", stmt) metrics.OutputCounter.WithLabelValues(output.pipelineName, targetSchema, targetTable, string(core.MsgDDL), "create-table").Add(1) output.targetSchemaStore.InvalidateSchemaCache(targetSchema) + output.markTableCreated(msg.Database, msg.Table) case *ast.DropTableStmt: - tmp := *node - tmp.Tables[0] = toTableName(targetSchema, targetTable) - tmp.IfExists = true - stmt := restore(&tmp) - err := output.executeDDL(targetSchema, stmt) - if err != nil { - log.Fatal("[output-mysql] error exec ddl: ", stmt, ". err:", err) + if !output.hasDropped(msg.Database, msg.Table) { + tmp := *node + tmp.Tables[0] = toTableName(targetSchema, targetTable) + tmp.IfExists = true + stmt := restore(&tmp) + err := output.executeDDL(targetSchema, stmt) + if err != nil { + log.Fatal("[output-mysql] error exec ddl: ", stmt, ". err:", err) + } + log.Info("[output-mysql] executed ddl: ", stmt) + metrics.OutputCounter.WithLabelValues(output.pipelineName, targetSchema, targetTable, string(core.MsgDDL), "drop-table").Add(1) + output.targetSchemaStore.InvalidateSchemaCache(targetSchema) + output.markTableDropped(msg.Database, msg.Table) + } else { + log.Warnf("table %s has been dropped recently. This might be a bidirectional stmt, ignore", utils.TableIdentity(msg.Database, msg.Table)) } - log.Info("[output-mysql] executed ddl: ", stmt) - metrics.OutputCounter.WithLabelValues(output.pipelineName, targetSchema, targetTable, string(core.MsgDDL), "drop-table").Add(1) - output.targetSchemaStore.InvalidateSchemaCache(targetSchema) case *ast.AlterTableStmt: var targetDDLs []string diff --git a/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go b/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go index bf64a32a..cddcda0e 100644 --- a/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go +++ b/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go @@ -488,9 +488,14 @@ func (scheduler *batchScheduler) startTableDispatcher(tableKey string) { } if len(curBatch) > 0 { - queueIdx := round % uint(scheduler.cfg.NrWorker) - round++ - scheduler.workerQueues[queueIdx] <- curBatch + if curBatch[0].Type == core.MsgDDL { + ddlIdx := utils.GenHashKey(utils.TableIdentity(curBatch[0].Database, curBatch[0].Table)) % uint32(scheduler.cfg.NrWorker) + scheduler.workerQueues[ddlIdx] <- curBatch + } else { + queueIdx := round % uint(scheduler.cfg.NrWorker) + round++ + scheduler.workerQueues[queueIdx] <- curBatch + } } // delete the delivered messages