Skip to content

Commit

Permalink
support drop multiple table (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan-Git authored Aug 21, 2019
1 parent 0a3ed2c commit c187b24
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 55 deletions.
2 changes: 2 additions & 0 deletions integration_test/mysql_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,8 @@ func TestMySQLDDL(t *testing.T) {
"CREATE TABLE IF NOT EXISTS tn4 like tn3",

"create table `abc`(`id` int(11), PRIMARY KEY (`id`)) ENGINE=InnoDB",

"drop table tn3, tn4",
}

for _, ddl := range ddls {
Expand Down
81 changes: 47 additions & 34 deletions pkg/inputs/mysqlstream/binlog_tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,32 +410,63 @@ func (tailer *BinlogTailer) Start() error {
// If B created the internal txn table _gravity.gravity_txn_tags, but A does not
// invalidate the cache, the schema of _gravity.gravity_txn_tags won't be found.
//
dbName, table, ast := extractSchemaNameFromDDLQueryEvent(tailer.parser, ev)
if dbName == consts.MySQLInternalDBName {
continue
dbNames, tables, asts := extractSchemaNameFromDDLQueryEvent(tailer.parser, ev)

// emit barrier msg
barrierMsg := NewBarrierMsg(tailer.AfterMsgCommit)
if err := tailer.emitter.Emit(barrierMsg); err != nil {
log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err))
}
<-barrierMsg.Done
if err := tailer.positionCache.Flush(); err != nil {
log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err))
}

tailer.sourceSchemaStore.InvalidateSchemaCache(dbName)
for i := range dbNames {
dbName := dbNames[i]
table := tables[i]
ast := asts[i]

if tailer.cfg.IgnoreBiDirectionalData && strings.Contains(ddlSQL, consts.DDLTag) {
log.Infof("ignore internal ddl: %s", ddlSQL)
continue
}
if dbName == consts.MySQLInternalDBName {
continue
}

if dbName == consts.GravityDBName || dbName == consts.OldDrcDBName {
continue
}
tailer.sourceSchemaStore.InvalidateSchemaCache(dbName)

log.Infof("QueryEvent: database: %s, sql: %s", dbName, ddlSQL)
if tailer.cfg.IgnoreBiDirectionalData && strings.Contains(ddlSQL, consts.DDLTag) {
log.Infof("ignore internal ddl: %s", ddlSQL)
continue
}

if tailer.binlogEventSchemaFilter != nil {
if !tailer.binlogEventSchemaFilter(dbName) {
if dbName == consts.GravityDBName || dbName == consts.OldDrcDBName {
continue
}

log.Infof("QueryEvent: database: %s, sql: %s", dbName, ddlSQL)

if tailer.binlogEventSchemaFilter != nil {
if !tailer.binlogEventSchemaFilter(dbName) {
continue
}
}

// emit ddl msg
ddlMsg := NewDDLMsg(
tailer.AfterMsgCommit,
dbName,
table,
ast,
ddlSQL,
int64(e.Header.Timestamp),
received)
if err := tailer.emitter.Emit(ddlMsg); err != nil {
log.Fatalf("failed to emit ddl msg: %v", errors.ErrorStack(err))
}
}

// emit barrier msg
barrierMsg := NewBarrierMsg(tailer.AfterMsgCommit)
barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit)
barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition}
if err := tailer.emitter.Emit(barrierMsg); err != nil {
log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err))
}
Expand All @@ -444,24 +475,6 @@ func (tailer *BinlogTailer) Start() error {
log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err))
}

// emit ddl msg
ddlMsg := NewDDLMsg(
tailer.AfterMsgCommit,
dbName,
table,
ast,
ddlSQL,
int64(e.Header.Timestamp),
received,
currentPosition)
if err := tailer.emitter.Emit(ddlMsg); err != nil {
log.Fatalf("failed to emit ddl msg: %v", errors.ErrorStack(err))
}
<-ddlMsg.Done
if err := tailer.positionCache.Flush(); err != nil {
log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err))
}

