From 9e0c59b92c4a6a51cdd063d47702d68b1a880471 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 13 May 2024 18:01:48 +0800 Subject: [PATCH] update canal source connector --- .../connector/CanalSourceConnector.java | 25 +++---------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index 03be76bcaf..eda956b342 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -104,36 +104,19 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup super.startEventParserInternal(parser, isGroup); if (eventParser instanceof MysqlEventParser) { - // 设置支持的类型 + // set eventParser support type ((MysqlEventParser) eventParser).setSupportBinlogFormats("ROW"); - if (syncFull) { - ((MysqlEventParser) eventParser).setSupportBinlogImages("FULL"); - } else { - ((MysqlEventParser) eventParser).setSupportBinlogImages("FULL,MINIMAL"); - } - + ((MysqlEventParser) eventParser).setSupportBinlogImages("FULL"); MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser; - mysqlEventParser.setParallel(false); // otter先使用简单的模式 - CanalHAController haController = mysqlEventParser.getHaController(); + mysqlEventParser.setParallel(false); + CanalHAController haController = mysqlEventParser.getHaController(); if (!haController.isStart()) { haController.start(); } } } - }; - - CanalEventSink eventSink = instance.getEventSink(); - if (eventSink instanceof AbstractCanalEventSink) { - handler = new OtterDownStreamHandler(); - handler.setPipelineId(pipelineId); - handler.setDetectingIntervalInSeconds(canal.getCanalParameter().getDetectingIntervalInSeconds()); - OtterContextLocator.autowire(handler); // 注入一下spring资源 - ((AbstractCanalEventSink) eventSink).addHandler(handler, 0); // 添加到开头 - handler.start(); - } - return instance; } });