diff --git a/integration_test/mysql_mysql_test.go b/integration_test/mysql_mysql_test.go index 545252f6..18a12621 100644 --- a/integration_test/mysql_mysql_test.go +++ b/integration_test/mysql_mysql_test.go @@ -1222,6 +1222,8 @@ func TestMySQLDDL(t *testing.T) { "CREATE TABLE IF NOT EXISTS tn4 like tn3", "create table `abc`(`id` int(11), PRIMARY KEY (`id`)) ENGINE=InnoDB", + + "drop table tn3, tn4", } for _, ddl := range ddls { diff --git a/pkg/inputs/mysqlstream/binlog_tailer.go b/pkg/inputs/mysqlstream/binlog_tailer.go index cbc4ebe0..ef094b66 100644 --- a/pkg/inputs/mysqlstream/binlog_tailer.go +++ b/pkg/inputs/mysqlstream/binlog_tailer.go @@ -410,32 +410,63 @@ func (tailer *BinlogTailer) Start() error { // If B created the internal txn table _gravity.gravity_txn_tags, but A does not // invalidate the cache, the schema of _gravity.gravity_txn_tags won't be found. // - dbName, table, ast := extractSchemaNameFromDDLQueryEvent(tailer.parser, ev) - if dbName == consts.MySQLInternalDBName { - continue + dbNames, tables, asts := extractSchemaNameFromDDLQueryEvent(tailer.parser, ev) + + // emit barrier msg + barrierMsg := NewBarrierMsg(tailer.AfterMsgCommit) + if err := tailer.emitter.Emit(barrierMsg); err != nil { + log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err)) + } + <-barrierMsg.Done + if err := tailer.positionCache.Flush(); err != nil { + log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) } - tailer.sourceSchemaStore.InvalidateSchemaCache(dbName) + for i := range dbNames { + dbName := dbNames[i] + table := tables[i] + ast := asts[i] - if tailer.cfg.IgnoreBiDirectionalData && strings.Contains(ddlSQL, consts.DDLTag) { - log.Infof("ignore internal ddl: %s", ddlSQL) - continue - } + if dbName == consts.MySQLInternalDBName { + continue + } - if dbName == consts.GravityDBName || dbName == consts.OldDrcDBName { - continue - } + tailer.sourceSchemaStore.InvalidateSchemaCache(dbName) - log.Infof("QueryEvent: database: %s, sql: %s", dbName, ddlSQL) + if tailer.cfg.IgnoreBiDirectionalData && strings.Contains(ddlSQL, consts.DDLTag) { + log.Infof("ignore internal ddl: %s", ddlSQL) + continue + } - if tailer.binlogEventSchemaFilter != nil { - if !tailer.binlogEventSchemaFilter(dbName) { + if dbName == consts.GravityDBName || dbName == consts.OldDrcDBName { continue } + + log.Infof("QueryEvent: database: %s, sql: %s", dbName, ddlSQL) + + if tailer.binlogEventSchemaFilter != nil { + if !tailer.binlogEventSchemaFilter(dbName) { + continue + } + } + + // emit ddl msg + ddlMsg := NewDDLMsg( + tailer.AfterMsgCommit, + dbName, + table, + ast, + ddlSQL, + int64(e.Header.Timestamp), + received) + if err := tailer.emitter.Emit(ddlMsg); err != nil { + log.Fatalf("failed to emit ddl msg: %v", errors.ErrorStack(err)) + } } // emit barrier msg - barrierMsg := NewBarrierMsg(tailer.AfterMsgCommit) + barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit) + barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition} if err := tailer.emitter.Emit(barrierMsg); err != nil { log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err)) } @@ -444,24 +475,6 @@ func (tailer *BinlogTailer) Start() error { log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) } - // emit ddl msg - ddlMsg := NewDDLMsg( - tailer.AfterMsgCommit, - dbName, - table, - ast, - ddlSQL, - int64(e.Header.Timestamp), - received, - currentPosition) - if err := tailer.emitter.Emit(ddlMsg); err != nil { - log.Fatalf("failed to emit ddl msg: %v", errors.ErrorStack(err)) - } - <-ddlMsg.Done - if err := tailer.positionCache.Flush(); err != nil { - log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) - } - log.Infof("[binlogTailer] ddl done with gtid: %v", ev.GSet.String()) case *replication.GTIDEvent: // GTID stands for Global Transaction IDentifier @@ -536,7 +549,7 @@ func (tailer *BinlogTailer) Start() error { func (tailer *BinlogTailer) AfterMsgCommit(msg *core.Msg) error { ctx := msg.InputContext.(inputContext) - if ctx.op == xid || ctx.op == ddl { + if (ctx.op == xid || ctx.op == ddl) && ctx.position.BinlogGTID != "" { if err := UpdateCurrentPositionValue(tailer.positionCache, ctx.position); err != nil { return errors.Trace(err) diff --git a/pkg/inputs/mysqlstream/msg.go b/pkg/inputs/mysqlstream/msg.go index 253a5519..b8d5ab03 100644 --- a/pkg/inputs/mysqlstream/msg.go +++ b/pkg/inputs/mysqlstream/msg.go @@ -341,8 +341,7 @@ func NewDDLMsg( ast ast.StmtNode, ddlSQL string, ts int64, - received time.Time, - position config.MySQLBinlogPosition) *core.Msg { + received time.Time) *core.Msg { return &core.Msg{ Phase: core.Phase{ @@ -354,7 +353,7 @@ func NewDDLMsg( Table: table, DdlMsg: &core.DDLMsg{Statement: ddlSQL, AST: ast}, Done: make(chan struct{}), - InputContext: inputContext{op: ddl, position: position}, + InputContext: inputContext{op: ddl}, InputStreamKey: utils.NewStringPtr(inputStreamKey), AfterCommitCallback: callback, } diff --git a/pkg/inputs/mysqlstream/utils.go b/pkg/inputs/mysqlstream/utils.go index 3207d107..a1e1ac67 100644 --- a/pkg/inputs/mysqlstream/utils.go +++ b/pkg/inputs/mysqlstream/utils.go @@ -18,38 +18,50 @@ func IsEventBelongsToMyself(event *replication.RowsEvent, pipelineName string) b panic("type conversion failed for internal table") } -func extractSchemaNameFromDDLQueryEvent(p *parser.Parser, ev *replication.QueryEvent) (db, table string, node ast.StmtNode) { +func extractSchemaNameFromDDLQueryEvent(p *parser.Parser, ev *replication.QueryEvent) (db, table []string, node []ast.StmtNode) { stmt, err := p.ParseOneStmt(string(ev.Query), "", "") if err != nil { log.Errorf("sql parser: %s. error: %v", string(ev.Query), err.Error()) - return string(ev.Schema), "", nil + return []string{string(ev.Schema)}, []string{""}, nil } - node = stmt - switch v := stmt.(type) { case *ast.CreateDatabaseStmt: - db = v.Name + db = append(db, v.Name) + table = append(table, "") + node = append(node, stmt) case *ast.DropDatabaseStmt: - db = v.Name + db = append(db, v.Name) + table = append(table, "") + node = append(node, stmt) case *ast.CreateTableStmt: - db = v.Table.Schema.String() - table = v.Table.Name.String() + db = append(db, v.Table.Schema.String()) + table = append(table, v.Table.Name.String()) + node = append(node, stmt) case *ast.DropTableStmt: - if len(v.Tables) > 1 { - log.Fatalf("only support single drop table right now: %v", string(ev.Query)) + for i := range v.Tables { + db = append(db, v.Tables[i].Schema.String()) + table = append(table, v.Tables[i].Name.String()) + copy := *v + copy.Tables = nil + copy.Tables = append(copy.Tables, v.Tables[i]) + node = append(node, ©) } - db = v.Tables[0].Schema.String() - table = v.Tables[0].Name.String() case *ast.AlterTableStmt: - db = v.Table.Schema.String() - table = v.Table.Name.String() + db = append(db, v.Table.Schema.String()) + table = append(table, v.Table.Name.String()) + node = append(node, stmt) case *ast.TruncateTableStmt: - db = v.Table.Schema.String() - table = v.Table.Name.String() + db = append(db, v.Table.Schema.String()) + table = append(table, v.Table.Name.String()) + node = append(node, stmt) + default: + db = append(db, "") + table = append(table, "") + node = append(node, stmt) } - if db == "" { - db = string(ev.Schema) + if len(db) == 1 && db[0] == "" { + db[0] = string(ev.Schema) } return }