log.Infof("[binlogTailer] ddl done with gtid: %v", ev.GSet.String())
case *replication.GTIDEvent:
// GTID stands for Global Transaction IDentifier
Expand Down Expand Up @@ -536,7 +549,7 @@ func (tailer *BinlogTailer) Start() error {

func (tailer *BinlogTailer) AfterMsgCommit(msg *core.Msg) error {
ctx := msg.InputContext.(inputContext)
if ctx.op == xid || ctx.op == ddl {
if (ctx.op == xid || ctx.op == ddl) && ctx.position.BinlogGTID != "" {

if err := UpdateCurrentPositionValue(tailer.positionCache, ctx.position); err != nil {
return errors.Trace(err)
Expand Down
5 changes: 2 additions & 3 deletions pkg/inputs/mysqlstream/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,7 @@ func NewDDLMsg(
ast ast.StmtNode,
ddlSQL string,
ts int64,
received time.Time,
position config.MySQLBinlogPosition) *core.Msg {
received time.Time) *core.Msg {

return &core.Msg{
Phase: core.Phase{
Expand All @@ -354,7 +353,7 @@ func NewDDLMsg(
Table: table,
DdlMsg: &core.DDLMsg{Statement: ddlSQL, AST: ast},
Done: make(chan struct{}),
InputContext: inputContext{op: ddl, position: position},
InputContext: inputContext{op: ddl},
InputStreamKey: utils.NewStringPtr(inputStreamKey),
AfterCommitCallback: callback,
}
Expand Down
48 changes: 30 additions & 18 deletions pkg/inputs/mysqlstream/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,50 @@ func IsEventBelongsToMyself(event *replication.RowsEvent, pipelineName string) b
panic("type conversion failed for internal table")
}

func extractSchemaNameFromDDLQueryEvent(p *parser.Parser, ev *replication.QueryEvent) (db, table string, node ast.StmtNode) {
func extractSchemaNameFromDDLQueryEvent(p *parser.Parser, ev *replication.QueryEvent) (db, table []string, node []ast.StmtNode) {
stmt, err := p.ParseOneStmt(string(ev.Query), "", "")
if err != nil {
log.Errorf("sql parser: %s. error: %v", string(ev.Query), err.Error())
return string(ev.Schema), "", nil
return []string{string(ev.Schema)}, []string{""}, nil
}

node = stmt

switch v := stmt.(type) {
case *ast.CreateDatabaseStmt:
db = v.Name
db = append(db, v.Name)
table = append(table, "")
node = append(node, stmt)
case *ast.DropDatabaseStmt:
db = v.Name
db = append(db, v.Name)
table = append(table, "")
node = append(node, stmt)
case *ast.CreateTableStmt:
db = v.Table.Schema.String()
table = v.Table.Name.String()
db = append(db, v.Table.Schema.String())
table = append(table, v.Table.Name.String())
node = append(node, stmt)
case *ast.DropTableStmt:
if len(v.Tables) > 1 {
log.Fatalf("only support single drop table right now: %v", string(ev.Query))
for i := range v.Tables {
db = append(db, v.Tables[i].Schema.String())
table = append(table, v.Tables[i].Name.String())
copy := *v
copy.Tables = nil
copy.Tables = append(copy.Tables, v.Tables[i])
node = append(node, &copy)
}
db = v.Tables[0].Schema.String()
table = v.Tables[0].Name.String()
case *ast.AlterTableStmt:
db = v.Table.Schema.String()
table = v.Table.Name.String()
db = append(db, v.Table.Schema.String())
table = append(table, v.Table.Name.String())
node = append(node, stmt)
case *ast.TruncateTableStmt:
db = v.Table.Schema.String()
table = v.Table.Name.String()
db = append(db, v.Table.Schema.String())
table = append(table, v.Table.Name.String())
node = append(node, stmt)
default:
db = append(db, "")
table = append(table, "")
node = append(node, stmt)
}
if db == "" {
db = string(ev.Schema)
if len(db) == 1 && db[0] == "" {
db[0] = string(ev.Schema)
}
return
}

0 comments on commit c187b24

Please sign in to comment.