From d1c46892d02a457862967624f4410fcfcc55d6c1 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Wed, 15 May 2024 17:23:51 +0800 Subject: [PATCH] update canal source connector --- .../connector/offset/OffsetStorageConfig.java | 6 ++ .../common/remote/offset/RecordOffset.java | 35 +++----- .../common/remote/offset/RecordPartition.java | 43 +++------- .../common/remote/offset/RecordPosition.java | 28 +++++- .../remote/request/FetchJobRequest.java | 2 +- .../remote/request/FetchPositionRequest.java | 21 +++++ .../response/FetchPositionResponse.java | 35 ++++++++ .../connector/CanalSourceConnector.java | 4 + .../connector/RocketMQSourceConnector.java | 46 +++++----- .../eventmesh/openconnect/SourceWorker.java | 4 +- .../offsetmgmt/admin/AdminOffsetService.java | 86 ++++++++++++++++--- .../offsetmgmt/api/data/ConnectRecord.java | 8 ++ .../data/rocketmq/RocketMQRecordOffset.java | 14 ++- .../rocketmq/RocketMQRecordPartition.java | 43 ++++++++-- .../api/storage/ConnectorRecordPartition.java | 82 ------------------ .../DefaultOffsetManagementServiceImpl.java | 11 +-- .../api/storage/OffsetManagementService.java | 11 +-- .../api/storage/OffsetStorageReaderImpl.java | 17 ++-- .../api/storage/OffsetStorageWriterImpl.java | 14 ++- .../offsetmgmt/nacos/NacosConfigService.java | 51 +++++------ .../runtime/connector/ConnectorRuntime.java | 6 +- 21 files changed, 328 insertions(+), 239 deletions(-) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java delete mode 100644 eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/ConnectorRecordPartition.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java index f0befa7afc..e8ff1d4909 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java @@ -17,6 +17,8 @@ package org.apache.eventmesh.common.config.connector.offset; +import org.apache.eventmesh.common.remote.job.DataSourceType; + import java.util.Map; import lombok.Data; @@ -29,4 +31,8 @@ public class OffsetStorageConfig { private String offsetStorageAddr; private Map extensions; + + private DataSourceType dataSourceType; + + private DataSourceType dataSinkType; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java index c80b00a6d1..f26933f30e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java @@ -27,39 +27,30 @@ public class RecordOffset { * if pull message from mq key=queueOffset, * value=queueOffset value */ - private Map offset = new HashMap<>(); +// private Map offset = new HashMap<>(); + private Class clazz; public RecordOffset() { } - public RecordOffset(Map offset) { - this.offset = offset; - } - - public Map getOffset() { - return offset; - } +// public RecordOffset(Map offset) { +// this.offset = offset; +// } +// +// public Map getOffset() { +// return offset; +// } public Class getRecordOffsetClass() { return RecordOffset.class; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof RecordOffset)) { - return false; - } - RecordOffset offset1 = (RecordOffset) o; - return Objects.equals(offset, offset1.offset); + public Class getClazz() { + return clazz; } - @Override - public int hashCode() { - return Objects.hash(offset); + public void setClazz(Class clazz) { + this.clazz = clazz; } - } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java index 6fdca8fc57..a6eb1a3d27 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java @@ -23,45 +23,30 @@ public class RecordPartition { - private Map partitionMap = new HashMap<>(); +// private Map partitionMap = new HashMap<>(); - public RecordPartition() { - - } + private Class clazz; - public RecordPartition(Map partition) { - this.partitionMap = partition; - } + public RecordPartition() { - public Map getPartitionMap() { - return partitionMap; } +// public RecordPartition(Map partition) { +// this.partitionMap = partition; +// } +// +// public Map getPartitionMap() { +// return partitionMap; +// } public Class getRecordPartitionClass() { return RecordPartition.class; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - RecordPartition that = (RecordPartition) o; - return Objects.equals(partitionMap, that.partitionMap); - } - - @Override - public int hashCode() { - return Objects.hash(partitionMap); + public Class getClazz() { + return clazz; } - @Override - public String toString() { - return "RecordPartition{" + - "partitionMap=" + partitionMap + - '}'; + public void setClazz(Class clazz) { + this.clazz = clazz; } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java index 91a98a9857..029eac563f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java @@ -21,13 +21,17 @@ public class RecordPosition { - private final RecordPartition recordPartition; + private RecordPartition recordPartition; - private final Class recordPartitionClazz; + private Class recordPartitionClazz; - private final RecordOffset recordOffset; + private RecordOffset recordOffset; - private final Class recordOffsetClazz; + private Class recordOffsetClazz; + + public RecordPosition(){ + + } public RecordPosition( RecordPartition recordPartition, RecordOffset recordOffset) { @@ -37,6 +41,22 @@ public RecordPosition( this.recordOffsetClazz = recordOffset.getRecordOffsetClass(); } + public void setRecordPartition(RecordPartition recordPartition) { + this.recordPartition = recordPartition; + } + + public void setRecordPartitionClazz(Class recordPartitionClazz) { + this.recordPartitionClazz = recordPartitionClazz; + } + + public void setRecordOffset(RecordOffset recordOffset) { + this.recordOffset = recordOffset; + } + + public void setRecordOffsetClazz(Class recordOffsetClazz) { + this.recordOffsetClazz = recordOffsetClazz; + } + public RecordPartition getPartition() { return recordPartition; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java index 3e62764089..27b0a82111 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java @@ -6,5 +6,5 @@ @Data @EqualsAndHashCode(callSuper = true) public class FetchJobRequest extends BaseGrpcRequest { - String jobID; + private String jobID; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java new file mode 100644 index 0000000000..03444576d0 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java @@ -0,0 +1,21 @@ +package org.apache.eventmesh.common.remote.request; + +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class FetchPositionRequest extends BaseGrpcRequest { + + private String jobID; + + private String address; + + private RecordPosition recordPosition; + + private DataSourceType dataSourceType; + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java new file mode 100644 index 0000000000..4e4ae058c6 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java @@ -0,0 +1,35 @@ +package org.apache.eventmesh.common.remote.response; + +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.Position; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.job.JobTransportType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.Map; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class FetchPositionResponse extends BaseGrpcResponse { + + private RecordPosition recordPosition; + + public static FetchPositionResponse successResponse() { + FetchPositionResponse response = new FetchPositionResponse(); + response.setSuccess(true); + response.setErrorCode(ErrorCode.SUCCESS); + return response; + } + + public static FetchPositionResponse failResponse(int code, String desc) { + FetchPositionResponse response = new FetchPositionResponse(); + response.setSuccess(false); + response.setErrorCode(code); + response.setDesc(desc); + return response; + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index eda956b342..0adcbbb763 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -25,6 +25,7 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import java.net.InetSocketAddress; import java.util.Arrays; @@ -61,6 +62,8 @@ public class CanalSourceConnector implements Source, ConnectorCreateService partitionMap = new HashMap<>(); - partitionMap.put("topic", messageQueue.getTopic()); - partitionMap.put("brokerName", messageQueue.getBrokerName()); - partitionMap.put("queueId", messageQueue.getQueueId() + ""); - RecordPartition recordPartition = new RocketMQRecordPartition(partitionMap); +// Map partitionMap = new HashMap<>(); +// partitionMap.put("topic", messageQueue.getTopic()); +// partitionMap.put("brokerName", messageQueue.getBrokerName()); +// partitionMap.put("queueId", messageQueue.getQueueId() + ""); + RocketMQRecordPartition recordPartition = new RocketMQRecordPartition(); + recordPartition.setBroker(messageQueue.getBrokerName()); + recordPartition.setTopic(messageQueue.getTopic()); + recordPartition.setQueueId(messageQueue.getQueueId() + ""); + recordPartition.setClazz(recordPartition.getRecordPartitionClass()); RecordOffset recordOffset = offsetStorageReader.readOffset(recordPartition); log.info("assigned messageQueue {}, recordOffset {}", messageQueue, recordOffset); if (recordOffset != null) { - long pollOffset = (Long) recordOffset.getOffset().get("queueOffset"); + long pollOffset = ((RocketMQRecordOffset) recordOffset).getQueueOffset(); if (pollOffset != 0) { consumer.seek(messageQueue, pollOffset); } @@ -189,13 +193,13 @@ private List getMessageQueueList(String topic) throws MQClientExce @Override public void commit(ConnectRecord record) { // send success, commit offset - Map map = record.getPosition().getPartition().getPartitionMap(); - String brokerName = (String) map.get("brokerName"); - String topic = (String) map.get("topic"); - int queueId = Integer.parseInt((String) map.get("queueId")); + RocketMQRecordPartition rocketMQRecordPartition = (RocketMQRecordPartition)(record.getPosition().getPartition()); + String brokerName = rocketMQRecordPartition.getBroker(); + String topic = rocketMQRecordPartition.getTopic(); + int queueId = Integer.parseInt(rocketMQRecordPartition.getQueueId()); MessageQueue mq = new MessageQueue(topic, brokerName, queueId); - Map offsetMap = record.getPosition().getOffset().getOffset(); - long offset = Long.parseLong((String) offsetMap.get("queueOffset")); + RocketMQRecordOffset rocketMQRecordOffset = (RocketMQRecordOffset)record.getPosition().getOffset(); + long offset = rocketMQRecordOffset.getQueueOffset(); long canCommitOffset = removeMessage(mq, offset); log.info("commit record {}|mq {}|canCommitOffset {}", record, mq, canCommitOffset); // commit offset to prepareCommitOffset @@ -235,17 +239,19 @@ public List poll() { } public static RecordOffset convertToRecordOffset(Long offset) { - Map offsetMap = new HashMap<>(); - offsetMap.put("queueOffset", offset + ""); - return new RocketMQRecordOffset(offsetMap); + RocketMQRecordOffset rocketMQRecordOffset = new RocketMQRecordOffset(); + rocketMQRecordOffset.setQueueOffset(offset); + return rocketMQRecordOffset; } public static RecordPartition convertToRecordPartition(String topic, String brokerName, int queueId) { - Map map = new HashMap<>(); - map.put("topic", topic); - map.put("brokerName", brokerName); - map.put("queueId", queueId + ""); - return new RocketMQRecordPartition(map); + RocketMQRecordPartition rocketMQRecordPartition = new RocketMQRecordPartition(); + rocketMQRecordPartition.setBroker(brokerName); + rocketMQRecordPartition.setTopic(topic); + rocketMQRecordPartition.setQueueId(queueId + ""); + rocketMQRecordPartition.setClazz(rocketMQRecordPartition.getRecordPartitionClass()); + + return rocketMQRecordPartition; } private void putPulledQueueOffset(MessageExt messageExt) { diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java index 0f759a01e1..6e48aa1de8 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java @@ -152,8 +152,8 @@ public void init() { .map(storageType -> EventMeshExtensionFactory.getExtension(OffsetManagementService.class, storageType)) .orElse(new DefaultOffsetManagementServiceImpl()); this.offsetManagementService.initialize(offsetStorageConfig); - this.offsetStorageWriter = new OffsetStorageWriterImpl(source.name(), offsetManagementService); - this.offsetStorageReader = new OffsetStorageReaderImpl(source.name(), offsetManagementService); + this.offsetStorageWriter = new OffsetStorageWriterImpl(offsetManagementService); + this.offsetStorageReader = new OffsetStorageReaderImpl(offsetManagementService); } @Override diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index 8f3fa21f0c..769fe81866 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -31,12 +31,16 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.job.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordOffset; +import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.apache.eventmesh.common.remote.response.FetchJobResponse; +import org.apache.eventmesh.common.remote.response.FetchPositionResponse; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.ConnectorRecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.KeyValueStore; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.MemoryBasedKeyValueStore; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService; @@ -61,12 +65,16 @@ public class AdminOffsetService implements OffsetManagementService { StreamObserver requestObserver; - public KeyValueStore positionStore; + public KeyValueStore positionStore; private String jobId; private JobState jobState; + private DataSourceType dataSourceType; + + private DataSourceType dataSinkType; + @Override public void start() { @@ -85,10 +93,10 @@ public void configure(OffsetStorageConfig config) { @Override public void persist() { - Map recordMap = positionStore.getKVMap(); + Map recordMap = positionStore.getKVMap(); List recordToSyncList = new ArrayList<>(); - for (Map.Entry entry : recordMap.entrySet()) { + for (Map.Entry entry : recordMap.entrySet()) { RecordPosition recordPosition = new RecordPosition(entry.getKey(), entry.getValue()); recordToSyncList.add(recordPosition); } @@ -122,47 +130,97 @@ public void synchronize() { } @Override - public Map getPositionMap() { + public Map getPositionMap() { // get from memory storage first if (positionStore.getKVMap() == null || positionStore.getKVMap().isEmpty()) { - //TODO: get position from adminService + log.info("fetch position from admin server"); + FetchPositionRequest fetchPositionRequest = new FetchPositionRequest(); + fetchPositionRequest.setJobID(jobId); + fetchPositionRequest.setAddress(IPUtils.getLocalAddress()); + fetchPositionRequest.setDataSourceType(dataSourceType); + + Metadata metadata = Metadata.newBuilder() + .setType(FetchPositionRequest.class.getSimpleName()) + .build(); + + Payload request = Payload.newBuilder() + .setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations. + unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(fetchPositionRequest)))).build()) + .build(); + Payload response = adminServiceBlockingStub.invoke(request); + if (response.getMetadata().getType().equals(FetchPositionResponse.class.getSimpleName())) { + FetchPositionResponse fetchPositionResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); + assert fetchPositionResponse != null; + if (fetchPositionResponse.isSuccess()) { + positionStore.put(fetchPositionResponse.getRecordPosition().getPartition(), fetchPositionResponse.getRecordPosition().getOffset()); + } + } } log.info("memory position map {}", positionStore.getKVMap()); return positionStore.getKVMap(); } @Override - public RecordOffset getPosition(ConnectorRecordPartition partition) { + public RecordOffset getPosition(RecordPartition partition) { // get from memory storage first if (positionStore.get(partition) == null) { - //TODO: get position from adminService + log.info("fetch position from admin server"); + FetchPositionRequest fetchPositionRequest = new FetchPositionRequest(); + fetchPositionRequest.setJobID(jobId); + fetchPositionRequest.setAddress(IPUtils.getLocalAddress()); + fetchPositionRequest.setDataSourceType(dataSourceType); + RecordPosition recordPosition = new RecordPosition(); + recordPosition.setRecordPartition(partition); + fetchPositionRequest.setRecordPosition(recordPosition); + + Metadata metadata = Metadata.newBuilder() + .setType(FetchPositionRequest.class.getSimpleName()) + .build(); + + Payload request = Payload.newBuilder() + .setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations. + unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(fetchPositionRequest)))).build()) + .build(); + Payload response = adminServiceBlockingStub.invoke(request); + if (response.getMetadata().getType().equals(FetchPositionResponse.class.getSimpleName())) { + FetchPositionResponse fetchPositionResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); + assert fetchPositionResponse != null; + if (fetchPositionResponse.isSuccess()) { + positionStore.put(fetchPositionResponse.getRecordPosition().getPartition(), fetchPositionResponse.getRecordPosition().getOffset()); + } + } } log.info("memory record position {}", positionStore.get(partition)); return positionStore.get(partition); } @Override - public void putPosition(Map positions) { + public void putPosition(Map positions) { positionStore.putAll(positions); } @Override - public void putPosition(ConnectorRecordPartition partition, RecordOffset position) { + public void putPosition(RecordPartition partition, RecordOffset position) { positionStore.put(partition, position); } @Override - public void removePosition(List partitions) { + public void removePosition(List partitions) { if (partitions == null) { return; } - for (ConnectorRecordPartition partition : partitions) { + for (RecordPartition partition : partitions) { positionStore.remove(partition); } } @Override public void initialize(OffsetStorageConfig offsetStorageConfig) { + this.dataSourceType = offsetStorageConfig.getDataSourceType(); + this.dataSinkType = offsetStorageConfig.getDataSinkType(); + this.adminServerAddr = offsetStorageConfig.getOffsetStorageAddr(); this.channel = ManagedChannelBuilder.forTarget(adminServerAddr) .usePlaintext() @@ -190,8 +248,8 @@ public void onCompleted() { requestObserver = adminServiceStub.invokeBiStream(responseObserver); this.positionStore = new MemoryBasedKeyValueStore<>(); - Map initialRecordOffsetMap = JsonUtils.parseTypeReferenceObject(offsetStorageConfig.getExtensions().get("offset"), - new TypeReference>(){ + Map initialRecordOffsetMap = JsonUtils.parseTypeReferenceObject(offsetStorageConfig.getExtensions().get("offset"), + new TypeReference>(){ }); log.info("init record offset {}", initialRecordOffsetMap); positionStore.putAll(initialRecordOffsetMap); diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java index 2ed3a8a983..8afcdfbff0 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java @@ -20,6 +20,8 @@ import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition; import java.util.Objects; import java.util.Set; @@ -45,6 +47,12 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, Long timestamp, Object data) { this.position = new RecordPosition(recordPartition, recordOffset); + if (recordPartition != null) { + this.position.setRecordPartitionClazz(recordPartition.getRecordPartitionClass()); + } + if (recordOffset != null) { + this.position.setRecordOffsetClazz(recordOffset.getRecordOffsetClass()); + } this.timestamp = timestamp; this.data = data; } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java index 870e02848d..ae35a1a125 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordOffset.java @@ -21,6 +21,13 @@ import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@EqualsAndHashCode(callSuper = true) +@Data +@ToString public class RocketMQRecordOffset extends RecordOffset { /** @@ -28,14 +35,15 @@ public class RocketMQRecordOffset extends RecordOffset { * key=queueOffset, * value=queueOffset value */ + private Long queueOffset; public RocketMQRecordOffset() { } - public RocketMQRecordOffset(Map offset) { - super(offset); - } +// public RocketMQRecordOffset(Map offset) { +// super(offset); +// } @Override public Class getRecordOffsetClass() { diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java index e7cdfe500d..0480c32dbe 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/rocketmq/RocketMQRecordPartition.java @@ -18,13 +18,19 @@ package org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq; import org.apache.eventmesh.common.remote.offset.RecordPartition; -import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.ConnectorRecordPartition; -import java.util.HashMap; import java.util.Map; import java.util.Objects; -public class RocketMQRecordPartition extends ConnectorRecordPartition { +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + + +@EqualsAndHashCode(callSuper = true) +@Data +@ToString +public class RocketMQRecordPartition extends RecordPartition { /** * key=topic,value=topicName @@ -32,17 +38,40 @@ public class RocketMQRecordPartition extends ConnectorRecordPartition { * key=queueId,value=queueId */ + private String broker; - public RocketMQRecordPartition() { + private String topic; + + private String queueId; - } - public RocketMQRecordPartition(Map partition) { - super(partition); + public RocketMQRecordPartition() { + super(); } +// public RocketMQRecordPartition(Map partition) { +// super(partition); +// } + public Class getRecordPartitionClass() { return RocketMQRecordPartition.class; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RocketMQRecordPartition that = (RocketMQRecordPartition) o; + return Objects.equals(broker, that.broker) && Objects.equals(topic, that.topic) && Objects.equals(queueId, + that.queueId); + } + + @Override + public int hashCode() { + return Objects.hash(broker, topic, queueId); + } } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/ConnectorRecordPartition.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/ConnectorRecordPartition.java deleted file mode 100644 index 66c91c7992..0000000000 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/ConnectorRecordPartition.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.openconnect.offsetmgmt.api.storage; - -import org.apache.eventmesh.common.remote.offset.RecordPartition; - -import java.util.Map; -import java.util.Objects; - -/** - * extend record partition - */ -public class ConnectorRecordPartition extends RecordPartition { - - /** - * connect name - */ - private String connectorName; - - public ConnectorRecordPartition() { - - } - - public ConnectorRecordPartition(Map partition) { - super(partition); - } - - public ConnectorRecordPartition(String connectorName, Map partition) { - super(partition); - this.connectorName = connectorName; - } - - public String getConnectorName() { - return connectorName; - } - - public Class getRecordPartitionClass() { - return ConnectorRecordPartition.class; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof ConnectorRecordPartition)) { - return false; - } - if (!super.equals(o)) { - return false; - } - ConnectorRecordPartition that = (ConnectorRecordPartition) o; - return this.connectorName.equals(that.connectorName); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), connectorName); - } - - @Override - public String toString() { - return "ConnectorRecordPartition{" + - "connectorName='" + connectorName + '\'' + - '}'; - } -} diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/DefaultOffsetManagementServiceImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/DefaultOffsetManagementServiceImpl.java index 68fa5b1192..be72097911 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/DefaultOffsetManagementServiceImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/DefaultOffsetManagementServiceImpl.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig; import org.apache.eventmesh.common.remote.offset.RecordOffset; +import org.apache.eventmesh.common.remote.offset.RecordPartition; import java.util.List; import java.util.Map; @@ -51,27 +52,27 @@ public void synchronize() { } @Override - public Map getPositionMap() { + public Map getPositionMap() { return null; } @Override - public RecordOffset getPosition(ConnectorRecordPartition partition) { + public RecordOffset getPosition(RecordPartition partition) { return null; } @Override - public void putPosition(Map positions) { + public void putPosition(Map positions) { } @Override - public void putPosition(ConnectorRecordPartition partition, RecordOffset position) { + public void putPosition(RecordPartition partition, RecordOffset position) { } @Override - public void removePosition(List partitions) { + public void removePosition(List partitions) { } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java index 35aa209690..62327a1ae9 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig; import org.apache.eventmesh.common.remote.offset.RecordOffset; +import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.spi.EventMeshExtensionType; import org.apache.eventmesh.spi.EventMeshSPI; @@ -68,23 +69,23 @@ default void configure(OffsetStorageConfig config) { * * @return */ - Map getPositionMap(); + Map getPositionMap(); - RecordOffset getPosition(ConnectorRecordPartition partition); + RecordOffset getPosition(RecordPartition partition); /** * Put a position info. */ - void putPosition(Map positions); + void putPosition(Map positions); - void putPosition(ConnectorRecordPartition partition, RecordOffset position); + void putPosition(RecordPartition partition, RecordOffset position); /** * Remove a position info. * * @param partitions */ - void removePosition(List partitions); + void removePosition(List partitions); void initialize(OffsetStorageConfig offsetStorageConfig); diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java index 03487f5785..261c9ac2b5 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java @@ -19,6 +19,8 @@ import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition; import java.util.Collection; import java.util.HashMap; @@ -26,29 +28,24 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader { - private final String connectorName; - private OffsetManagementService offsetManagementService; - public OffsetStorageReaderImpl(String connectorName, OffsetManagementService offsetManagementService) { - this.connectorName = connectorName; + public OffsetStorageReaderImpl(OffsetManagementService offsetManagementService) { this.offsetManagementService = offsetManagementService; } @Override public RecordOffset readOffset(RecordPartition partition) { - ConnectorRecordPartition connectorRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartitionMap()); - return offsetManagementService.getPositionMap().get(connectorRecordPartition); + return offsetManagementService.getPositionMap().get(partition); } @Override public Map readOffsets(Collection partitions) { Map result = new HashMap<>(); - Map allData = offsetManagementService.getPositionMap(); + Map allData = offsetManagementService.getPositionMap(); for (RecordPartition key : partitions) { - ConnectorRecordPartition connectorRecordPartition = new ConnectorRecordPartition(connectorName, key.getPartitionMap()); - if (allData.containsKey(connectorRecordPartition)) { - result.put(key, allData.get(connectorRecordPartition)); + if (allData.containsKey(key)) { + result.put(key, allData.get(key)); } } return result; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java index 0597260d02..088565cbbb 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java @@ -35,29 +35,27 @@ @Slf4j public class OffsetStorageWriterImpl implements OffsetStorageWriter, Closeable { - private final String connectorName; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final OffsetManagementService offsetManagementService; /** * Offset data in Connect format */ - private Map data = new HashMap<>(); - private Map toFlush = null; + private Map data = new HashMap<>(); + private Map toFlush = null; // Unique ID for each flush request to handle callbacks after timeouts private long currentFlushId = 0; - public OffsetStorageWriterImpl(String connectorName, OffsetManagementService offsetManagementService) { - this.connectorName = connectorName; + public OffsetStorageWriterImpl(OffsetManagementService offsetManagementService) { this.offsetManagementService = offsetManagementService; } @Override public void writeOffset(RecordPartition partition, RecordOffset offset) { - ConnectorRecordPartition extendRecordPartition; +// RecordPartition extendRecordPartition; if (partition != null) { - extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartitionMap()); - data.put(extendRecordPartition, offset); +// extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartitionMap()); + data.put(partition, offset); } } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-nacos/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/nacos/NacosConfigService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-nacos/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/nacos/NacosConfigService.java index 725e5fe74d..67c53d4d6d 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-nacos/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/nacos/NacosConfigService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-nacos/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/nacos/NacosConfigService.java @@ -19,7 +19,7 @@ import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig; import org.apache.eventmesh.common.remote.offset.RecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.ConnectorRecordPartition; +import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.KeyValueStore; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.MemoryBasedKeyValueStore; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService; @@ -56,7 +56,7 @@ public class NacosConfigService implements OffsetManagementService { private Listener listener; - public KeyValueStore positionStore; + public KeyValueStore positionStore; @Override public void start() { @@ -68,19 +68,19 @@ public void start() { } // merge the updated connectorRecord & recordOffset to memory store - public void mergeOffset(ConnectorRecordPartition connectorRecordPartition, RecordOffset recordOffset) { - if (connectorRecordPartition == null || connectorRecordPartition.getPartitionMap().isEmpty()) { + public void mergeOffset(RecordPartition recordPartition, RecordOffset recordOffset) { + if (recordPartition == null) { return; } - if (positionStore.getKVMap().containsKey(connectorRecordPartition)) { - RecordOffset existedOffset = positionStore.getKVMap().get(connectorRecordPartition); + if (positionStore.getKVMap().containsKey(recordPartition)) { + RecordOffset existedOffset = positionStore.getKVMap().get(recordPartition); // update if (!recordOffset.equals(existedOffset)) { - positionStore.put(connectorRecordPartition, recordOffset); + positionStore.put(recordPartition, recordOffset); } } else { // add new position - positionStore.put(connectorRecordPartition, recordOffset); + positionStore.put(recordPartition, recordOffset); } } @@ -108,12 +108,12 @@ public void load() { @Override public void synchronize() { try { - Map recordMap = positionStore.getKVMap(); + Map recordMap = positionStore.getKVMap(); List> recordToSyncList = new ArrayList<>(); - for (Map.Entry entry : recordMap.entrySet()) { + for (Map.Entry entry : recordMap.entrySet()) { Map synchronizeMap = new HashMap<>(); - synchronizeMap.put("connectorRecordPartition", entry.getKey()); + synchronizeMap.put("recordPartition", entry.getKey()); synchronizeMap.put("recordOffset", entry.getValue()); recordToSyncList.add(synchronizeMap); } @@ -125,13 +125,14 @@ public void synchronize() { } @Override - public Map getPositionMap() { + public Map getPositionMap() { // get from memory storage first if (positionStore.getKVMap() == null || positionStore.getKVMap().isEmpty()) { try { - Map configMap = JacksonUtils.toObj(configService.getConfig(dataId, group, 5000L), - new TypeReference>() { + Map configMap = JacksonUtils.toObj(configService.getConfig(dataId, group, 5000L), + new TypeReference>() { }); + positionStore.putAll(configMap); log.info("nacos position map {}", configMap); return configMap; } catch (NacosException e) { @@ -143,12 +144,12 @@ public Map getPositionMap() { } @Override - public RecordOffset getPosition(ConnectorRecordPartition partition) { + public RecordOffset getPosition(RecordPartition partition) { // get from memory storage first if (positionStore.get(partition) == null) { try { - Map recordMap = JacksonUtils.toObj(configService.getConfig(dataId, group, 5000L), - new TypeReference>() { + Map recordMap = JacksonUtils.toObj(configService.getConfig(dataId, group, 5000L), + new TypeReference>() { }); log.info("nacos record position {}", recordMap.get(partition)); return recordMap.get(partition); @@ -161,21 +162,21 @@ public RecordOffset getPosition(ConnectorRecordPartition partition) { } @Override - public void putPosition(Map positions) { + public void putPosition(Map positions) { positionStore.putAll(positions); } @Override - public void putPosition(ConnectorRecordPartition partition, RecordOffset position) { + public void putPosition(RecordPartition partition, RecordOffset position) { positionStore.put(partition, position); } @Override - public void removePosition(List partitions) { + public void removePosition(List partitions) { if (partitions == null) { return; } - for (ConnectorRecordPartition partition : partitions) { + for (RecordPartition partition : partitions) { positionStore.remove(partition); } } @@ -206,13 +207,13 @@ public void receiveConfigInfo(String configInfo) { }); for (Map recordPartitionOffsetMap : recordOffsetList) { - ConnectorRecordPartition connectorRecordPartition = JacksonUtils.toObj( - JacksonUtils.toJson(recordPartitionOffsetMap.get("connectorRecordPartition")), - ConnectorRecordPartition.class); + RecordPartition recordPartition = JacksonUtils.toObj( + JacksonUtils.toJson(recordPartitionOffsetMap.get("recordPartition")), + RecordPartition.class); RecordOffset recordOffset = JacksonUtils.toObj(JacksonUtils.toJson(recordPartitionOffsetMap.get("recordOffset")), RecordOffset.class); // update the offset in memory store - mergeOffset(connectorRecordPartition, recordOffset); + mergeOffset(recordPartition, recordOffset); } } }; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index da2a869a4b..98aa82cf0b 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -190,13 +190,15 @@ private void initConnectorService() throws Exception { this.offsetManagement = new RecordOffsetManagement(); this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY; OffsetStorageConfig offsetStorageConfig = sourceConfig.getOffsetStorageConfig(); + offsetStorageConfig.setDataSourceType(jobResponse.getTransportType().getSrc()); + offsetStorageConfig.setDataSinkType(jobResponse.getTransportType().getDst()); this.offsetManagementService = Optional.ofNullable(offsetStorageConfig) .map(OffsetStorageConfig::getOffsetStorageType) .map(storageType -> EventMeshExtensionFactory.getExtension(OffsetManagementService.class, storageType)) .orElse(new DefaultOffsetManagementServiceImpl()); this.offsetManagementService.initialize(offsetStorageConfig); - this.offsetStorageWriter = new OffsetStorageWriterImpl(sourceConnector.name(), offsetManagementService); - this.offsetStorageReader = new OffsetStorageReaderImpl(sourceConnector.name(), offsetManagementService); + this.offsetStorageWriter = new OffsetStorageWriterImpl(offsetManagementService); + this.offsetStorageReader = new OffsetStorageReaderImpl(offsetManagementService); sourceConnector.init(sourceConnectorContext);