diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle index 5c17c93a9c..0638b649a4 100644 --- a/eventmesh-admin-server/build.gradle +++ b/eventmesh-admin-server/build.gradle @@ -3,6 +3,7 @@ dependencies { implementation project(":eventmesh-common") implementation project(":eventmesh-registry:eventmesh-registry-api") implementation project(":eventmesh-registry:eventmesh-registry-nacos") + implementation project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api') implementation "com.alibaba.nacos:nacos-client" implementation ("org.springframework.boot:spring-boot-starter-web") { exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat" diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java index 3c270e9126..2603fadf7f 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java @@ -1,8 +1,8 @@ package com.apache.eventmesh.admin.server.web; import com.apache.eventmesh.admin.server.AdminServerException; -import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory; +import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; +import com.apache.eventmesh.admin.server.web.handler.request.impl.RequestHandlerFactory; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java index 3f5aff2f57..0c4053d9bd 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java @@ -38,6 +38,7 @@ public void destroy() { server.shutdown(); if(!server.awaitTermination(30, TimeUnit.SECONDS)) { log.warn("[{}] server don't graceful stop in 30s, it will shutdown now", this.getClass().getSimpleName()); + server.shutdownNow(); } } } catch (Exception e) { diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java new file mode 100644 index 0000000000..e212dd2c5a --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java @@ -0,0 +1,38 @@ +package com.apache.eventmesh.admin.server.web.db; + +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.EventMeshThreadFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +public class DBThreadPool { + private final ThreadPoolExecutor executor = + new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, + Runtime.getRuntime().availableProcessors() * 2, 0L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + @PreDestroy + private void destroy() { + if (!executor.isShutdown()) { + try { + executor.shutdown(); + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("wait heart beat handler thread pool shutdown fail"); + } + } + } + + public ThreadPoolExecutor getExecutors() { + return executor; + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java index 2ef1cb9fe2..efb3c95d02 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java @@ -29,8 +29,6 @@ public class EventMeshJobInfo implements Serializable { private Integer jobType; - private Integer position; - private Integer createUid; private Integer updateUid; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobPosition.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java similarity index 72% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobPosition.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java index 8426bcd677..75a7c245e6 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobPosition.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java @@ -9,11 +9,11 @@ import java.util.Date; /** - * @TableName event_mesh_job_position + * @TableName event_mesh_mysql_position */ -@TableName(value ="event_mesh_job_position") +@TableName(value ="event_mesh_mysql_position") @Data -public class EventMeshJobPosition implements Serializable { +public class EventMeshMysqlPosition implements Serializable { @TableId(type = IdType.AUTO) private Integer id; @@ -23,6 +23,10 @@ public class EventMeshJobPosition implements Serializable { private Long position; + private Long timestamp; + + private String journalName; + private Date createTime; private Date updateTime; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java new file mode 100644 index 0000000000..b1c51e7b4e --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java @@ -0,0 +1,29 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_position_reporter_history + */ +@TableName(value ="event_mesh_position_reporter_history") +@Data +public class EventMeshPositionReporterHistory implements Serializable { + @TableId(type = IdType.AUTO) + private Long id; + + private Integer job; + + private String record; + + private String address; + + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshHeartbeat.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java similarity index 65% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshHeartbeat.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java index 38ec262ddb..298a501739 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshHeartbeat.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java @@ -6,15 +6,16 @@ import lombok.Data; import java.io.Serializable; +import java.util.Date; /** - * @TableName event_mesh_heartbeat + * @TableName event_mesh_runtime_heartbeat */ -@TableName(value ="event_mesh_heartbeat") +@TableName(value ="event_mesh_runtime_heartbeat") @Data -public class EventMeshHeartbeat implements Serializable { +public class EventMeshRuntimeHeartbeat implements Serializable { @TableId(type = IdType.AUTO) - private Integer id; + private Long id; private String adminAddr; @@ -24,7 +25,9 @@ public class EventMeshHeartbeat implements Serializable { private String reportTime; - private String updateTime; + private Date updateTime; + + private Date createTime; private static final long serialVersionUID = 1L; } \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java new file mode 100644 index 0000000000..7d3e3f463c --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java @@ -0,0 +1,27 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @TableName event_mesh_runtime_history + */ +@TableName(value ="event_mesh_runtime_history") +@Data +public class EventMeshRuntimeHistory implements Serializable { + @TableId(type = IdType.AUTO) + private Long id; + + private Integer job; + + private String address; + + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java index 0d003cc109..805d3642a0 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java @@ -1,20 +1,20 @@ -package com.apache.eventmesh.admin.server.web.db.mapper; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import org.apache.ibatis.annotations.Mapper; - -/** -* @author sodafang -* @description 针对表【event_mesh_data_source】的数据库操作Mapper -* @createDate 2024-05-09 15:52:49 -* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource -*/ -@Mapper -public interface EventMeshDataSourceMapper extends BaseMapper { - -} - - - - +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_data_source】的数据库操作Mapper +* @createDate 2024-05-09 15:52:49 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource +*/ +@Mapper +public interface EventMeshDataSourceMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java index bb5ee8080b..0082a9f2ec 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java @@ -1,20 +1,20 @@ -package com.apache.eventmesh.admin.server.web.db.mapper; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import org.apache.ibatis.annotations.Mapper; - -/** -* @author sodafang -* @description 针对表【event_mesh_job_info】的数据库操作Mapper -* @createDate 2024-05-09 15:51:45 -* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo -*/ -@Mapper -public interface EventMeshJobInfoMapper extends BaseMapper { - -} - - - - +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Mapper +* @createDate 2024-05-09 15:51:45 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo +*/ +@Mapper +public interface EventMeshJobInfoMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshHeartbeatMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java similarity index 58% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshHeartbeatMapper.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java index 50f3977994..1b29508c1c 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshHeartbeatMapper.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java @@ -1,20 +1,20 @@ -package com.apache.eventmesh.admin.server.web.db.mapper; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshHeartbeat; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import org.apache.ibatis.annotations.Mapper; - -/** -* @author sodafang -* @description 针对表【event_mesh_heartbeat】的数据库操作Mapper -* @createDate 2024-05-10 16:05:00 -* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshHeartbeat -*/ -@Mapper -public interface EventMeshHeartbeatMapper extends BaseMapper { - -} - - - - +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition +*/ +@Mapper +public interface EventMeshMysqlPositionMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java new file mode 100644 index 0000000000..2cdc59f713 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_position_reporter_history(记录position上报者变更时,老记录)】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory +*/ +@Mapper +public interface EventMeshPositionReporterHistoryMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobPositionMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java similarity index 56% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobPositionMapper.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java index e432406dba..a23ef04225 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobPositionMapper.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java @@ -1,20 +1,20 @@ -package com.apache.eventmesh.admin.server.web.db.mapper; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobPosition; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import org.apache.ibatis.annotations.Mapper; - -/** -* @author sodafang -* @description 针对表【event_mesh_job_position】的数据库操作Mapper -* @createDate 2024-05-09 09:33:01 -* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobPosition -*/ -@Mapper -public interface EventMeshJobPositionMapper extends BaseMapper { - -} - - - - +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat +*/ +@Mapper +public interface EventMeshRuntimeHeartbeatMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java new file mode 100644 index 0000000000..85fa51aec8 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.web.db.mapper; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_history(记录runtime上运行任务的变更)】的数据库操作Mapper +* @createDate 2024-05-14 17:15:03 +* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory +*/ +@Mapper +public interface EventMeshRuntimeHistoryMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java index ea6b1a5b69..6b1e2d8f79 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java @@ -1,13 +1,13 @@ -package com.apache.eventmesh.admin.server.web.db.service; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; -import com.baomidou.mybatisplus.extension.service.IService; - -/** -* @author sodafang -* @description 针对表【event_mesh_data_source】的数据库操作Service -* @createDate 2024-05-09 15:52:49 -*/ -public interface EventMeshDataSourceService extends IService { - -} +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_data_source】的数据库操作Service +* @createDate 2024-05-09 15:52:49 +*/ +public interface EventMeshDataSourceService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshHeartbeatService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshHeartbeatService.java deleted file mode 100644 index b27cb69742..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshHeartbeatService.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.apache.eventmesh.admin.server.web.db.service; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshHeartbeat; -import com.baomidou.mybatisplus.extension.service.IService; - -/** -* @author sodafang -* @description 针对表【event_mesh_heartbeat】的数据库操作Service -* @createDate 2024-05-06 10:57:45 -*/ -public interface EventMeshHeartbeatService extends IService { - -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java index 4c7cd24f2d..7d5ed144f4 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java @@ -1,13 +1,13 @@ -package com.apache.eventmesh.admin.server.web.db.service; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; -import com.baomidou.mybatisplus.extension.service.IService; - -/** -* @author sodafang -* @description 针对表【event_mesh_job_info】的数据库操作Service -* @createDate 2024-05-09 15:51:45 -*/ -public interface EventMeshJobInfoService extends IService { - -} +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Service +* @createDate 2024-05-09 15:51:45 +*/ +public interface EventMeshJobInfoService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobPositionService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobPositionService.java deleted file mode 100644 index 77d83f99aa..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobPositionService.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.apache.eventmesh.admin.server.web.db.service; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobPosition; -import com.baomidou.mybatisplus.extension.service.IService; - -/** -* @author sodafang -* @description 针对表【event_mesh_job_position】的数据库操作Service -* @createDate 2024-05-06 10:57:45 -*/ -public interface EventMeshJobPositionService extends IService { - -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java new file mode 100644 index 0000000000..2fb34d5b55 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshMysqlPositionService extends IService { + boolean saveOrUpdateByJob(EventMeshMysqlPosition position); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java new file mode 100644 index 0000000000..adaac43959 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_position_reporter_history(记录position上报者变更时,老记录)】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshPositionReporterHistoryService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java new file mode 100644 index 0000000000..7049223747 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshRuntimeHeartbeatService extends IService { + boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java new file mode 100644 index 0000000000..0da277f268 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_history(记录runtime上运行任务的变更)】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshRuntimeHistoryService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index 0ea4da0d9e..b189adca6e 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -1,22 +1,22 @@ -package com.apache.eventmesh.admin.server.web.db.service.impl; - -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; -import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; -import org.springframework.stereotype.Service; - -/** -* @author sodafang -* @description 针对表【event_mesh_job_info】的数据库操作Service实现 -* @createDate 2024-05-09 15:51:45 -*/ -@Service -public class EventMeshJobInfoServiceImpl extends ServiceImpl - implements EventMeshJobInfoService{ - -} - - - - +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Service实现 +* @createDate 2024-05-09 15:51:45 +*/ +@Service +public class EventMeshJobInfoServiceImpl extends ServiceImpl + implements EventMeshJobInfoService{ + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobPositionServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobPositionServiceImpl.java deleted file mode 100644 index af66f402fe..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobPositionServiceImpl.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.apache.eventmesh.admin.server.web.db.service.impl; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobPosition; -import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobPositionMapper; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobPositionService; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import org.springframework.stereotype.Service; - -/** -* @author sodafang -* @description 针对表【event_mesh_job_position】的数据库操作Service实现 -* @createDate 2024-05-06 10:57:45 -*/ -@Service -public class EventMeshJobPositionServiceImpl extends ServiceImpl - implements EventMeshJobPositionService { - -} - - - - diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java new file mode 100644 index 0000000000..833ebaa7f3 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java @@ -0,0 +1,59 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +@Slf4j +public class EventMeshMysqlPositionServiceImpl extends ServiceImpl + implements EventMeshMysqlPositionService{ + + @Autowired + EventMeshPositionReporterHistoryService historyService; + + @Override + public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { + EventMeshMysqlPosition old = getOne(Wrappers.query().eq("jobId", position.getJobID())); + if (old == null) { + return save(position); + } else { + if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) { + EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory(); + history.setRecord(JsonUtils.toJSONString(position)); + history.setJob(old.getJobID()); + history.setAddress(old.getAddress()); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save mysql position reporter changed history fail", e); + } + + log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); + } + if (old.getPosition() > position.getPosition()) { + log.info("job [{}] report position [{}] less than db [{}]", position.getJobID(), position.getPosition(), + old.getPosition()); + return true; + } + return update(position, Wrappers.update().eq("updateTime", old.getUpdateTime())); + } + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java new file mode 100644 index 0000000000..78746b132c --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java @@ -0,0 +1,22 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshPositionReporterHistoryMapper; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_position_reporter_history(记录position上报者变更时,老记录)】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +public class EventMeshPositionReporterHistoryServiceImpl extends ServiceImpl + implements EventMeshPositionReporterHistoryService{ + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java new file mode 100644 index 0000000000..2519aa97d4 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java @@ -0,0 +1,59 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +@Slf4j +public class EventMeshRuntimeHeartbeatServiceImpl extends ServiceImpl + implements EventMeshRuntimeHeartbeatService{ + + @Autowired + EventMeshRuntimeHistoryService historyService; + + @Override + public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) { + EventMeshRuntimeHeartbeat old = getOne(Wrappers.query().eq("runtimeAddr", + entity.getRuntimeAddr())); + if (old == null) { + return save(entity); + } else { + if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) { + EventMeshRuntimeHistory history = new EventMeshRuntimeHistory(); + history.setAddress(old.getAdminAddr()); + history.setJob(old.getJobID()); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save runtime job changed history fail", e); + } + + log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(), + entity.getJobID()); + } + if (Long.parseLong(old.getReportTime()) > Long.parseLong(entity.getReportTime())) { + log.info("update heartbeat record ignore, current report time late than db, job " + + "[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr()); + return true; + } + return update(entity, Wrappers.update().eq("updateTime", old.getUpdateTime())); + } + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshHeartbeatServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java similarity index 51% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshHeartbeatServiceImpl.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java index f783377dac..95e7b8708e 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshHeartbeatServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java @@ -1,22 +1,22 @@ -package com.apache.eventmesh.admin.server.web.db.service.impl; - -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshHeartbeat; -import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshHeartbeatMapper; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshHeartbeatService; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import org.springframework.stereotype.Service; - -/** -* @author sodafang -* @description 针对表【event_mesh_heartbeat】的数据库操作Service实现 -* @createDate 2024-05-06 10:57:45 -*/ -@Service -public class EventMeshHeartbeatServiceImpl extends ServiceImpl - implements EventMeshHeartbeatService { - -} - - - - +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHistoryMapper; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_history(记录runtime上运行任务的变更)】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +public class EventMeshRuntimeHistoryServiceImpl extends ServiceImpl + implements EventMeshRuntimeHistoryService{ + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/ReportHeartBeatHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/ReportHeartBeatHandler.java deleted file mode 100644 index 41cd70ff03..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/ReportHeartBeatHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.apache.eventmesh.admin.server.web.handler; - -import com.apache.eventmesh.admin.server.AdminServerException; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshHeartbeat; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshHeartbeatService; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; -import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; -import org.apache.eventmesh.common.remote.response.EmptyAckResponse; -import org.apache.eventmesh.common.utils.IPUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DuplicateKeyException; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -public class ReportHeartBeatHandler extends BaseRequestHandler{ - @Autowired - EventMeshHeartbeatService heartbeatService; - - @Override - protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata metadata) { - EventMeshHeartbeat heartbeat = new EventMeshHeartbeat(); - Integer job; - try { - job = Integer.parseInt(request.getJobID()); - heartbeat.setJobID(job); - } catch (NumberFormatException e) { - throw new AdminServerException(ErrorCode.BAD_REQUEST, String.format("illegal job id %s", request.getJobID())); - } - - heartbeat.setReportTime(request.getReportedTimeStamp()); - heartbeat.setAdminAddr(IPUtils.getLocalAddress()); - heartbeat.setRuntimeAddr(request.getAddress()); - heartbeat.setUpdateTime(String.valueOf(System.currentTimeMillis())); - UpdateWrapper updateWrapper = new UpdateWrapper<>(); - QueryWrapper queryWrapper = new QueryWrapper<>(); - for (int i = 0; i < 3; i++) { - try { - EventMeshHeartbeat old = heartbeatService.getOne(queryWrapper.eq("jobID", job)); - if (old == null) { - heartbeatService.save(heartbeat); - } else { - if (Long.parseLong(old.getReportTime()) > Long.parseLong(request.getReportedTimeStamp())) { - break; - } - heartbeatService.update(updateWrapper.eq("updateTime", old.getUpdateTime()).setEntity(heartbeat)); - } - } catch (DuplicateKeyException e) { - log.warn("concurrent insert heart beat record, job id [{}], runtime [{}]", job, request.getAddress()); - } - } - return new EmptyAckResponse(); - } -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IPositionHandler.java new file mode 100644 index 0000000000..9dfb7e0dde --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/IPositionHandler.java @@ -0,0 +1,8 @@ +package com.apache.eventmesh.admin.server.web.handler.position; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; + +public interface IPositionHandler { + boolean handler(ReportPositionRequest request, Metadata metadata); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandler.java new file mode 100644 index 0000000000..53f690c332 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandler.java @@ -0,0 +1,7 @@ +package com.apache.eventmesh.admin.server.web.handler.position; + +import org.apache.eventmesh.common.remote.job.DataSourceType; + +public abstract class PositionHandler implements IPositionHandler { + protected abstract DataSourceType getSourceType(); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandlerFactory.java new file mode 100644 index 0000000000..864c24d30a --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/PositionHandlerFactory.java @@ -0,0 +1,35 @@ +package com.apache.eventmesh.admin.server.web.handler.position; + +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +@Slf4j +public class PositionHandlerFactory implements ApplicationListener { + private final Map handlers = + new ConcurrentHashMap<>(); + public IPositionHandler getHandler(DataSourceType type) { + return handlers.get(type); + } + + @Override + public void onApplicationEvent(ContextRefreshedEvent event) { + Map beans = + event.getApplicationContext().getBeansOfType(PositionHandler.class); + + for (PositionHandler handler: beans.values()) { + DataSourceType type = handler.getSourceType(); + if (handlers.containsKey(type)) { + log.warn("data source type [{}] handler already exists", type); + continue; + } + handlers.put(type, handler); + } + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlPositionHandler.java new file mode 100644 index 0000000000..28869828b3 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlPositionHandler.java @@ -0,0 +1,49 @@ +package com.apache.eventmesh.admin.server.web.handler.position.impl; + +import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; +import com.apache.eventmesh.admin.server.web.handler.position.PositionHandler; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class MysqlPositionHandler extends PositionHandler { + @Autowired + EventMeshMysqlPositionService positionService; + + @Override + protected DataSourceType getSourceType() { + return DataSourceType.MYSQL; + } + + @Override + public boolean handler(ReportPositionRequest request, Metadata metadata) { + for (int i = 0; i < 3; i++) { + try { + + if (!positionService.saveOrUpdateByJob(null)) { + log.warn("update job position fail [{}]", request); + return false; + } + return true; + } catch (DuplicateKeyException e) { + log.warn("concurrent report position job [{}], it will try again", request.getJobID()); + } catch (Exception e) { + log.warn("save position job [{}] fail", request.getJobID(), e); + return false; + } + try { + Thread.sleep(200); + } catch (InterruptedException ignore) { + log.warn("save position thread interrupted, [{}]", request); + return true; + } + } + return false; + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java similarity index 88% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java index 4906da7353..0e450ebd97 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/BaseRequestHandler.java @@ -1,8 +1,8 @@ -package com.apache.eventmesh.admin.server.web.handler; +package com.apache.eventmesh.admin.server.web.handler.request; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; -import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; public abstract class BaseRequestHandler { public BaseGrpcResponse handlerRequest(T request, Metadata metadata) { diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/FetchJobRequestHandler.java similarity index 94% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/FetchJobRequestHandler.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/FetchJobRequestHandler.java index a5fb32e0c2..b88c09278b 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/FetchJobRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/FetchJobRequestHandler.java @@ -1,11 +1,11 @@ -package com.apache.eventmesh.admin.server.web.handler; +package com.apache.eventmesh.admin.server.web.handler.request.impl; import com.apache.eventmesh.admin.server.AdminServerException; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobPositionService; +import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -31,9 +31,6 @@ public class FetchJobRequestHandler extends BaseRequestHandler { + @Autowired + EventMeshRuntimeHeartbeatService heartbeatService; + + @Autowired + DBThreadPool executor; + + @Override + protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata metadata) { + executor.getExecutors().execute(() -> { + EventMeshRuntimeHeartbeat heartbeat = new EventMeshRuntimeHeartbeat(); + int job; + try { + job = Integer.parseInt(request.getJobID()); + } catch (NumberFormatException e) { + throw new AdminServerException(ErrorCode.BAD_REQUEST, String.format("illegal job id %s", + request.getJobID())); + } + heartbeat.setJobID(job); + heartbeat.setReportTime(request.getReportedTimeStamp()); + heartbeat.setAdminAddr(IPUtils.getLocalAddress()); + heartbeat.setRuntimeAddr(request.getAddress()); + try { + if (!heartbeatService.saveOrUpdateByRuntimeAddress(heartbeat)) { + log.warn("save or update heartbeat fail, job [{}] runtime address [{}]", heartbeat.getJobID(), + heartbeat.getRuntimeAddr()); + } + } catch (Exception e) { + log.warn("save or update heartbeat job [{}] runtime address [{}] fail", heartbeat.getJobID(), + heartbeat.getRuntimeAddr(), e); + } + }); + + return new EmptyAckResponse(); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java new file mode 100644 index 0000000000..646d1ba666 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java @@ -0,0 +1,71 @@ +package com.apache.eventmesh.admin.server.web.handler.request.impl; + +import com.apache.eventmesh.admin.server.AdminServerException; +import com.apache.eventmesh.admin.server.web.db.DBThreadPool; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import com.apache.eventmesh.admin.server.web.handler.position.IPositionHandler; +import com.apache.eventmesh.admin.server.web.handler.position.PositionHandlerFactory; +import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.apache.eventmesh.common.remote.response.EmptyAckResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class ReportPositionHandler extends BaseRequestHandler { + @Autowired + EventMeshJobInfoService jobInfoService; + + @Autowired + EventMeshDataSourceService dataSourceService; + + @Autowired + DBThreadPool executor; + + @Autowired + PositionHandlerFactory positionHandlerFactory; + + + @Override + protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metadata) { + EventMeshJobInfo jobInfo = jobInfoService.getById(request.getJobID()); + if (jobInfo == null) { + throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("job id [%s] not exists in db", + request.getJobID())); + } + EventMeshDataSource sourceDB = dataSourceService.getById(jobInfo.getSourceData()); + if (sourceDB == null) { + throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("data base [%s] job id [%s] not " + + "exists in db", jobInfo.getSourceData(), jobInfo.getJobID())); + } + DataSourceType type = DataSourceType.getDataSourceType(sourceDB.getDataType()); + if (type == null) { + throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("illegal data base [%s] job id [%s] " + + "type [%d]", jobInfo.getSourceData(), jobInfo.getJobID(), sourceDB.getDataType())); + } + IPositionHandler handler = positionHandlerFactory.getHandler(type); + if (handler == null) { + throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("illegal data base [%s] job id [%s] " + + "type [%d], it not match any handler", jobInfo.getSourceData(), jobInfo.getJobID(), + sourceDB.getDataType())); + } + + executor.getExecutors().execute(() -> { + try { + handler.handler(request, metadata); + } catch (Exception e) { + log.warn("handle position request fail data base [{}] job id [{}] type [{}]", jobInfo.getSourceData() + , jobInfo.getJobID(), sourceDB.getDataType(), e); + } + }); + return new EmptyAckResponse(); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/RequestHandlerFactory.java similarity index 92% rename from eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java rename to eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/RequestHandlerFactory.java index 2f44d8e0de..a4dc2a29f8 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/RequestHandlerFactory.java @@ -1,5 +1,6 @@ -package com.apache.eventmesh.admin.server.web.handler; +package com.apache.eventmesh.admin.server.web.handler.request.impl; +import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; import org.springframework.context.ApplicationListener; diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml index bb5a6ce217..1566351018 100644 --- a/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml @@ -1,24 +1,24 @@ - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + id,dataType,address, description,configuration,createUid, - updateUid,createTime,updateTime - - + updateUid,createTime,updateTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml index 0b94e6b7b3..fd8193b778 100644 --- a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml @@ -1,28 +1,27 @@ - - - - - - - - - - - - - - - - - - - - - jobID,name,transportType, - sourceData,targetData,state, - runtimeType,position,createUid, - updateUid,createTime,updateTime - - + + + + + + + + + + + + + + + + + + + + jobID,name,transportType, + sourceData,targetData,state, + runtimeType,createUid, + updateUid,createTime,updateTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobPositionMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml similarity index 72% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshJobPositionMapper.xml rename to eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml index 38a4425e18..9851315a3d 100644 --- a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobPositionMapper.xml +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml @@ -1,20 +1,23 @@ - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + id,jobID,address, - position,createTime,updateTime - - + position,timestamp,journalName, + createTime,updateTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml new file mode 100644 index 0000000000..624b5f9ea5 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + id,job,record, + address,createTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshHeartbeatMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml similarity index 71% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshHeartbeatMapper.xml rename to eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml index 6bfeeda89c..3b49c48582 100644 --- a/eventmesh-admin-server/src/main/resources/mapper/EventMeshHeartbeatMapper.xml +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml @@ -1,20 +1,22 @@ - - - - - - - - - - - - - - - id,adminAddr,runtimeAddr, - jobID,reportTime,updateTime - - + + + + + + + + + + + + + + + + id,adminAddr,runtimeAddr, + jobID,reportTime,updateTime, + createTime + + diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml new file mode 100644 index 0000000000..73ab65a67a --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + id,job,address, + createTime + + diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java index 902f7e898e..28f91cb4e1 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java @@ -4,9 +4,9 @@ public enum DataSourceType { MYSQL("MySQL", DataSourceDriverType.MYSQL, DataSourceClassify.RDB), REDIS("Redis", DataSourceDriverType.REDIS, DataSourceClassify.CACHE), ROCKETMQ("RocketMQ", DataSourceDriverType.ROCKETMQ, DataSourceClassify.MQ); - private String name; - private DataSourceDriverType driverType; - private DataSourceClassify classify; + private final String name; + private final DataSourceDriverType driverType; + private final DataSourceClassify classify; DataSourceType(String name, DataSourceDriverType driverType, DataSourceClassify classify) { this.name = name; @@ -26,12 +26,12 @@ public DataSourceClassify getClassify() { return classify; } - public static DataSourceType getDataSourceType(String name) { - for (DataSourceType dataSourceType : DataSourceType.values()) { - if (dataSourceType.getName().equals(name)) { - return dataSourceType; - } + private static final DataSourceType[] TYPES = DataSourceType.values(); + + public static DataSourceType getDataSourceType(Integer index) { + if (index == null || index < 0 || index >= TYPES.length) { + return null; } - return null; + return TYPES[index]; } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java index 09e56c0c7c..4357167d68 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java @@ -42,7 +42,7 @@ public static JobTransportType getJobTransportType(DataSourceType src, DataSourc } public static JobTransportType getJobTransportType(Integer index) { - if (index == null || index < 0 || index > TYPES.length) { + if (index == null || index < 0 || index >= TYPES.length) { return null; } return TYPES[index]; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java index e241311c45..fcc78bcda3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java @@ -1,14 +1,14 @@ package org.apache.eventmesh.common.remote.request; import lombok.Data; +import lombok.EqualsAndHashCode; import org.apache.eventmesh.common.remote.JobState; -import org.apache.eventmesh.common.remote.Position; import org.apache.eventmesh.common.remote.offset.RecordPosition; import java.util.List; -import java.util.Map; @Data +@EqualsAndHashCode(callSuper = true) public class ReportPositionRequest extends BaseGrpcRequest { private String jobID; @@ -17,4 +17,5 @@ public class ReportPositionRequest extends BaseGrpcRequest { private JobState state; + private String address; } diff --git a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload index 730bb2ee32..e6a8ab416e 100644 --- a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload +++ b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload @@ -1,3 +1,4 @@ org.apache.eventmesh.common.remote.request.FetchJobRequest org.apache.eventmesh.common.remote.response.FetchJobResponse -org.apache.eventmesh.common.remote.request.ReportPositionRequest \ No newline at end of file +org.apache.eventmesh.common.remote.request.ReportPositionRequest +org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest \ No newline at end of file diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index abee41e2ce..8f3fa21f0c 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -17,6 +17,13 @@ package org.apache.eventmesh.openconnect.offsetmgmt.admin; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub; @@ -24,10 +31,10 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; import org.apache.eventmesh.common.remote.JobState; -import org.apache.eventmesh.common.remote.Position; import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.ConnectorRecordPartition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.KeyValueStore; @@ -35,21 +42,10 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.protobuf.Any; -import com.google.protobuf.UnsafeByteOperations; - -import lombok.extern.slf4j.Slf4j; - @Slf4j public class AdminOffsetService implements OffsetManagementService { @@ -100,6 +96,7 @@ public void persist() { ReportPositionRequest reportPositionRequest = new ReportPositionRequest(); reportPositionRequest.setJobID(jobId); reportPositionRequest.setState(jobState); + reportPositionRequest.setAddress(IPUtils.getLocalAddress()); reportPositionRequest.setRecordPositionList(recordToSyncList); @@ -198,7 +195,7 @@ public void onCompleted() { }); log.info("init record offset {}", initialRecordOffsetMap); positionStore.putAll(initialRecordOffsetMap); - this.jobState = JobState.INIT; + this.jobState = JobState.RUNNING; this.jobId = offsetStorageConfig.getExtensions().get("jobId"); } }