Skip to content

Commit

Permalink
expose syncer.HeartbeatPeriod to config (#229)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan-Git authored and ming535 committed Nov 21, 2019
1 parent eb69a70 commit 0d71dcb
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/inputs/helper/binlog_checker/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestBinlogChecker(t *testing.T) {
assert.FailNow(errors.ErrorStack(err))
}

syncer := utils.NewBinlogSyncer(1234, dbConfig)
syncer := utils.NewBinlogSyncer(1234, dbConfig, 0)

dbUtil := utils.NewMySQLDB(db)
_, gtid, err := dbUtil.GetMasterStatus()
Expand Down
4 changes: 2 additions & 2 deletions pkg/inputs/mysqlstream/binlog_tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewBinlogTailer(
parser: parser.New(),
ctx: c,
cancel: cancel,
binlogSyncer: utils.NewBinlogSyncer(gravityServerID, cfg.Source),
binlogSyncer: utils.NewBinlogSyncer(gravityServerID, cfg.Source, cfg.HeartbeatPeriod),
emitter: emitter,
router: router,
positionCache: positionCache,
Expand Down Expand Up @@ -686,7 +686,7 @@ func (tailer *BinlogTailer) getBinlogStreamer(gtid string) (*replication.BinlogS

func (tailer *BinlogTailer) reopenBinlogSyncer(gtidString string) (*replication.BinlogStreamer, error) {
tailer.binlogSyncer.Close()
tailer.binlogSyncer = utils.NewBinlogSyncer(tailer.gravityServerID, tailer.cfg.Source)
tailer.binlogSyncer = utils.NewBinlogSyncer(tailer.gravityServerID, tailer.cfg.Source, tailer.cfg.HeartbeatPeriod)
return tailer.getBinlogStreamer(gtidString)
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/inputs/mysqlstream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type MySQLBinlogInputPluginConfig struct {
// If we detect any internal txn tag that matches FailOnTxnTag, just fail.
FailOnTxnTags []string `mapstructure:"fail-on-txn-tags" toml:"fail-on-txn-tags"`

HeartbeatPeriodStr string `toml:"heartbeat-period" json:"heartbeat-period" mapstructure:"heartbeat-period"`
HeartbeatPeriod time.Duration `toml:"-" json:"-" mapstructure:"-"`

//
// internal configurations that is not exposed to users
//
Expand Down Expand Up @@ -101,6 +104,13 @@ func (plugin *mysqlStreamInputPlugin) Configure(pipelineName string, configInput
cfg.FailOnTxnTags = []string{fmt.Sprintf("%s*", pipelineName)}
}

if cfg.HeartbeatPeriodStr != "" {
cfg.HeartbeatPeriod, err = time.ParseDuration(cfg.HeartbeatPeriodStr)
if err != nil {
return errors.Annotatef(err, "invalid HeartbeatPeriodStr %s", cfg.HeartbeatPeriodStr)
}
}

// probe connection settings
plugin.probeDBConfig, plugin.probeSQLAnnotation = helper.GetProbCfg(cfg.SourceProbeCfg, cfg.Source)

Expand Down
7 changes: 6 additions & 1 deletion pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func SQLWithAnnotation(annotation string, sql string) string {
return fmt.Sprintf("%s%s", annotation, sql)
}

func NewBinlogSyncer(serverID uint32, dbConfig *config.DBConfig) *replication.BinlogSyncer {
func NewBinlogSyncer(serverID uint32, dbConfig *config.DBConfig, heartbeatPeriod time.Duration) *replication.BinlogSyncer {
syncerConfig := replication.BinlogSyncerConfig{
ServerID: serverID,
Flavor: "mysql",
Expand All @@ -618,6 +618,11 @@ func NewBinlogSyncer(serverID uint32, dbConfig *config.DBConfig) *replication.Bi
Password: dbConfig.Password,
ParseTime: true,
}

if heartbeatPeriod > 0 {
syncerConfig.HeartbeatPeriod = heartbeatPeriod
}

return replication.NewBinlogSyncer(syncerConfig)
}

Expand Down

0 comments on commit 0d71dcb

Please sign in to comment.