From 0b3c58355e75f2ad621d7699ccc4b823fb4696dc Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Fri, 17 May 2024 20:26:02 +0800 Subject: [PATCH] more --- eventmesh-admin-server/build.gradle | 2 +- .../apache/eventmesh/admin/server/Admin.java | 32 +++++++++++++++++-- .../admin/server/web/AdminGrpcServer.java | 8 ++--- .../impl/EventMeshJobInfoServiceImpl.java | 3 +- .../EventMeshMysqlPositionServiceImpl.java | 3 +- .../impl/MysqlReportPositionHandler.java | 27 +++++++++++++++- .../handler/request/BaseRequestHandler.java | 6 ++-- .../request/RequestHandlerFactory.java | 8 ++--- .../request/impl/ReportPositionHandler.java | 3 ++ .../common/remote/offset/RecordOffset.java | 4 --- .../common/remote/offset/RecordPartition.java | 4 --- .../common/remote/offset/RecordPosition.java | 9 ++++-- .../remote/request/FetchJobRequest.java | 2 +- .../remote/request/FetchPositionRequest.java | 2 +- ...cResponse.java => BaseRemoteResponse.java} | 2 +- .../remote/response/EmptyAckResponse.java | 2 +- .../common/remote/response/FailResponse.java | 4 +-- .../remote/response/FetchJobResponse.java | 2 +- .../response/FetchPositionResponse.java | 2 +- .../eventmesh/common/utils/JsonUtils.java | 8 +++++ .../connector/CanalSourceConnector.java | 15 +++------ .../connector/RocketMQSourceConnector.java | 8 ++--- .../offsetmgmt/admin/AdminOffsetService.java | 5 ++- .../api/data/RecordOffsetManagement.java | 13 ++++---- 24 files changed, 112 insertions(+), 62 deletions(-) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/{BaseGrpcResponse.java => BaseRemoteResponse.java} (93%) diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle index 0638b649a4..1ea66f59e8 100644 --- a/eventmesh-admin-server/build.gradle +++ b/eventmesh-admin-server/build.gradle @@ -16,7 +16,7 @@ dependencies { // https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter implementation group: 'com.baomidou', name: 'mybatis-plus-boot-starter', version: '3.5.5' - + implementation "org.reflections:reflections:0.10.2" // https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter implementation "com.alibaba:druid-spring-boot-starter" diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java index 13021c98c2..93c5467a84 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java @@ -1,9 +1,13 @@ package com.apache.eventmesh.admin.server; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.eventmesh.common.remote.Task; +import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.PagedList; - -import org.apache.eventmesh.common.remote.Task; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; public interface Admin extends ComponentLifeCycle { /** @@ -21,5 +25,29 @@ public interface Admin extends ComponentLifeCycle { void reportHeartbeat(ReportHeartBeatRequest heartBeat); + static void main(String[] args) { +// ReportPositionRequest request = new ReportPositionRequest(); +// request.setJobID("1"); +// request.setAddress("1"); +// request.setState(JobState.RUNNING); +// request.setDataSourceType(DataSourceType.MYSQL); + RecordPosition recordPosition = new RecordPosition(); + CanalRecordOffset recordOffset = new CanalRecordOffset(); + recordOffset.setOffset(12345L); + recordPosition.setRecordOffset(recordOffset); + CanalRecordPartition partition = new CanalRecordPartition(); + partition.setJournalName("demo-binary-log-01"); + partition.setTimeStamp(System.currentTimeMillis()); + recordPosition.setRecordPartition(partition); +// ArrayList list = new ArrayList<>(); +// list.add(recordPosition); +// request.setRecordPositionList(list); + String bytes = JsonUtils.toJSONString(recordPosition); + + RecordPosition object1 = JsonUtils.parseTypeReferenceObject(bytes, new TypeReference() {}); + RecordPosition object2 = JsonUtils.parseObject(bytes, RecordPosition.class); + System.out.println(object1); + System.out.println(object2); + } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java index 366872ade1..72c0681e95 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java @@ -13,7 +13,7 @@ import org.apache.eventmesh.common.remote.payload.PayloadUtil; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; import org.apache.eventmesh.common.remote.response.EmptyAckResponse; -import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; import org.apache.eventmesh.common.remote.response.FailResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -30,13 +30,13 @@ private Payload process(Payload value) { "exists")); } try { - BaseRequestHandler handler = + BaseRequestHandler handler = handlerFactory.getHandler(value.getMetadata().getType()); if (handler == null) { - return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN, + return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN, "not match any request handler")); } - BaseGrpcResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); + BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); if (response == null || response instanceof EmptyAckResponse) { return null; } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index 96d9041935..ebf740df0b 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -27,7 +27,8 @@ public boolean updateJobState(Integer jobID, JobState state) { EventMeshJobInfo jobInfo = new EventMeshJobInfo(); jobInfo.setJobID(jobID); jobInfo.setState(state.ordinal()); - update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE,JobState.COMPLETE)); + update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE.ordinal(), + JobState.COMPLETE.ordinal())); return true; } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java index ad1dd5d10e..8229c35152 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java @@ -44,13 +44,12 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { history.setRecord(JsonUtils.toJSONString(position)); history.setJob(old.getJobID()); history.setAddress(old.getAddress()); + log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); try { historyService.save(history); } catch (Exception e) { log.warn("save mysql position reporter changed history fail", e); } - - log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); } } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java index 96d19942be..ab4d10e20d 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java @@ -12,6 +12,8 @@ import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.remote.response.FetchPositionResponse; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; @@ -34,10 +36,33 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) { for (int i = 0; i < 3; i++) { try { List recordPositionList = request.getRecordPositionList(); + RecordPosition recordPosition = recordPositionList.get(0); + if (recordPosition == null || recordPosition.getRecordPartition() == null || recordPosition.getRecordOffset() == null) { + log.warn("report mysql position, but record-partition/partition/offset is null"); + return false; + } + if (!(recordPosition.getRecordPartition() instanceof CanalRecordPartition)) { + log.warn("report mysql position, but record partition class [{}] not match [{}]", + recordPosition.getRecordPartition().getRecordPartitionClass(), CanalRecordPartition.class); + return false; + } + if (!(recordPosition.getRecordOffset() instanceof CanalRecordOffset)) { + log.warn("report mysql position, but record offset class [{}] not match [{}]", + recordPosition.getRecordOffset().getRecordOffsetClass(), CanalRecordOffset.class); + return false; + } + CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset(); + CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition(); EventMeshMysqlPosition position = new EventMeshMysqlPosition(); position.setJobID(Integer.parseInt(request.getJobID())); position.setAddress(request.getAddress()); - + if (offset != null) { + position.setPosition(offset.getOffset()); + } + if (partition != null) { + position.setTimestamp(partition.getTimeStamp()); + position.setJournalName(partition.getJournalName()); + } if (!positionService.saveOrUpdateByJob(position)) { log.warn("update job position fail [{}]", request); return false; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java index 2200431a5b..b78fe6aa1a 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java @@ -2,10 +2,10 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; -import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; -public abstract class BaseRequestHandler { - public BaseGrpcResponse handlerRequest(T request, Metadata metadata) { +public abstract class BaseRequestHandler { + public BaseRemoteResponse handlerRequest(T request, Metadata metadata) { return handler(request, metadata); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java index 2b0d9fc810..f2ab552663 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java @@ -1,7 +1,7 @@ package com.apache.eventmesh.admin.server.web.handler.request; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; -import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; @@ -13,10 +13,10 @@ @Component public class RequestHandlerFactory implements ApplicationListener { - private final Map> handlers = + private final Map> handlers = new ConcurrentHashMap<>(); - public BaseRequestHandler getHandler(String type) { + public BaseRequestHandler getHandler(String type) { return handlers.get(type); } @@ -26,7 +26,7 @@ public void onApplicationEvent(ContextRefreshedEvent event) { Map beans = event.getApplicationContext().getBeansOfType(BaseRequestHandler.class); - for (BaseRequestHandler requestHandler : beans.values()) { + for (BaseRequestHandler requestHandler : beans.values()) { Class clazz = requestHandler.getClass(); boolean skip = false; while (!clazz.getSuperclass().equals(BaseRequestHandler.class)) { diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java index 26fa0dbabb..13f9b48fd5 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java @@ -40,6 +40,9 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad if (StringUtils.isBlank(request.getJobID())) { throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); } + if (request.getRecordPositionList() == null || request.getRecordPositionList().isEmpty()) { + throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty"); + } int jobID; try { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java index 3551e04692..bed39bee5b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java @@ -17,10 +17,6 @@ package org.apache.eventmesh.common.remote.offset; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - public class RecordOffset { private Class clazz; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java index ec9ffb35f1..60c2b8a2a6 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java @@ -17,10 +17,6 @@ package org.apache.eventmesh.common.remote.offset; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - public class RecordPartition { private Class clazz; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java index 029eac563f..a4e05ff772 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java @@ -17,14 +17,17 @@ package org.apache.eventmesh.common.remote.offset; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + import java.util.Objects; public class RecordPosition { - + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY) private RecordPartition recordPartition; private Class recordPartitionClazz; + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY) private RecordOffset recordOffset; private Class recordOffsetClazz; @@ -57,11 +60,11 @@ public void setRecordOffsetClazz(Class recordOffsetClazz this.recordOffsetClazz = recordOffsetClazz; } - public RecordPartition getPartition() { + public RecordPartition getRecordPartition() { return recordPartition; } - public RecordOffset getOffset() { + public RecordOffset getRecordOffset() { return recordOffset; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java index 27b0a82111..9caaaf1feb 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java @@ -5,6 +5,6 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchJobRequest extends BaseGrpcRequest { +public class FetchJobRequest extends BaseRemoteRequest { private String jobID; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java index 03444576d0..4d3c1f7530 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java @@ -8,7 +8,7 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchPositionRequest extends BaseGrpcRequest { +public class FetchPositionRequest extends BaseRemoteRequest { private String jobID; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java similarity index 93% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java index d9791baa87..81c054264d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java @@ -8,7 +8,7 @@ import java.util.Map; @Getter -public abstract class BaseGrpcResponse implements IPayload { +public abstract class BaseRemoteResponse implements IPayload { public static final int UNKNOWN = -1; @Setter private boolean success = true; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java index 48f32135c6..8ef3a73c9d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java @@ -3,6 +3,6 @@ /** * empty, just mean remote received request */ -public class EmptyAckResponse extends BaseGrpcResponse { +public class EmptyAckResponse extends BaseRemoteResponse { } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java index 0eec63173b..3dd690eb0c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java @@ -1,6 +1,6 @@ package org.apache.eventmesh.common.remote.response; -public class FailResponse extends BaseGrpcResponse { +public class FailResponse extends BaseRemoteResponse { public static FailResponse build(int errorCode, String msg) { FailResponse response = new FailResponse(); response.setErrorCode(errorCode); @@ -17,6 +17,6 @@ public static FailResponse build(int errorCode, String msg) { * @return response */ public static FailResponse build(Throwable exception) { - return build(BaseGrpcResponse.UNKNOWN, exception.getMessage()); + return build(BaseRemoteResponse.UNKNOWN, exception.getMessage()); } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index 3b06edece0..cc94c1aaeb 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -11,7 +11,7 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchJobResponse extends BaseGrpcResponse { +public class FetchJobResponse extends BaseRemoteResponse { private Integer id; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java index e96e8f1759..19d7b5a5c1 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java @@ -7,7 +7,7 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchPositionResponse extends BaseGrpcResponse { +public class FetchPositionResponse extends BaseRemoteResponse { private RecordPosition recordPosition; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index c7504a776e..a1ea6ee59b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -163,6 +163,14 @@ public static T parseTypeReferenceObject(String text, TypeReference typeR } } + public static T parseTypeReferenceObject(byte[] text, TypeReference typeReference) { + try { + return OBJECT_MAPPER.readValue(text, typeReference); + } catch (IOException e) { + throw new JsonException("deserialize json string to typeReference error", e); + } + } + public static JsonNode getJsonNode(String text) { if (StringUtils.isEmpty(text)) { return null; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index e9e86197ad..33f5527b63 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -27,13 +27,12 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,9 +41,6 @@ import java.util.concurrent.locks.LockSupport; -import org.springframework.util.CollectionUtils; - -import com.alibaba.otter.canal.common.CanalException; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager; @@ -60,13 +56,10 @@ import com.alibaba.otter.canal.parse.CanalEventParser; import com.alibaba.otter.canal.parse.ha.CanalHAController; import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; -import com.alibaba.otter.canal.parse.support.AuthenticationInfo; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.ClientIdentity; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; -import com.alibaba.otter.canal.sink.AbstractCanalEventSink; -import com.alibaba.otter.canal.sink.CanalEventSink; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; @@ -180,8 +173,8 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) { List positions = new ArrayList<>(); recordPositions.forEach(recordPosition -> { Map recordPositionMap = new HashMap<>(); - CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getPartition()); - CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getOffset()); + CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getRecordPartition()); + CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getRecordOffset()); recordPositionMap.put("journalName", canalRecordPartition.getJournalName()); recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp()); recordPositionMap.put("position", canalRecordOffset.getOffset()); diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java index 67e73d7ab1..16e6660c4b 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java @@ -26,8 +26,8 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition; +import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordOffset; +import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; @@ -193,12 +193,12 @@ private List getMessageQueueList(String topic) throws MQClientExce @Override public void commit(ConnectRecord record) { // send success, commit offset - RocketMQRecordPartition rocketMQRecordPartition = (RocketMQRecordPartition)(record.getPosition().getPartition()); + RocketMQRecordPartition rocketMQRecordPartition = (RocketMQRecordPartition)(record.getPosition().getRecordPartition()); String brokerName = rocketMQRecordPartition.getBroker(); String topic = rocketMQRecordPartition.getTopic(); int queueId = Integer.parseInt(rocketMQRecordPartition.getQueueId()); MessageQueue mq = new MessageQueue(topic, brokerName, queueId); - RocketMQRecordOffset rocketMQRecordOffset = (RocketMQRecordOffset)record.getPosition().getOffset(); + RocketMQRecordOffset rocketMQRecordOffset = (RocketMQRecordOffset)record.getPosition().getRecordOffset(); long offset = rocketMQRecordOffset.getQueueOffset(); long canCommitOffset = removeMessage(mq, offset); log.info("commit record {}|mq {}|canCommitOffset {}", record, mq, canCommitOffset); diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index 769fe81866..930cbd7a33 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -37,7 +37,6 @@ import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; -import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.remote.response.FetchPositionResponse; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; @@ -153,7 +152,7 @@ public Map getPositionMap() { FetchPositionResponse fetchPositionResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getPartition(), fetchPositionResponse.getRecordPosition().getOffset()); + positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), fetchPositionResponse.getRecordPosition().getRecordOffset()); } } } @@ -188,7 +187,7 @@ public RecordOffset getPosition(RecordPartition partition) { FetchPositionResponse fetchPositionResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getPartition(), fetchPositionResponse.getRecordPosition().getOffset()); + positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), fetchPositionResponse.getRecordPosition().getRecordOffset()); } } } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java index c5075e3e7b..4443c5e600 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.openconnect.offsetmgmt.api.data; +import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; @@ -31,8 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import lombok.extern.slf4j.Slf4j; - @Slf4j public class RecordOffsetManagement { @@ -51,7 +50,7 @@ public RecordOffsetManagement() { */ public SubmittedPosition submitRecord(RecordPosition position) { SubmittedPosition submittedPosition = new SubmittedPosition(position); - records.computeIfAbsent(position.getPartition(), e -> new LinkedList<>()).add(submittedPosition); + records.computeIfAbsent(position.getRecordPartition(), e -> new LinkedList<>()).add(submittedPosition); // ensure thread safety in operation synchronized (this) { numUnacked.incrementAndGet(); @@ -67,7 +66,7 @@ private RecordOffset pollOffsetWhile(Deque submittedPositions RecordOffset offset = null; // Stop pulling if there is an uncommitted breakpoint while (canCommitHead(submittedPositions)) { - offset = submittedPositions.poll().getPosition().getOffset(); + offset = submittedPositions.poll().getPosition().getRecordOffset(); } return offset; } @@ -239,19 +238,19 @@ public void ack() { * @return */ public boolean remove() { - Deque deque = records.get(position.getPartition()); + Deque deque = records.get(position.getRecordPartition()); if (deque == null) { return false; } boolean result = deque.removeLastOccurrence(this); if (deque.isEmpty()) { - records.remove(position.getPartition()); + records.remove(position.getRecordPartition()); } if (result) { messageAcked(); } else { log.warn("Attempted to remove record from submitted queue for partition {}, " - + "but the record has not been submitted or has already been removed", position.getPartition()); + + "but the record has not been submitted or has already been removed", position.getRecordPartition()); } return result; }