diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index 0a29656bbe..c7fd5a69b4 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -11,6 +11,16 @@ public class CanalSourceConfig extends SourceConfig { private String destination; + private Long canalInstanceId; + + private String desc; + + private boolean ddlSync = true; + + private boolean filterTableError = false; + + private Long slaveId; + private Short clientId; private Integer batchSize; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java index af174a97ce..854a34a03f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java @@ -32,7 +32,9 @@ public class SourceConnectorConfig { private String connectorName; - private String url; + private String dbAddress; + + private int dbPort; private String userName; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index a076899671..03be76bcaf 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -26,13 +26,33 @@ import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import com.alibaba.otter.canal.common.CanalException; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; +import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager; +import com.alibaba.otter.canal.instance.manager.model.Canal; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter.ClusterMode; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter.RunMode; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType; +import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode; +import com.alibaba.otter.canal.parse.CanalEventParser; +import com.alibaba.otter.canal.parse.ha.CanalHAController; +import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; +import com.alibaba.otter.canal.parse.support.AuthenticationInfo; import com.alibaba.otter.canal.protocol.ClientIdentity; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; +import com.alibaba.otter.canal.sink.AbstractCanalEventSink; +import com.alibaba.otter.canal.sink.CanalEventSink; import lombok.extern.slf4j.Slf4j; @@ -72,11 +92,103 @@ public void init(ConnectorContext connectorContext) throws Exception { canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() { @Override public CanalInstance generate(String destination) { - return null; + Canal canal = buildCanal(sourceConfig); + + CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) { + + protected CanalHAController initHaController() { + return super.initHaController(); + } + + protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) { + super.startEventParserInternal(parser, isGroup); + + if (eventParser instanceof MysqlEventParser) { + // 设置支持的类型 + ((MysqlEventParser) eventParser).setSupportBinlogFormats("ROW"); + if (syncFull) { + ((MysqlEventParser) eventParser).setSupportBinlogImages("FULL"); + } else { + ((MysqlEventParser) eventParser).setSupportBinlogImages("FULL,MINIMAL"); + } + + MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser; + mysqlEventParser.setParallel(false); // otter先使用简单的模式 + CanalHAController haController = mysqlEventParser.getHaController(); + + if (!haController.isStart()) { + haController.start(); + } + } + } + + }; + + CanalEventSink eventSink = instance.getEventSink(); + if (eventSink instanceof AbstractCanalEventSink) { + handler = new OtterDownStreamHandler(); + handler.setPipelineId(pipelineId); + handler.setDetectingIntervalInSeconds(canal.getCanalParameter().getDetectingIntervalInSeconds()); + OtterContextLocator.autowire(handler); // 注入一下spring资源 + ((AbstractCanalEventSink) eventSink).addHandler(handler, 0); // 添加到开头 + handler.start(); + } + + return instance; } }); } + private Canal buildCanal(CanalSourceConfig sourceConfig) { + // 设置下slaveId,保证多个piplineId下重复引用时不重复 + long slaveId = 10000;// 默认基数 + if (sourceConfig.getSlaveId() != null) { + slaveId = sourceConfig.getSlaveId(); + } + + Canal canal = new Canal(); + canal.setId(sourceConfig.getCanalInstanceId()); + canal.setName(sourceConfig.getDestination()); + canal.setDesc(sourceConfig.getDesc()); + + CanalParameter parameter = new CanalParameter(); + + parameter.setRunMode(RunMode.EMBEDDED); + parameter.setClusterMode(ClusterMode.STANDALONE); + parameter.setMetaMode(MetaMode.MEMORY); + parameter.setHaMode(HAMode.HEARTBEAT); + parameter.setIndexMode(IndexMode.MEMORY); + parameter.setStorageMode(StorageMode.MEMORY); + parameter.setMemoryStorageBufferSize(32 * 1024); + + parameter.setSourcingType(SourcingType.MYSQL); + parameter.setDbAddresses(Collections.singletonList(new InetSocketAddress(sourceConfig.getSourceConnectorConfig().getDbAddress(), + sourceConfig.getSourceConnectorConfig().getDbPort()))); + parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName()); + parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord()); +// parameter.setPositions(); +// Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}", +// "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}") + + parameter.setSlaveId(slaveId); + + parameter.setDefaultConnectionTimeoutInSeconds(30); + parameter.setConnectionCharset("UTF-8"); + parameter.setConnectionCharsetNumber((byte) 33); + parameter.setReceiveBufferSize(8 * 1024); + parameter.setSendBufferSize(8 * 1024); + + // heartbeat detect + parameter.setDetectingEnable(false); + + parameter.setDdlIsolation(sourceConfig.isDdlSync()); + parameter.setFilterTableError(sourceConfig.isFilterTableError()); + parameter.setMemoryStorageRawEntry(false); + + canal.setCanalParameter(parameter); + return canal; + } + @Override public void start() throws Exception {