Skip to content

Commit

Permalink
update canal source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 13, 2024
1 parent 1f23a4b commit 9e0c59b
Showing 1 changed file with 4 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,36 +104,19 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup
super.startEventParserInternal(parser, isGroup);

if (eventParser instanceof MysqlEventParser) {
// 设置支持的类型
// set eventParser support type
((MysqlEventParser) eventParser).setSupportBinlogFormats("ROW");
if (syncFull) {
((MysqlEventParser) eventParser).setSupportBinlogImages("FULL");
} else {
((MysqlEventParser) eventParser).setSupportBinlogImages("FULL,MINIMAL");
}

((MysqlEventParser) eventParser).setSupportBinlogImages("FULL");
MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
mysqlEventParser.setParallel(false); // otter先使用简单的模式
CanalHAController haController = mysqlEventParser.getHaController();
mysqlEventParser.setParallel(false);

CanalHAController haController = mysqlEventParser.getHaController();
if (!haController.isStart()) {
haController.start();
}
}
}

};

CanalEventSink eventSink = instance.getEventSink();
if (eventSink instanceof AbstractCanalEventSink) {
handler = new OtterDownStreamHandler();
handler.setPipelineId(pipelineId);
handler.setDetectingIntervalInSeconds(canal.getCanalParameter().getDetectingIntervalInSeconds());
OtterContextLocator.autowire(handler); // 注入一下spring资源
((AbstractCanalEventSink) eventSink).addHandler(handler, 0); // 添加到开头
handler.start();
}

return instance;
}
});
Expand Down

0 comments on commit 9e0c59b

Please sign in to comment.