Skip to content

Commit

Permalink
update canal source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 11, 2024
1 parent 023938f commit 1f23a4b
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ public class CanalSourceConfig extends SourceConfig {

private String destination;

private Long canalInstanceId;

private String desc;

private boolean ddlSync = true;

private boolean filterTableError = false;

private Long slaveId;

private Short clientId;

private Integer batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public class SourceConnectorConfig {

private String connectorName;

private String url;
private String dbAddress;

private int dbPort;

private String userName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,33 @@
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;


import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.ClusterMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.RunMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.CanalEventSink;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -72,11 +92,103 @@ public void init(ConnectorContext connectorContext) throws Exception {
canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
@Override
public CanalInstance generate(String destination) {
return null;
Canal canal = buildCanal(sourceConfig);

CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) {

protected CanalHAController initHaController() {
return super.initHaController();
}

protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) {
super.startEventParserInternal(parser, isGroup);

if (eventParser instanceof MysqlEventParser) {
// 设置支持的类型
((MysqlEventParser) eventParser).setSupportBinlogFormats("ROW");
if (syncFull) {
((MysqlEventParser) eventParser).setSupportBinlogImages("FULL");
} else {
((MysqlEventParser) eventParser).setSupportBinlogImages("FULL,MINIMAL");
}

MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
mysqlEventParser.setParallel(false); // otter先使用简单的模式
CanalHAController haController = mysqlEventParser.getHaController();

if (!haController.isStart()) {
haController.start();
}
}
}

};

CanalEventSink eventSink = instance.getEventSink();
if (eventSink instanceof AbstractCanalEventSink) {
handler = new OtterDownStreamHandler();
handler.setPipelineId(pipelineId);
handler.setDetectingIntervalInSeconds(canal.getCanalParameter().getDetectingIntervalInSeconds());
OtterContextLocator.autowire(handler); // 注入一下spring资源
((AbstractCanalEventSink) eventSink).addHandler(handler, 0); // 添加到开头
handler.start();
}

return instance;
}
});
}

private Canal buildCanal(CanalSourceConfig sourceConfig) {
// 设置下slaveId,保证多个piplineId下重复引用时不重复
long slaveId = 10000;// 默认基数
if (sourceConfig.getSlaveId() != null) {
slaveId = sourceConfig.getSlaveId();
}

Canal canal = new Canal();
canal.setId(sourceConfig.getCanalInstanceId());
canal.setName(sourceConfig.getDestination());
canal.setDesc(sourceConfig.getDesc());

CanalParameter parameter = new CanalParameter();

parameter.setRunMode(RunMode.EMBEDDED);
parameter.setClusterMode(ClusterMode.STANDALONE);
parameter.setMetaMode(MetaMode.MEMORY);
parameter.setHaMode(HAMode.HEARTBEAT);
parameter.setIndexMode(IndexMode.MEMORY);
parameter.setStorageMode(StorageMode.MEMORY);
parameter.setMemoryStorageBufferSize(32 * 1024);

parameter.setSourcingType(SourcingType.MYSQL);
parameter.setDbAddresses(Collections.singletonList(new InetSocketAddress(sourceConfig.getSourceConnectorConfig().getDbAddress(),
sourceConfig.getSourceConnectorConfig().getDbPort())));
parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName());
parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
// parameter.setPositions();
// Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}",
// "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}")

parameter.setSlaveId(slaveId);

parameter.setDefaultConnectionTimeoutInSeconds(30);
parameter.setConnectionCharset("UTF-8");
parameter.setConnectionCharsetNumber((byte) 33);
parameter.setReceiveBufferSize(8 * 1024);
parameter.setSendBufferSize(8 * 1024);

// heartbeat detect
parameter.setDetectingEnable(false);

parameter.setDdlIsolation(sourceConfig.isDdlSync());
parameter.setFilterTableError(sourceConfig.isFilterTableError());
parameter.setMemoryStorageRawEntry(false);

canal.setCanalParameter(parameter);
return canal;
}


@Override
public void start() throws Exception {
Expand Down

0 comments on commit 1f23a4b

Please sign in to comment.