From 832ddae1c0617bc8a4764500376e9b9140367da5 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Tue, 21 May 2024 21:33:36 +0800 Subject: [PATCH] update canal connector --- .../rdb/canal/SinkConnectorConfig.java | 8 +++++ .../rdb/canal/SourceConnectorConfig.java | 6 ++++ .../connector/canal/CanalConnectRecord.java | 13 +------- .../sink/connector/CanalSinkConnector.java | 32 +++++++++++++++++-- .../connector/canal/source/EntryParser.java | 17 ++++------ .../connector/CanalSourceConnector.java | 6 +--- 6 files changed, 52 insertions(+), 30 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java index eb6435e537..c6d4a8951e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java @@ -31,7 +31,15 @@ public class SinkConnectorConfig { private String url; + private String dbAddress; + + private int dbPort; + private String userName; private String passWord; + + private String schemaName; + + private String tableName; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java index 854a34a03f..b64bf15579 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java @@ -32,6 +32,8 @@ public class SourceConnectorConfig { private String connectorName; + private String url; + private String dbAddress; private int dbPort; @@ -40,4 +42,8 @@ public class SourceConnectorConfig { private String passWord; + private String schemaName; + + private String tableName; + } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java index 62e4216254..9ceda46ae9 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java @@ -31,11 +31,6 @@ @Data public class CanalConnectRecord { - /** - * 内部维护的一套tableId,与manager中得到的table Id对应 - */ - private long tableId = -1; - private String tableName; private String schemaName; @@ -164,7 +159,6 @@ private List cloneColumn(List columns) { public CanalConnectRecord clone() { CanalConnectRecord record = new CanalConnectRecord(); - record.setTableId(tableId); record.setTableName(tableName); record.setSchemaName(schemaName); record.setDdlSchemaName(ddlSchemaName); @@ -195,7 +189,6 @@ public int hashCode() { result = prime * result + ((oldKeys == null) ? 0 : oldKeys.hashCode()); result = prime * result + (int) (pairId ^ (pairId >>> 32)); result = prime * result + ((schemaName == null) ? 0 : schemaName.hashCode()); - result = prime * result + (int) (tableId ^ (tableId >>> 32)); result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); return result; } @@ -249,9 +242,6 @@ public boolean equals(Object obj) { } else if (!schemaName.equals(other.schemaName)) { return false; } - if (tableId != other.tableId) { - return false; - } if (tableName == null) { if (other.tableName != null) { return false; @@ -265,8 +255,7 @@ public boolean equals(Object obj) { @Override public String toString() { return "CanalConnectRecord{" + - "tableId=" + tableId + - ", tableName='" + tableName + '\'' + + "tableName='" + tableName + '\'' + ", schemaName='" + schemaName + '\'' + ", eventType=" + eventType + ", executeTime=" + executeTime + diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 468e50acc9..cf16598947 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -17,9 +17,12 @@ package org.apache.eventmesh.connector.canal.sink.connector; +import javafx.fxml.LoadException; + import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; +import org.apache.eventmesh.connector.canal.CanalConnectRecord; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; @@ -44,14 +47,14 @@ public Class configClass() { @Override public void init(Config config) throws Exception { // init config for canal source connector - + this.sinkConfig = (CanalSinkConfig)config; } @Override public void init(ConnectorContext connectorContext) throws Exception { // init config for canal source connector SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; - + this.sinkConfig = (CanalSinkConfig)sinkConnectorContext.getSinkConfig(); } @Override @@ -76,11 +79,36 @@ public void stop() { @Override public void put(List sinkRecords) { + for (ConnectRecord connectRecord : sinkRecords) { + List canalConnectRecordList = (List)connectRecord.getData(); + for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) { + if (sinkConfig.getSinkConnectorConfig().getSchemaName().equals(canalConnectRecord.getSchemaName()) && + sinkConfig.getSinkConnectorConfig().getTableName().equals(canalConnectRecord.getTableName())) { + + } + } + } } @Override public Sink create() { return new CanalSinkConnector(); } + + /** + * 分析整个数据,将datas划分为多个批次. ddl sql前的DML并发执行,然后串行执行ddl后,再并发执行DML + * + * @return + */ + private boolean isDdlDatas(List canalConnectRecordList) { + boolean result = false; + for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) { + result |= canalConnectRecord.getEventType().isDdl(); + if (result && !canalConnectRecord.getEventType().isDdl()) { + throw new RuntimeException("ddl/dml can't be in one batch, it's may be a bug , pls submit issues."); + } + } + return result; + } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 8141caeb48..1a1cd3d398 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -49,15 +49,6 @@ */ @Slf4j public class EntryParser { -// private DbDialectFactory dbDialectFactory; - - private static final String RETL_CLIENT_FLAG = "_SYNC"; - - private static final String compatibleMarkTable = "retl_client"; - - private static final String compatibleMarkInfoColumn = "client_info"; - - private static final String compatibleMarkIdentifierColumn = "client_identifier"; /** * 将对应canal送出来的Entry对象解析为ConnectRecord @@ -131,6 +122,12 @@ public List parse(CanalSourceConfig sourceConfig, List internParse(CanalSourceConfig sourceConfig, Entry entry) { + String schemaName = entry.getHeader().getSchemaName(); + String tableName = entry.getHeader().getTableName(); + if (!schemaName.equals(sourceConfig.getSourceConnectorConfig().getSchemaName()) || !tableName.equals(sourceConfig.getSourceConnectorConfig().getTableName())) { + return null; + } + RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); @@ -142,8 +139,6 @@ private List internParse(CanalSourceConfig sourceConfig, Ent return null; } - String schemaName = entry.getHeader().getSchemaName(); - String tableName = entry.getHeader().getTableName(); EventType eventType = EventType.valueOf(rowChange.getEventType().name()); // 处理下DDL操作 diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index bb9ba1587a..4494619a6e 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -58,14 +58,10 @@ import com.alibaba.otter.canal.parse.CanalEventParser; import com.alibaba.otter.canal.parse.ha.CanalHAController; import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; -import com.alibaba.otter.canal.parse.support.AuthenticationInfo; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.ClientIdentity; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; -import com.alibaba.otter.canal.sink.AbstractCanalEventSink; -import com.alibaba.otter.canal.sink.CanalEventSink; -import com.fasterxml.jackson.databind.JsonNode; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; @@ -105,7 +101,7 @@ public void init(ConnectorContext connectorContext) throws Exception { this.sourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig(); this.offsetStorageReader = sourceConnectorContext.getOffsetStorageReader(); // init source database connection - DatabaseConnection.sourceConfig = sourceConfig; +// DatabaseConnection.sourceConfig = sourceConfig; // DatabaseConnection.initSourceConnection(); canalServer = CanalServerWithEmbedded.instance();