From bbe2df41f71d9a5d7a471d484eac25232c616c92 Mon Sep 17 00:00:00 2001 From: Ryan-Git Date: Sun, 20 Oct 2019 15:50:58 +0800 Subject: [PATCH] should fatal when get schema error (#219) --- integration_test/mysql_mysql_test.go | 72 +++++++++++++++++++++++++ pkg/inputs/mysqlstream/binlog_tailer.go | 5 +- 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/integration_test/mysql_mysql_test.go b/integration_test/mysql_mysql_test.go index 8ec66772..dc7a9665 100644 --- a/integration_test/mysql_mysql_test.go +++ b/integration_test/mysql_mysql_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/moiot/gravity/pkg/inputs" + "github.com/stretchr/testify/require" "github.com/moiot/gravity/pkg/app" @@ -125,6 +127,76 @@ func TestMySQLToMySQLStream(t *testing.T) { r.NoError(generator.TestChecksum()) } +func TestTableNotExists(t *testing.T) { + r := require.New(t) + + sourceDBName := strings.ToLower(t.Name()) + "_source" + targetDBName := strings.ToLower(t.Name()) + "_target" + + sourceDB := mysql_test.MustSetupSourceDB(sourceDBName) + defer sourceDB.Close() + targetDB := mysql_test.MustSetupTargetDB(targetDBName) + defer targetDB.Close() + + sourceDBConfig := mysql_test.SourceDBConfig() + targetDBConfig := mysql_test.TargetDBConfig() + + dbUtil := utils.NewMySQLDB(sourceDB) + binlogFilePos, gtid, err := dbUtil.GetMasterStatus() + r.NoError(err) + + pipelineConfig := config.PipelineConfigV3{ + PipelineName: t.Name(), + Version: config.PipelineConfigV3Version, + InputPlugin: config.InputConfig{ + Type: inputs.Mysql, + Mode: config.Stream, + Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{ + Source: sourceDBConfig, + StartPosition: &config.MySQLBinlogPosition{ + BinLogFileName: binlogFilePos.Name, + BinLogFilePos: binlogFilePos.Pos, + BinlogGTID: gtid.String(), + }, + }), + }, + OutputPlugin: config.GenericPluginConfig{ + Type: "mysql", + Config: utils.MustAny2Map(mysql.MySQLPluginConfig{ + DBConfig: targetDBConfig, + EnableDDL: true, + Routes: []map[string]interface{}{ + { + "match-schema": sourceDBName, + "match-table": "*", + "target-schema": targetDBName, + }, + }, + }), + }, + } + + fullTblName := fmt.Sprintf("`%s`.`t`", sourceDBName) + _, err = sourceDB.Exec(fmt.Sprintf("CREATE TABLE %s (`id` int(11) unsigned NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;", fullTblName)) + r.NoError(err) + _, err = sourceDB.Exec(fmt.Sprintf("insert into %s(id) values (1);", fullTblName)) + r.NoError(err) + _, err = sourceDB.Exec(fmt.Sprintf("drop table %s;", fullTblName)) + r.NoError(err) + + err = mysql_test.SendDeadSignal(sourceDB, pipelineConfig.PipelineName) + r.NoError(err) + + // start the server + server, err := app.NewServer(pipelineConfig) + r.NoError(err) + + r.NoError(server.Start()) + + server.Input.Wait() + server.Close() +} + func TestMySQLBatch(t *testing.T) { r := require.New(t) diff --git a/pkg/inputs/mysqlstream/binlog_tailer.go b/pkg/inputs/mysqlstream/binlog_tailer.go index c3d09e8b..43268983 100644 --- a/pkg/inputs/mysqlstream/binlog_tailer.go +++ b/pkg/inputs/mysqlstream/binlog_tailer.go @@ -317,8 +317,7 @@ func (tailer *BinlogTailer) Start() error { // TODO: introduce schema store, so that we won't have stale schema schema, err := tailer.sourceSchemaStore.GetSchema(schemaName) if err != nil { - log.Errorf("[binlogTailer] failed GetSchema %v. err: %v.", schemaName, errors.ErrorStack(err)) - continue + log.Fatalf("[binlogTailer] failed GetSchema %v. err: %v.", schemaName, errors.ErrorStack(err)) } tableDef := schema[tableName] @@ -329,7 +328,7 @@ func (tailer *BinlogTailer) Start() error { log.Fatalf("[binlogTailer] failed to get internal traffic table: schemaName: %v, tableName: %v", schemaName, tableName) } else { - log.Errorf("[binlogTailer] failed to get table def, schemaName: %v, tableName: %v", schemaName, tableName) + log.Warnf("[binlogTailer] failed to get table def, skip this mutation. schemaName: %v, tableName: %v", schemaName, tableName) continue } }