From 2bd3e57f53ab92a1668bf1c9e43ecccd77843c83 Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Thu, 16 May 2024 10:03:48 +0800 Subject: [PATCH 1/3] fetch position --- .../admin/server/web/AdminGrpcServer.java | 8 ++-- .../EventMeshMysqlPositionServiceImpl.java | 33 ++++++++------- .../EventMeshRuntimeHeartbeatServiceImpl.java | 39 +++++++++-------- .../position/IFetchPositionHandler.java | 9 ++++ ...ndler.java => IReportPositionHandler.java} | 2 +- .../web/handler/position/PositionHandler.java | 2 +- .../position/PositionHandlerFactory.java | 4 +- ...r.java => MysqlReportPositionHandler.java} | 25 +++++++++-- .../handler/request/BaseRequestHandler.java | 4 +- .../{impl => }/RequestHandlerFactory.java | 11 +++-- .../request/impl/FetchPositionHandler.java | 42 +++++++++++++++++++ .../request/impl/ReportPositionHandler.java | 4 +- .../src/main/resources/application.yaml | 1 + ...rpcRequest.java => BaseRemoteRequest.java} | 2 +- .../request/ReportHeartBeatRequest.java | 2 +- .../remote/request/ReportPositionRequest.java | 5 ++- .../response/FetchPositionResponse.java | 10 +---- 17 files changed, 138 insertions(+), 65 deletions(-) create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IFetchPositionHandler.java rename eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/{IPositionHandler.java => IReportPositionHandler.java} (87%) rename eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/{MysqlPositionHandler.java => MysqlReportPositionHandler.java} (58%) rename eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/{impl => }/RequestHandlerFactory.java (74%) create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/FetchPositionHandler.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/{BaseGrpcRequest.java => BaseRemoteRequest.java} (91%) diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java index 2603fadf7f..366872ade1 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java @@ -2,7 +2,7 @@ import com.apache.eventmesh.admin.server.AdminServerException; import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; -import com.apache.eventmesh.admin.server.web.handler.request.impl.RequestHandlerFactory; +import com.apache.eventmesh.admin.server.web.handler.request.RequestHandlerFactory; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; @@ -11,7 +11,7 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.payload.PayloadUtil; -import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; +import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; import org.apache.eventmesh.common.remote.response.EmptyAckResponse; import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; import org.apache.eventmesh.common.remote.response.FailResponse; @@ -30,13 +30,13 @@ private Payload process(Payload value) { "exists")); } try { - BaseRequestHandler handler = + BaseRequestHandler handler = handlerFactory.getHandler(value.getMetadata().getType()); if (handler == null) { return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN, "not match any request handler")); } - BaseGrpcResponse response = handler.handlerRequest((BaseGrpcRequest) PayloadUtil.parse(value), value.getMetadata()); + BaseGrpcResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); if (response == null || response instanceof EmptyAckResponse) { return null; } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java index 833ebaa7f3..ad1dd5d10e 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java @@ -31,25 +31,28 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { if (old == null) { return save(position); } else { - if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) { - EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory(); - history.setRecord(JsonUtils.toJSONString(position)); - history.setJob(old.getJobID()); - history.setAddress(old.getAddress()); - try { - historyService.save(history); - } catch (Exception e) { - log.warn("save mysql position reporter changed history fail", e); - } - - log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); - } - if (old.getPosition() > position.getPosition()) { + if (old.getPosition() >= position.getPosition()) { log.info("job [{}] report position [{}] less than db [{}]", position.getJobID(), position.getPosition(), old.getPosition()); return true; } - return update(position, Wrappers.update().eq("updateTime", old.getUpdateTime())); + try { + return update(position, Wrappers.update().eq("updateTime", old.getUpdateTime())); + } finally { + if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) { + EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory(); + history.setRecord(JsonUtils.toJSONString(position)); + history.setJob(old.getJobID()); + history.setAddress(old.getAddress()); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save mysql position reporter changed history fail", e); + } + + log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); + } + } } } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java index 2519aa97d4..a76cac13c1 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java @@ -1,12 +1,12 @@ package com.apache.eventmesh.admin.server.web.db.service.impl; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; -import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -31,25 +31,28 @@ public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) { if (old == null) { return save(entity); } else { - if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) { - EventMeshRuntimeHistory history = new EventMeshRuntimeHistory(); - history.setAddress(old.getAdminAddr()); - history.setJob(old.getJobID()); - try { - historyService.save(history); - } catch (Exception e) { - log.warn("save runtime job changed history fail", e); - } - - log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(), - entity.getJobID()); - } - if (Long.parseLong(old.getReportTime()) > Long.parseLong(entity.getReportTime())) { + if (Long.parseLong(old.getReportTime()) >= Long.parseLong(entity.getReportTime())) { log.info("update heartbeat record ignore, current report time late than db, job " + "[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr()); return true; } - return update(entity, Wrappers.update().eq("updateTime", old.getUpdateTime())); + try { + return update(entity, Wrappers.update().eq("updateTime", old.getUpdateTime())); + } finally { + if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) { + EventMeshRuntimeHistory history = new EventMeshRuntimeHistory(); + history.setAddress(old.getAdminAddr()); + history.setJob(old.getJobID()); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save runtime job changed history fail", e); + } + + log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(), + entity.getJobID()); + } + } } } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IFetchPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IFetchPositionHandler.java new file mode 100644 index 0000000000..5fb8c3bac0 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IFetchPositionHandler.java @@ -0,0 +1,9 @@ +package com.apache.eventmesh.admin.server.web.handler.position; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.response.FetchPositionResponse; + +public interface IFetchPositionHandler { + FetchPositionResponse handler(FetchPositionRequest request, Metadata metadata); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IReportPositionHandler.java similarity index 87% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IPositionHandler.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IReportPositionHandler.java index 9dfb7e0dde..817373e042 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IReportPositionHandler.java @@ -3,6 +3,6 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; -public interface IPositionHandler { +public interface IReportPositionHandler { boolean handler(ReportPositionRequest request, Metadata metadata); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandler.java index 53f690c332..3aa6fa571a 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandler.java @@ -2,6 +2,6 @@ import org.apache.eventmesh.common.remote.job.DataSourceType; -public abstract class PositionHandler implements IPositionHandler { +public abstract class PositionHandler implements IReportPositionHandler,IFetchPositionHandler { protected abstract DataSourceType getSourceType(); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandlerFactory.java index 864c24d30a..98d3560326 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandlerFactory.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandlerFactory.java @@ -12,9 +12,9 @@ @Component @Slf4j public class PositionHandlerFactory implements ApplicationListener { - private final Map handlers = + private final Map handlers = new ConcurrentHashMap<>(); - public IPositionHandler getHandler(DataSourceType type) { + public PositionHandler getHandler(DataSourceType type) { return handlers.get(type); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java similarity index 58% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlPositionHandler.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java index 28869828b3..59cbd00d46 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java @@ -1,18 +1,23 @@ package com.apache.eventmesh.admin.server.web.handler.position.impl; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; import com.apache.eventmesh.admin.server.web.handler.position.PositionHandler; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.apache.eventmesh.common.remote.response.FetchPositionResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; @Component @Slf4j -public class MysqlPositionHandler extends PositionHandler { +public class MysqlReportPositionHandler extends PositionHandler { @Autowired EventMeshMysqlPositionService positionService; @@ -25,8 +30,10 @@ protected DataSourceType getSourceType() { public boolean handler(ReportPositionRequest request, Metadata metadata) { for (int i = 0; i < 3; i++) { try { - - if (!positionService.saveOrUpdateByJob(null)) { + EventMeshMysqlPosition position = new EventMeshMysqlPosition(); + position.setJobID(Integer.parseInt(request.getJobID())); + position.setAddress(request.getAddress()); + if (!positionService.saveOrUpdateByJob(position)) { log.warn("update job position fail [{}]", request); return false; } @@ -46,4 +53,16 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) { } return false; } + + @Override + public FetchPositionResponse handler(FetchPositionRequest request, Metadata metadata) { + try { + EventMeshMysqlPosition position = positionService.getOne(Wrappers.query().eq("jobID" + , request.getJobID())); + FetchPositionResponse response = FetchPositionResponse.successResponse(); + return response; + } catch (Exception e) { + return FetchPositionResponse.failResponse(ErrorCode.INTERNAL_ERR,""); + } + } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java index 0e450ebd97..2200431a5b 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java @@ -1,10 +1,10 @@ package com.apache.eventmesh.admin.server.web.handler.request; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; -import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; +import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; -public abstract class BaseRequestHandler { +public abstract class BaseRequestHandler { public BaseGrpcResponse handlerRequest(T request, Metadata metadata) { return handler(request, metadata); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/RequestHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java similarity index 74% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/RequestHandlerFactory.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java index a4dc2a29f8..2b0d9fc810 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/RequestHandlerFactory.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java @@ -1,7 +1,6 @@ -package com.apache.eventmesh.admin.server.web.handler.request.impl; +package com.apache.eventmesh.admin.server.web.handler.request; -import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; -import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; +import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; @@ -14,10 +13,10 @@ @Component public class RequestHandlerFactory implements ApplicationListener { - private final Map> handlers = + private final Map> handlers = new ConcurrentHashMap<>(); - public BaseRequestHandler getHandler(String type) { + public BaseRequestHandler getHandler(String type) { return handlers.get(type); } @@ -27,7 +26,7 @@ public void onApplicationEvent(ContextRefreshedEvent event) { Map beans = event.getApplicationContext().getBeansOfType(BaseRequestHandler.class); - for (BaseRequestHandler requestHandler : beans.values()) { + for (BaseRequestHandler requestHandler : beans.values()) { Class clazz = requestHandler.getClass(); boolean skip = false; while (!clazz.getSuperclass().equals(BaseRequestHandler.class)) { diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/FetchPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/FetchPositionHandler.java new file mode 100644 index 0000000000..a02ec63f84 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/FetchPositionHandler.java @@ -0,0 +1,42 @@ +package com.apache.eventmesh.admin.server.web.handler.request.impl; + +import com.apache.eventmesh.admin.server.AdminServerException; +import com.apache.eventmesh.admin.server.web.db.DBThreadPool; +import com.apache.eventmesh.admin.server.web.handler.position.IFetchPositionHandler; +import com.apache.eventmesh.admin.server.web.handler.position.PositionHandlerFactory; +import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.response.EmptyAckResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class FetchPositionHandler extends BaseRequestHandler { + + @Autowired + DBThreadPool executor; + + @Autowired + PositionHandlerFactory positionHandlerFactory; + + @Override + protected EmptyAckResponse handler(FetchPositionRequest request, Metadata metadata) { + IFetchPositionHandler handler = positionHandlerFactory.getHandler(request.getDataSourceType()); + if (handler == null) { + throw new AdminServerException(ErrorCode.BAD_REQUEST, String.format("illegal data source type [%s] not " + + "match any handler", request.getJobID())); + } + executor.getExecutors().execute(() -> { + try { + handler.handler(request, metadata); + } catch (Exception e) { + log.warn(""); + } + }); + return new EmptyAckResponse(); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java index 646d1ba666..f070af5ba8 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java @@ -6,7 +6,7 @@ import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; -import com.apache.eventmesh.admin.server.web.handler.position.IPositionHandler; +import com.apache.eventmesh.admin.server.web.handler.position.IReportPositionHandler; import com.apache.eventmesh.admin.server.web.handler.position.PositionHandlerFactory; import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; import lombok.extern.slf4j.Slf4j; @@ -51,7 +51,7 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("illegal data base [%s] job id [%s] " + "type [%d]", jobInfo.getSourceData(), jobInfo.getJobID(), sourceDB.getDataType())); } - IPositionHandler handler = positionHandlerFactory.getHandler(type); + IReportPositionHandler handler = positionHandlerFactory.getHandler(type); if (handler == null) { throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("illegal data base [%s] job id [%s] " + "type [%d], it not match any handler", jobInfo.getSourceData(), jobInfo.getJobID(), diff --git a/eventmesh-admin-server/src/main/resources/application.yaml b/eventmesh-admin-server/src/main/resources/application.yaml index ed6aec0c24..2b450e485c 100644 --- a/eventmesh-admin-server/src/main/resources/application.yaml +++ b/eventmesh-admin-server/src/main/resources/application.yaml @@ -8,6 +8,7 @@ mybatis-plus: mapper-locations: classpath:mapper/*.xml configuration: map-underscore-to-camel-case: false + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl event-mesh: admin-server: port: 8081 \ No newline at end of file diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseGrpcRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java similarity index 91% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseGrpcRequest.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java index 360054a01e..35717e015c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseGrpcRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java @@ -7,7 +7,7 @@ import java.util.Map; @Getter -public abstract class BaseGrpcRequest implements IPayload { +public abstract class BaseRemoteRequest implements IPayload { private Map header = new HashMap<>(); public void addHeader(String key, String value) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportHeartBeatRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportHeartBeatRequest.java index 372beff8de..997672cc77 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportHeartBeatRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportHeartBeatRequest.java @@ -5,7 +5,7 @@ @Data @EqualsAndHashCode(callSuper = true) -public class ReportHeartBeatRequest extends BaseGrpcRequest { +public class ReportHeartBeatRequest extends BaseRemoteRequest { private String address; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java index fcc78bcda3..08a7c6da7e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java @@ -3,13 +3,14 @@ import lombok.Data; import lombok.EqualsAndHashCode; import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.job.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import java.util.List; @Data @EqualsAndHashCode(callSuper = true) -public class ReportPositionRequest extends BaseGrpcRequest { +public class ReportPositionRequest extends BaseRemoteRequest { private String jobID; @@ -18,4 +19,6 @@ public class ReportPositionRequest extends BaseGrpcRequest { private JobState state; private String address; + + private DataSourceType dataSourceType; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java index 4e4ae058c6..e96e8f1759 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java @@ -1,15 +1,9 @@ package org.apache.eventmesh.common.remote.response; -import org.apache.eventmesh.common.remote.JobState; -import org.apache.eventmesh.common.remote.Position; -import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.job.JobTransportType; -import org.apache.eventmesh.common.remote.offset.RecordPosition; - -import java.util.Map; - import lombok.Data; import lombok.EqualsAndHashCode; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.offset.RecordPosition; @Data @EqualsAndHashCode(callSuper = true) From d6c21ec7963f9b36b55ac349fc3015a27777ff02 Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Thu, 16 May 2024 11:13:33 +0800 Subject: [PATCH 2/3] report position --- .../db/service/EventMeshJobInfoService.java | 3 +- .../impl/EventMeshJobInfoServiceImpl.java | 15 ++++++ .../impl/MysqlReportPositionHandler.java | 5 ++ .../request/impl/ReportPositionHandler.java | 46 ++++++++++--------- 4 files changed, 47 insertions(+), 22 deletions(-) diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java index 7d5ed144f4..89e5c1ffe1 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java @@ -2,6 +2,7 @@ import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.baomidou.mybatisplus.extension.service.IService; +import org.apache.eventmesh.common.remote.JobState; /** * @author sodafang @@ -9,5 +10,5 @@ * @createDate 2024-05-09 15:51:45 */ public interface EventMeshJobInfoService extends IService { - + boolean updateJobState(Integer jobID, JobState state); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index b189adca6e..96d9041935 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -1,9 +1,12 @@ package com.apache.eventmesh.admin.server.web.db.service.impl; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.remote.JobState; import org.springframework.stereotype.Service; /** @@ -12,9 +15,21 @@ * @createDate 2024-05-09 15:51:45 */ @Service +@Slf4j public class EventMeshJobInfoServiceImpl extends ServiceImpl implements EventMeshJobInfoService{ + @Override + public boolean updateJobState(Integer jobID, JobState state) { + if (jobID == null || state == null) { + return false; + } + EventMeshJobInfo jobInfo = new EventMeshJobInfo(); + jobInfo.setJobID(jobID); + jobInfo.setState(state.ordinal()); + update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE,JobState.COMPLETE)); + return true; + } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java index 59cbd00d46..96d19942be 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java @@ -8,6 +8,7 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.remote.response.FetchPositionResponse; @@ -15,6 +16,8 @@ import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; +import java.util.List; + @Component @Slf4j public class MysqlReportPositionHandler extends PositionHandler { @@ -30,9 +33,11 @@ protected DataSourceType getSourceType() { public boolean handler(ReportPositionRequest request, Metadata metadata) { for (int i = 0; i < 3; i++) { try { + List recordPositionList = request.getRecordPositionList(); EventMeshMysqlPosition position = new EventMeshMysqlPosition(); position.setJobID(Integer.parseInt(request.getJobID())); position.setAddress(request.getAddress()); + if (!positionService.saveOrUpdateByJob(position)) { log.warn("update job position fail [{}]", request); return false; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java index f070af5ba8..26fa0dbabb 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java @@ -2,17 +2,15 @@ import com.apache.eventmesh.admin.server.AdminServerException; import com.apache.eventmesh.admin.server.web.db.DBThreadPool; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; import com.apache.eventmesh.admin.server.web.handler.position.IReportPositionHandler; import com.apache.eventmesh.admin.server.web.handler.position.PositionHandlerFactory; import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.job.DataSourceType; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.remote.response.EmptyAckResponse; import org.springframework.beans.factory.annotation.Autowired; @@ -36,34 +34,40 @@ public class ReportPositionHandler extends BaseRequestHandler { + try { + jobInfoService.updateJobState(jobID, request.getState()); + } catch (Exception e) { + log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(), + request.getDataSourceType(), request.getState(), e); + } try { handler.handler(request, metadata); } catch (Exception e) { - log.warn("handle position request fail data base [{}] job id [{}] type [{}]", jobInfo.getSourceData() - , jobInfo.getJobID(), sourceDB.getDataType(), e); + log.warn("handle position request fail, job id [{}] type [{}]", request.getJobID(), + request.getDataSourceType(), e); } }); return new EmptyAckResponse(); From 0b3c58355e75f2ad621d7699ccc4b823fb4696dc Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Fri, 17 May 2024 20:26:02 +0800 Subject: [PATCH 3/3] more --- eventmesh-admin-server/build.gradle | 2 +- .../apache/eventmesh/admin/server/Admin.java | 32 +++++++++++++++++-- .../admin/server/web/AdminGrpcServer.java | 8 ++--- .../impl/EventMeshJobInfoServiceImpl.java | 3 +- .../EventMeshMysqlPositionServiceImpl.java | 3 +- .../impl/MysqlReportPositionHandler.java | 27 +++++++++++++++- .../handler/request/BaseRequestHandler.java | 6 ++-- .../request/RequestHandlerFactory.java | 8 ++--- .../request/impl/ReportPositionHandler.java | 3 ++ .../common/remote/offset/RecordOffset.java | 4 --- .../common/remote/offset/RecordPartition.java | 4 --- .../common/remote/offset/RecordPosition.java | 9 ++++-- .../remote/request/FetchJobRequest.java | 2 +- .../remote/request/FetchPositionRequest.java | 2 +- ...cResponse.java => BaseRemoteResponse.java} | 2 +- .../remote/response/EmptyAckResponse.java | 2 +- .../common/remote/response/FailResponse.java | 4 +-- .../remote/response/FetchJobResponse.java | 2 +- .../response/FetchPositionResponse.java | 2 +- .../eventmesh/common/utils/JsonUtils.java | 8 +++++ .../connector/CanalSourceConnector.java | 15 +++------ .../connector/RocketMQSourceConnector.java | 8 ++--- .../offsetmgmt/admin/AdminOffsetService.java | 5 ++- .../api/data/RecordOffsetManagement.java | 13 ++++---- 24 files changed, 112 insertions(+), 62 deletions(-) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/{BaseGrpcResponse.java => BaseRemoteResponse.java} (93%) diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle index 0638b649a4..1ea66f59e8 100644 --- a/eventmesh-admin-server/build.gradle +++ b/eventmesh-admin-server/build.gradle @@ -16,7 +16,7 @@ dependencies { // https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter implementation group: 'com.baomidou', name: 'mybatis-plus-boot-starter', version: '3.5.5' - + implementation "org.reflections:reflections:0.10.2" // https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter implementation "com.alibaba:druid-spring-boot-starter" diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java index 13021c98c2..93c5467a84 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java @@ -1,9 +1,13 @@ package com.apache.eventmesh.admin.server; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.eventmesh.common.remote.Task; +import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.PagedList; - -import org.apache.eventmesh.common.remote.Task; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; public interface Admin extends ComponentLifeCycle { /** @@ -21,5 +25,29 @@ public interface Admin extends ComponentLifeCycle { void reportHeartbeat(ReportHeartBeatRequest heartBeat); + static void main(String[] args) { +// ReportPositionRequest request = new ReportPositionRequest(); +// request.setJobID("1"); +// request.setAddress("1"); +// request.setState(JobState.RUNNING); +// request.setDataSourceType(DataSourceType.MYSQL); + RecordPosition recordPosition = new RecordPosition(); + CanalRecordOffset recordOffset = new CanalRecordOffset(); + recordOffset.setOffset(12345L); + recordPosition.setRecordOffset(recordOffset); + CanalRecordPartition partition = new CanalRecordPartition(); + partition.setJournalName("demo-binary-log-01"); + partition.setTimeStamp(System.currentTimeMillis()); + recordPosition.setRecordPartition(partition); +// ArrayList list = new ArrayList<>(); +// list.add(recordPosition); +// request.setRecordPositionList(list); + String bytes = JsonUtils.toJSONString(recordPosition); + + RecordPosition object1 = JsonUtils.parseTypeReferenceObject(bytes, new TypeReference() {}); + RecordPosition object2 = JsonUtils.parseObject(bytes, RecordPosition.class); + System.out.println(object1); + System.out.println(object2); + } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java index 366872ade1..72c0681e95 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java @@ -13,7 +13,7 @@ import org.apache.eventmesh.common.remote.payload.PayloadUtil; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; import org.apache.eventmesh.common.remote.response.EmptyAckResponse; -import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; import org.apache.eventmesh.common.remote.response.FailResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -30,13 +30,13 @@ private Payload process(Payload value) { "exists")); } try { - BaseRequestHandler handler = + BaseRequestHandler handler = handlerFactory.getHandler(value.getMetadata().getType()); if (handler == null) { - return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN, + return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN, "not match any request handler")); } - BaseGrpcResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); + BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); if (response == null || response instanceof EmptyAckResponse) { return null; } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index 96d9041935..ebf740df0b 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -27,7 +27,8 @@ public boolean updateJobState(Integer jobID, JobState state) { EventMeshJobInfo jobInfo = new EventMeshJobInfo(); jobInfo.setJobID(jobID); jobInfo.setState(state.ordinal()); - update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE,JobState.COMPLETE)); + update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE.ordinal(), + JobState.COMPLETE.ordinal())); return true; } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java index ad1dd5d10e..8229c35152 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java @@ -44,13 +44,12 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { history.setRecord(JsonUtils.toJSONString(position)); history.setJob(old.getJobID()); history.setAddress(old.getAddress()); + log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); try { historyService.save(history); } catch (Exception e) { log.warn("save mysql position reporter changed history fail", e); } - - log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); } } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java index 96d19942be..ab4d10e20d 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java @@ -12,6 +12,8 @@ import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.remote.response.FetchPositionResponse; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; @@ -34,10 +36,33 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) { for (int i = 0; i < 3; i++) { try { List recordPositionList = request.getRecordPositionList(); + RecordPosition recordPosition = recordPositionList.get(0); + if (recordPosition == null || recordPosition.getRecordPartition() == null || recordPosition.getRecordOffset() == null) { + log.warn("report mysql position, but record-partition/partition/offset is null"); + return false; + } + if (!(recordPosition.getRecordPartition() instanceof CanalRecordPartition)) { + log.warn("report mysql position, but record partition class [{}] not match [{}]", + recordPosition.getRecordPartition().getRecordPartitionClass(), CanalRecordPartition.class); + return false; + } + if (!(recordPosition.getRecordOffset() instanceof CanalRecordOffset)) { + log.warn("report mysql position, but record offset class [{}] not match [{}]", + recordPosition.getRecordOffset().getRecordOffsetClass(), CanalRecordOffset.class); + return false; + } + CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset(); + CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition(); EventMeshMysqlPosition position = new EventMeshMysqlPosition(); position.setJobID(Integer.parseInt(request.getJobID())); position.setAddress(request.getAddress()); - + if (offset != null) { + position.setPosition(offset.getOffset()); + } + if (partition != null) { + position.setTimestamp(partition.getTimeStamp()); + position.setJournalName(partition.getJournalName()); + } if (!positionService.saveOrUpdateByJob(position)) { log.warn("update job position fail [{}]", request); return false; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java index 2200431a5b..b78fe6aa1a 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java @@ -2,10 +2,10 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; -import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; -public abstract class BaseRequestHandler { - public BaseGrpcResponse handlerRequest(T request, Metadata metadata) { +public abstract class BaseRequestHandler { + public BaseRemoteResponse handlerRequest(T request, Metadata metadata) { return handler(request, metadata); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java index 2b0d9fc810..f2ab552663 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/RequestHandlerFactory.java @@ -1,7 +1,7 @@ package com.apache.eventmesh.admin.server.web.handler.request; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; -import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; @@ -13,10 +13,10 @@ @Component public class RequestHandlerFactory implements ApplicationListener { - private final Map> handlers = + private final Map> handlers = new ConcurrentHashMap<>(); - public BaseRequestHandler getHandler(String type) { + public BaseRequestHandler getHandler(String type) { return handlers.get(type); } @@ -26,7 +26,7 @@ public void onApplicationEvent(ContextRefreshedEvent event) { Map beans = event.getApplicationContext().getBeansOfType(BaseRequestHandler.class); - for (BaseRequestHandler requestHandler : beans.values()) { + for (BaseRequestHandler requestHandler : beans.values()) { Class clazz = requestHandler.getClass(); boolean skip = false; while (!clazz.getSuperclass().equals(BaseRequestHandler.class)) { diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java index 26fa0dbabb..13f9b48fd5 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java @@ -40,6 +40,9 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad if (StringUtils.isBlank(request.getJobID())) { throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); } + if (request.getRecordPositionList() == null || request.getRecordPositionList().isEmpty()) { + throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty"); + } int jobID; try { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java index 3551e04692..bed39bee5b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordOffset.java @@ -17,10 +17,6 @@ package org.apache.eventmesh.common.remote.offset; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - public class RecordOffset { private Class clazz; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java index ec9ffb35f1..60c2b8a2a6 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPartition.java @@ -17,10 +17,6 @@ package org.apache.eventmesh.common.remote.offset; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - public class RecordPartition { private Class clazz; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java index 029eac563f..a4e05ff772 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java @@ -17,14 +17,17 @@ package org.apache.eventmesh.common.remote.offset; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + import java.util.Objects; public class RecordPosition { - + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY) private RecordPartition recordPartition; private Class recordPartitionClazz; + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY) private RecordOffset recordOffset; private Class recordOffsetClazz; @@ -57,11 +60,11 @@ public void setRecordOffsetClazz(Class recordOffsetClazz this.recordOffsetClazz = recordOffsetClazz; } - public RecordPartition getPartition() { + public RecordPartition getRecordPartition() { return recordPartition; } - public RecordOffset getOffset() { + public RecordOffset getRecordOffset() { return recordOffset; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java index 27b0a82111..9caaaf1feb 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java @@ -5,6 +5,6 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchJobRequest extends BaseGrpcRequest { +public class FetchJobRequest extends BaseRemoteRequest { private String jobID; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java index 03444576d0..4d3c1f7530 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java @@ -8,7 +8,7 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchPositionRequest extends BaseGrpcRequest { +public class FetchPositionRequest extends BaseRemoteRequest { private String jobID; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java similarity index 93% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java index d9791baa87..81c054264d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java @@ -8,7 +8,7 @@ import java.util.Map; @Getter -public abstract class BaseGrpcResponse implements IPayload { +public abstract class BaseRemoteResponse implements IPayload { public static final int UNKNOWN = -1; @Setter private boolean success = true; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java index 48f32135c6..8ef3a73c9d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java @@ -3,6 +3,6 @@ /** * empty, just mean remote received request */ -public class EmptyAckResponse extends BaseGrpcResponse { +public class EmptyAckResponse extends BaseRemoteResponse { } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java index 0eec63173b..3dd690eb0c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java @@ -1,6 +1,6 @@ package org.apache.eventmesh.common.remote.response; -public class FailResponse extends BaseGrpcResponse { +public class FailResponse extends BaseRemoteResponse { public static FailResponse build(int errorCode, String msg) { FailResponse response = new FailResponse(); response.setErrorCode(errorCode); @@ -17,6 +17,6 @@ public static FailResponse build(int errorCode, String msg) { * @return response */ public static FailResponse build(Throwable exception) { - return build(BaseGrpcResponse.UNKNOWN, exception.getMessage()); + return build(BaseRemoteResponse.UNKNOWN, exception.getMessage()); } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index 3b06edece0..cc94c1aaeb 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -11,7 +11,7 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchJobResponse extends BaseGrpcResponse { +public class FetchJobResponse extends BaseRemoteResponse { private Integer id; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java index e96e8f1759..19d7b5a5c1 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java @@ -7,7 +7,7 @@ @Data @EqualsAndHashCode(callSuper = true) -public class FetchPositionResponse extends BaseGrpcResponse { +public class FetchPositionResponse extends BaseRemoteResponse { private RecordPosition recordPosition; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index c7504a776e..a1ea6ee59b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -163,6 +163,14 @@ public static T parseTypeReferenceObject(String text, TypeReference typeR } } + public static T parseTypeReferenceObject(byte[] text, TypeReference typeReference) { + try { + return OBJECT_MAPPER.readValue(text, typeReference); + } catch (IOException e) { + throw new JsonException("deserialize json string to typeReference error", e); + } + } + public static JsonNode getJsonNode(String text) { if (StringUtils.isEmpty(text)) { return null; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index e9e86197ad..33f5527b63 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -27,13 +27,12 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; +import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,9 +41,6 @@ import java.util.concurrent.locks.LockSupport; -import org.springframework.util.CollectionUtils; - -import com.alibaba.otter.canal.common.CanalException; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager; @@ -60,13 +56,10 @@ import com.alibaba.otter.canal.parse.CanalEventParser; import com.alibaba.otter.canal.parse.ha.CanalHAController; import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; -import com.alibaba.otter.canal.parse.support.AuthenticationInfo; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.ClientIdentity; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; -import com.alibaba.otter.canal.sink.AbstractCanalEventSink; -import com.alibaba.otter.canal.sink.CanalEventSink; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; @@ -180,8 +173,8 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) { List positions = new ArrayList<>(); recordPositions.forEach(recordPosition -> { Map recordPositionMap = new HashMap<>(); - CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getPartition()); - CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getOffset()); + CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getRecordPartition()); + CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getRecordOffset()); recordPositionMap.put("journalName", canalRecordPartition.getJournalName()); recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp()); recordPositionMap.put("position", canalRecordOffset.getOffset()); diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java index 67e73d7ab1..16e6660c4b 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java @@ -26,8 +26,8 @@ import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordOffset; -import org.apache.eventmesh.openconnect.offsetmgmt.api.data.rocketmq.RocketMQRecordPartition; +import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordOffset; +import org.apache.eventmesh.common.remote.offset.rocketmq.RocketMQRecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; @@ -193,12 +193,12 @@ private List getMessageQueueList(String topic) throws MQClientExce @Override public void commit(ConnectRecord record) { // send success, commit offset - RocketMQRecordPartition rocketMQRecordPartition = (RocketMQRecordPartition)(record.getPosition().getPartition()); + RocketMQRecordPartition rocketMQRecordPartition = (RocketMQRecordPartition)(record.getPosition().getRecordPartition()); String brokerName = rocketMQRecordPartition.getBroker(); String topic = rocketMQRecordPartition.getTopic(); int queueId = Integer.parseInt(rocketMQRecordPartition.getQueueId()); MessageQueue mq = new MessageQueue(topic, brokerName, queueId); - RocketMQRecordOffset rocketMQRecordOffset = (RocketMQRecordOffset)record.getPosition().getOffset(); + RocketMQRecordOffset rocketMQRecordOffset = (RocketMQRecordOffset)record.getPosition().getRecordOffset(); long offset = rocketMQRecordOffset.getQueueOffset(); long canCommitOffset = removeMessage(mq, offset); log.info("commit record {}|mq {}|canCommitOffset {}", record, mq, canCommitOffset); diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index 769fe81866..930cbd7a33 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -37,7 +37,6 @@ import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; -import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.remote.response.FetchPositionResponse; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; @@ -153,7 +152,7 @@ public Map getPositionMap() { FetchPositionResponse fetchPositionResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getPartition(), fetchPositionResponse.getRecordPosition().getOffset()); + positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), fetchPositionResponse.getRecordPosition().getRecordOffset()); } } } @@ -188,7 +187,7 @@ public RecordOffset getPosition(RecordPartition partition) { FetchPositionResponse fetchPositionResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getPartition(), fetchPositionResponse.getRecordPosition().getOffset()); + positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), fetchPositionResponse.getRecordPosition().getRecordOffset()); } } } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java index c5075e3e7b..4443c5e600 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/RecordOffsetManagement.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.openconnect.offsetmgmt.api.data; +import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; @@ -31,8 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import lombok.extern.slf4j.Slf4j; - @Slf4j public class RecordOffsetManagement { @@ -51,7 +50,7 @@ public RecordOffsetManagement() { */ public SubmittedPosition submitRecord(RecordPosition position) { SubmittedPosition submittedPosition = new SubmittedPosition(position); - records.computeIfAbsent(position.getPartition(), e -> new LinkedList<>()).add(submittedPosition); + records.computeIfAbsent(position.getRecordPartition(), e -> new LinkedList<>()).add(submittedPosition); // ensure thread safety in operation synchronized (this) { numUnacked.incrementAndGet(); @@ -67,7 +66,7 @@ private RecordOffset pollOffsetWhile(Deque submittedPositions RecordOffset offset = null; // Stop pulling if there is an uncommitted breakpoint while (canCommitHead(submittedPositions)) { - offset = submittedPositions.poll().getPosition().getOffset(); + offset = submittedPositions.poll().getPosition().getRecordOffset(); } return offset; } @@ -239,19 +238,19 @@ public void ack() { * @return */ public boolean remove() { - Deque deque = records.get(position.getPartition()); + Deque deque = records.get(position.getRecordPartition()); if (deque == null) { return false; } boolean result = deque.removeLastOccurrence(this); if (deque.isEmpty()) { - records.remove(position.getPartition()); + records.remove(position.getRecordPartition()); } if (result) { messageAcked(); } else { log.warn("Attempted to remove record from submitted queue for partition {}, " - + "but the record has not been submitted or has already been removed", position.getPartition()); + + "but the record has not been submitted or has already been removed", position.getRecordPartition()); } return result; }