From 76ead29516c4975730110b51e5f794bbaeb960f4 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Thu, 16 May 2024 11:08:16 +0800 Subject: [PATCH] update canal source connector --- .../rdb/canal/CanalSourceConfig.java | 9 +- .../common/remote/offset/RecordPartition.java | 9 -- .../connector/CanalSourceConnector.java | 112 +++++++++++++++++- .../api/data/canal/CanalRecordOffset.java | 41 +++++++ .../api/data/canal/CanalRecordPartition.java | 60 ++++++++++ .../data/rocketmq/RocketMQRecordOffset.java | 4 - .../rocketmq/RocketMQRecordPartition.java | 5 - 7 files changed, 216 insertions(+), 24 deletions(-) create mode 100644 eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordOffset.java create mode 100644 eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordPartition.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index c7fd5a69b4..4cbf9c1817 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -1,6 +1,9 @@ package org.apache.eventmesh.common.config.connector.rdb.canal; import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.List; import lombok.Data; import lombok.EqualsAndHashCode; @@ -23,9 +26,11 @@ public class CanalSourceConfig extends SourceConfig { private Short clientId; - private Integer batchSize; + private Integer batchSize = 10000; + + private Long batchTimeout = -1L; - private Long batchTimeout; + private List recordPositions; private SourceConnectorConfig sourceConnectorConfig; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java index a6eb1a3d27..ec9ffb35f1 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java @@ -23,20 +23,11 @@ public class RecordPartition { -// private Map partitionMap = new HashMap<>(); - private Class clazz; public RecordPartition() { } -// public RecordPartition(Map partition) { -// this.partitionMap = partition; -// } -// -// public Map getPartitionMap() { -// return partitionMap; -// } public Class getRecordPartitionClass() { return RecordPartition.class; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index 0adcbbb763..f56c8cca06 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -19,20 +19,31 @@ import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import org.springframework.util.CollectionUtils; + import com.alibaba.otter.canal.common.CanalException; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; @@ -50,10 +61,14 @@ import com.alibaba.otter.canal.parse.ha.CanalHAController; import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; import com.alibaba.otter.canal.parse.support.AuthenticationInfo; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.ClientIdentity; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; import com.alibaba.otter.canal.sink.AbstractCanalEventSink; import com.alibaba.otter.canal.sink.CanalEventSink; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; @@ -72,6 +87,8 @@ public class CanalSourceConnector implements Source, ConnectorCreateService configClass() { return CanalSourceConfig.class; @@ -93,6 +110,7 @@ public void init(ConnectorContext connectorContext) throws Exception { DatabaseConnection.initSourceConnection(); canalServer = CanalServerWithEmbedded.instance(); + canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() { @Override public CanalInstance generate(String destination) { @@ -153,9 +171,24 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) { sourceConfig.getSourceConnectorConfig().getDbPort()))); parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName()); parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord()); -// parameter.setPositions(); -// Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}", -// "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}") + + // check positions + // example: Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}", + // "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}") + if (sourceConfig.getRecordPositions() != null && !sourceConfig.getRecordPositions().isEmpty()) { + List recordPositions = sourceConfig.getRecordPositions(); + List positions = new ArrayList<>(); + recordPositions.forEach(recordPosition -> { + Map recordPositionMap = new HashMap<>(); + CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getPartition()); + CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getOffset()); + recordPositionMap.put("journalName", canalRecordPartition.getJournalName()); + recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp()); + recordPositionMap.put("position", canalRecordOffset.getOffset()); + positions.add(JsonUtils.toJSONString(recordPositionMap)); + }); + parameter.setPositions(positions); + } parameter.setSlaveId(slaveId); @@ -204,14 +237,85 @@ public String name() { @Override public void stop() { - + if (!running) { + return; + } + running = false; + canalServer.stop(sourceConfig.getDestination()); + canalServer.stop(); } @Override public List poll() { + int emptyTimes = 0; + com.alibaba.otter.canal.protocol.Message message = null; + if (sourceConfig.getBatchTimeout() < 0) {// perform polling + while (running) { + message = canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize()); + if (message == null || message.getId() == -1L) { // empty + applyWait(emptyTimes++); + } else { + break; + } + } + } else { // perform with timeout + while (running) { + message = canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize(), sourceConfig.getBatchTimeout(), TimeUnit.MILLISECONDS); + if (message == null || message.getId() == -1L) { // empty + continue; + } + break; + } + } + + List entries; + assert message != null; + if (message.isRaw()) { + entries = new ArrayList<>(message.getRawEntries().size()); + for (ByteString entry : message.getRawEntries()) { + try { + entries.add(CanalEntry.Entry.parseFrom(entry)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } else { + entries = message.getEntries(); + } + +// List eventDatas = messageParser.parse(pipelineId, entries); // 过滤事务头/尾和回环数据 +// Message result = new Message(message.getId(), eventDatas); +// // 更新一下最后的entry时间,包括被过滤的数据 +// if (!CollectionUtils.isEmpty(entries)) { +// long lastEntryTime = entries.get(entries.size() - 1).getHeader().getExecuteTime(); +// if (lastEntryTime > 0) {// oracle的时间可能为0 +// this.lastEntryTime = lastEntryTime; +// } +// } +// +// if (dump && logger.isInfoEnabled()) { +// String startPosition = null; +// String endPosition = null; +// if (!CollectionUtils.isEmpty(entries)) { +// startPosition = buildPositionForDump(entries.get(0)); +// endPosition = buildPositionForDump(entries.get(entries.size() - 1)); +// } +// +// dumpMessages(result, startPosition, endPosition, entries.size());// 记录一下,方便追查问题 +// } return null; } + // 处理无数据的情况,避免空循环挂死 + private void applyWait(int emptyTimes) { + int newEmptyTimes = Math.min(emptyTimes, maxEmptyTimes); + if (emptyTimes <= 3) { // 3次以内 + Thread.yield(); + } else { // 超过3次,最多只sleep 10ms + LockSupport.parkNanos(1000 * 1000L * newEmptyTimes); + } + } + @Override public Source create() { diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordOffset.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordOffset.java new file mode 100644 index 0000000000..3943dcc193 --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordOffset.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal; + +import org.apache.eventmesh.common.remote.offset.RecordOffset; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@EqualsAndHashCode(callSuper = true) +@Data +@ToString +public class CanalRecordOffset extends RecordOffset { + + private Long offset; + + public CanalRecordOffset() { + + } + + @Override + public Class getRecordOffsetClass() { + return CanalRecordOffset.class; + } +} diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordPartition.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordPartition.java new file mode 100644 index 0000000000..8aeb09cdbf --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/canal/CanalRecordPartition.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal; + +import org.apache.eventmesh.common.remote.offset.RecordPartition; + +import java.util.Objects; + +import lombok.Data; +import lombok.ToString; + + +@Data +@ToString +public class CanalRecordPartition extends RecordPartition { + + private String journalName; + + private Long timeStamp; + + public CanalRecordPartition() { + super(); + } + + public Class getRecordPartitionClass() { + return CanalRecordPartition.class; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CanalRecordPartition that = (CanalRecordPartition) o; + return Objects.equals(journalName, that.journalName) && Objects.equals(timeStamp, that.timeStamp); + } + + @Override + public int hashCode() { + return Objects.hash(journalName, timeStamp); + } +} diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java index ae35a1a125..4dbfb6a788 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java @@ -41,10 +41,6 @@ public RocketMQRecordOffset() { } -// public RocketMQRecordOffset(Map offset) { -// super(offset); -// } - @Override public Class getRecordOffsetClass() { return RocketMQRecordOffset.class; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java index 0480c32dbe..2617a58a7f 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java @@ -27,7 +27,6 @@ import lombok.ToString; -@EqualsAndHashCode(callSuper = true) @Data @ToString public class RocketMQRecordPartition extends RecordPartition { @@ -49,10 +48,6 @@ public RocketMQRecordPartition() { super(); } -// public RocketMQRecordPartition(Map partition) { -// super(partition); -// } - public Class getRecordPartitionClass() { return RocketMQRecordPartition.class; }