diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java index 93c5467a84..9be047edc6 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java @@ -1,13 +1,8 @@ package com.apache.eventmesh.admin.server; -import com.fasterxml.jackson.core.type.TypeReference; import org.apache.eventmesh.common.remote.Task; -import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.PagedList; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; public interface Admin extends ComponentLifeCycle { /** @@ -23,31 +18,4 @@ public interface Admin extends ComponentLifeCycle { * support for task */ void reportHeartbeat(ReportHeartBeatRequest heartBeat); - - - static void main(String[] args) { -// ReportPositionRequest request = new ReportPositionRequest(); -// request.setJobID("1"); -// request.setAddress("1"); -// request.setState(JobState.RUNNING); -// request.setDataSourceType(DataSourceType.MYSQL); - RecordPosition recordPosition = new RecordPosition(); - CanalRecordOffset recordOffset = new CanalRecordOffset(); - recordOffset.setOffset(12345L); - recordPosition.setRecordOffset(recordOffset); - CanalRecordPartition partition = new CanalRecordPartition(); - partition.setJournalName("demo-binary-log-01"); - partition.setTimeStamp(System.currentTimeMillis()); - recordPosition.setRecordPartition(partition); -// ArrayList list = new ArrayList<>(); -// list.add(recordPosition); -// request.setRecordPositionList(list); - String bytes = JsonUtils.toJSONString(recordPosition); - - RecordPosition object1 = JsonUtils.parseTypeReferenceObject(bytes, new TypeReference() {}); - RecordPosition object2 = JsonUtils.parseObject(bytes, RecordPosition.class); - System.out.println(object1); - System.out.println(object2); - } - } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java index ab4d10e20d..9a41f045bf 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java @@ -12,8 +12,8 @@ import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.remote.response.FetchPositionResponse; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java index a4e05ff772..022ecb93e3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java @@ -17,17 +17,46 @@ package org.apache.eventmesh.common.remote.offset; +import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.eventmesh.common.remote.offset.S3.S3RecordOffset; +import org.apache.eventmesh.common.remote.offset.S3.S3RecordPartition; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; +import org.apache.eventmesh.common.remote.offset.file.FileRecordOffset; +import org.apache.eventmesh.common.remote.offset.file.FileRecordPartition; +import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordOffset; +import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordPartition; +import org.apache.eventmesh.common.remote.offset.pulsar.PulsarRecordOffset; +import org.apache.eventmesh.common.remote.offset.pulsar.PulsarRecordPartition; +import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordOffset; +import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordPartition; import java.util.Objects; public class RecordPosition { - @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY) + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) + @JsonSubTypes({ + @JsonSubTypes.Type(value = CanalRecordPartition.class, name = "CanalRecordPartition"), + @JsonSubTypes.Type(value = FileRecordPartition.class, name = "FileRecordPartition"), + @JsonSubTypes.Type(value = S3RecordPartition.class, name = "S3RecordPartition"), + @JsonSubTypes.Type(value = KafkaRecordPartition.class, name = "KafkaRecordPartition"), + @JsonSubTypes.Type(value = PulsarRecordPartition.class, name = "PulsarRecordPartition"), + @JsonSubTypes.Type(value = RocketMQRecordPartition.class, name = "RocketMQRecordPartition"), + }) private RecordPartition recordPartition; private Class recordPartitionClazz; - @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY) + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) + @JsonSubTypes({ + @JsonSubTypes.Type(value = CanalRecordOffset.class, name = "CanalRecordOffset"), + @JsonSubTypes.Type(value = FileRecordOffset.class, name = "FileRecordOffset"), + @JsonSubTypes.Type(value = S3RecordOffset.class, name = "S3RecordOffset"), + @JsonSubTypes.Type(value = KafkaRecordOffset.class, name = "KafkaRecordOffset"), + @JsonSubTypes.Type(value = PulsarRecordOffset.class, name = "PulsarRecordOffset"), + @JsonSubTypes.Type(value = RocketMQRecordOffset.class, name = "RocketMQRecordOffset"), + }) private RecordOffset recordOffset; private Class recordOffsetClazz; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/S3/S3RecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/S3/S3RecordOffset.java similarity index 95% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/S3/S3RecordOffset.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/S3/S3RecordOffset.java index b409aeca8f..82f93ad29f 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/S3/S3RecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/S3/S3RecordOffset.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3; +package org.apache.eventmesh.common.remote.offset.S3; import org.apache.eventmesh.common.remote.offset.RecordOffset; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/S3/S3RecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/S3/S3RecordPartition.java similarity index 96% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/S3/S3RecordPartition.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/S3/S3RecordPartition.java index 19df52b7fe..4144f4b7aa 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/S3/S3RecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/S3/S3RecordPartition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3; +package org.apache.eventmesh.common.remote.offset.S3; import org.apache.eventmesh.common.remote.offset.RecordPartition; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java index cd31a0210a..2ea0e96ca9 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java @@ -17,13 +17,12 @@ package org.apache.eventmesh.common.remote.offset.canal; +import lombok.Data; +import lombok.ToString; import org.apache.eventmesh.common.remote.offset.RecordPartition; import java.util.Objects; -import lombok.Data; -import lombok.ToString; - @Data @ToString diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/file/FileRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/file/FileRecordOffset.java similarity index 94% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/file/FileRecordOffset.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/file/FileRecordOffset.java index eb80a1d841..5ecdfc82d9 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/file/FileRecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/file/FileRecordOffset.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.file; +package org.apache.eventmesh.common.remote.offset.file; import org.apache.eventmesh.common.remote.offset.RecordOffset; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/file/FileRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/file/FileRecordPartition.java similarity index 96% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/file/FileRecordPartition.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/file/FileRecordPartition.java index 03a30b63f3..7141401345 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/file/FileRecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/file/FileRecordPartition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.file; +package org.apache.eventmesh.common.remote.offset.file; import org.apache.eventmesh.common.remote.offset.RecordPartition; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/kafka/KafkaRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/kafka/KafkaRecordOffset.java similarity index 94% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/kafka/KafkaRecordOffset.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/kafka/KafkaRecordOffset.java index e16e3c4bd7..5ee22a5b87 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/kafka/KafkaRecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/kafka/KafkaRecordOffset.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka; +package org.apache.eventmesh.common.remote.offset.kafka; import org.apache.eventmesh.common.remote.offset.RecordOffset; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/kafka/KafkaRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/kafka/KafkaRecordPartition.java similarity index 96% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/kafka/KafkaRecordPartition.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/kafka/KafkaRecordPartition.java index d1b31abda6..9c5eae8cd7 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/kafka/KafkaRecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/kafka/KafkaRecordPartition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka; +package org.apache.eventmesh.common.remote.offset.kafka; import org.apache.eventmesh.common.remote.offset.RecordPartition; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/pulsar/PulsarRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/pulsar/PulsarRecordOffset.java similarity index 95% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/pulsar/PulsarRecordOffset.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/pulsar/PulsarRecordOffset.java index 6f9b4e3a7f..f3d50e05b4 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/pulsar/PulsarRecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/pulsar/PulsarRecordOffset.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.pulsar; +package org.apache.eventmesh.common.remote.offset.pulsar; import org.apache.eventmesh.common.remote.offset.RecordOffset; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/pulsar/PulsarRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/pulsar/PulsarRecordPartition.java similarity index 96% rename from eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/pulsar/PulsarRecordPartition.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/pulsar/PulsarRecordPartition.java index 38cc731564..01ae1ab992 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/pulsar/PulsarRecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/pulsar/PulsarRecordPartition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.offsetmgmt.api.data.pulsar; +package org.apache.eventmesh.common.remote.offset.pulsar; import org.apache.eventmesh.common.remote.offset.RecordPartition; diff --git a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java index f5928e2cef..dfd166c895 100644 --- a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java @@ -24,7 +24,7 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.file.FileRecordPartition; +import org.apache.eventmesh.common.remote.offset.file.FileRecordPartition; import java.io.BufferedReader; import java.io.File; @@ -35,9 +35,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import lombok.extern.slf4j.Slf4j; diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java index e42df355f5..d89bcb7030 100644 --- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java @@ -25,8 +25,8 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka.KafkaRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.kafka.KafkaRecordPartition; +import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordOffset; +import org.apache.eventmesh.common.remote.offset.kafka.KafkaRecordPartition; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -36,9 +36,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; public class KafkaSourceConnector implements Source { diff --git a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java index 525518692b..df67e8da88 100644 --- a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java @@ -19,12 +19,11 @@ import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.mq.pulsar.PulsarSourceConfig; -import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.pulsar.PulsarRecordPartition; +import org.apache.eventmesh.common.remote.offset.pulsar.PulsarRecordPartition; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -34,9 +33,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import lombok.extern.slf4j.Slf4j; diff --git a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java index 385791e666..ae86778b60 100644 --- a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java @@ -26,14 +26,12 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3.S3RecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.S3.S3RecordPartition; +import org.apache.eventmesh.common.remote.offset.S3.S3RecordOffset; +import org.apache.eventmesh.common.remote.offset.S3.S3RecordPartition; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java index 6aee1604a0..145531238e 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java @@ -20,8 +20,6 @@ import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition; import java.util.Objects; import java.util.Set; diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java index 261c9ac2b5..ca4ad2c751 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageReaderImpl.java @@ -19,8 +19,6 @@ import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition; import java.util.Collection; import java.util.HashMap;