From 0e0164b6a18afc64a061d89242f103d16cc02314 Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Fri, 24 May 2024 19:24:01 +0800 Subject: [PATCH] ext db --- .../web/db/entity/EventMeshJobDetail.java | 29 +++++++++ .../service/EventMeshJobInfoExtService.java | 16 +++++ .../db/service/EventMeshJobInfoService.java | 2 - .../EventMeshMysqlPositionExtService.java | 13 ++++ .../EventMeshMysqlPositionService.java | 1 - .../EventMeshRuntimeHeartbeatExtService.java | 13 ++++ .../EventMeshRuntimeHeartbeatService.java | 1 - .../impl/EventMeshJobInfoServiceExtImpl.java | 44 +++++++++++++ .../impl/EventMeshJobInfoServiceImpl.java | 19 +----- .../EventMeshMysqlPositionServiceExtImpl.java | 62 +++++++++++++++++++ .../EventMeshMysqlPositionServiceImpl.java | 39 ------------ ...entMeshRuntimeHeartbeatServiceExtImpl.java | 62 +++++++++++++++++++ .../EventMeshRuntimeHeartbeatServiceImpl.java | 35 ----------- .../request/impl/ReportHeartBeatHandler.java | 6 +- .../request/impl/ReportPositionHandler.java | 6 +- 15 files changed, 247 insertions(+), 101 deletions(-) create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoExtService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionExtService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatExtService.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceExtImpl.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceExtImpl.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceExtImpl.java diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java new file mode 100644 index 0000000000..80174a239d --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java @@ -0,0 +1,29 @@ +package com.apache.eventmesh.admin.server.web.db.entity; + +import lombok.Data; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.job.JobTransportType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.Map; + +@Data +public class EventMeshJobDetail { + private Integer id; + + private String name; + + private JobTransportType transportType; + + private Map sourceConnectorConfig; + + private String sourceConnectorDesc; + + private Map sinkConnectorConfig; + + private String sinkConnectorDesc; + + private RecordPosition position; + + private JobState state; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoExtService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoExtService.java new file mode 100644 index 0000000000..c8e5780343 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoExtService.java @@ -0,0 +1,16 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.baomidou.mybatisplus.extension.service.IService; +import org.apache.eventmesh.common.remote.JobState; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Service +* @createDate 2024-05-09 15:51:45 +*/ +public interface EventMeshJobInfoExtService extends IService { + boolean updateJobState(Integer jobID, JobState state); + EventMeshJobDetail getJobDetail(Integer jobID); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java index 89e5c1ffe1..c9a1e972a4 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java @@ -2,7 +2,6 @@ import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.baomidou.mybatisplus.extension.service.IService; -import org.apache.eventmesh.common.remote.JobState; /** * @author sodafang @@ -10,5 +9,4 @@ * @createDate 2024-05-09 15:51:45 */ public interface EventMeshJobInfoService extends IService { - boolean updateJobState(Integer jobID, JobState state); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionExtService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionExtService.java new file mode 100644 index 0000000000..540d173036 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionExtService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshMysqlPositionExtService extends IService { + boolean saveOrUpdateByJob(EventMeshMysqlPosition position); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java index 2fb34d5b55..83597b0472 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java @@ -9,5 +9,4 @@ * @createDate 2024-05-14 17:15:03 */ public interface EventMeshMysqlPositionService extends IService { - boolean saveOrUpdateByJob(EventMeshMysqlPosition position); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatExtService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatExtService.java new file mode 100644 index 0000000000..ae833ea87d --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatExtService.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.db.service; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service +* @createDate 2024-05-14 17:15:03 +*/ +public interface EventMeshRuntimeHeartbeatExtService extends IService { + boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java index 7049223747..3b9b8465c8 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java @@ -9,5 +9,4 @@ * @createDate 2024-05-14 17:15:03 */ public interface EventMeshRuntimeHeartbeatService extends IService { - boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceExtImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceExtImpl.java new file mode 100644 index 0000000000..021774060c --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceExtImpl.java @@ -0,0 +1,44 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.remote.JobState; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_job_info】的数据库操作Service实现 +* @createDate 2024-05-09 15:51:45 +*/ +@Service +@Slf4j +public class EventMeshJobInfoServiceExtImpl extends ServiceImpl + implements EventMeshJobInfoExtService { + + @Override + public boolean updateJobState(Integer jobID, JobState state) { + if (jobID == null || state == null) { + return false; + } + EventMeshJobInfo jobInfo = new EventMeshJobInfo(); + jobInfo.setJobID(jobID); + jobInfo.setState(state.ordinal()); + update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE.ordinal(), + JobState.COMPLETE.ordinal())); + return true; + } + + @Override + public EventMeshJobDetail getJobDetail(Integer jobID) { + return null; + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index ebf740df0b..eb2a6bf601 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -1,12 +1,10 @@ package com.apache.eventmesh.admin.server.web.db.service.impl; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; -import org.apache.eventmesh.common.remote.JobState; import org.springframework.stereotype.Service; /** @@ -18,19 +16,6 @@ @Slf4j public class EventMeshJobInfoServiceImpl extends ServiceImpl implements EventMeshJobInfoService{ - - @Override - public boolean updateJobState(Integer jobID, JobState state) { - if (jobID == null || state == null) { - return false; - } - EventMeshJobInfo jobInfo = new EventMeshJobInfo(); - jobInfo.setJobID(jobID); - jobInfo.setState(state.ordinal()); - update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE.ordinal(), - JobState.COMPLETE.ordinal())); - return true; - } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceExtImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceExtImpl.java new file mode 100644 index 0000000000..3c0bf36fa1 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceExtImpl.java @@ -0,0 +1,62 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionExtService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_mysql_position】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +@Slf4j +public class EventMeshMysqlPositionServiceExtImpl extends ServiceImpl + implements EventMeshMysqlPositionExtService { + + @Autowired + EventMeshPositionReporterHistoryService historyService; + + @Override + public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { + EventMeshMysqlPosition old = getOne(Wrappers.query().eq("jobId", position.getJobID())); + if (old == null) { + return save(position); + } else { + if (old.getPosition() >= position.getPosition()) { + log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] by [{}]", + position.getJobID(), position.getPosition(), position.getAddress(), old.getPosition(), old.getAddress()); + return true; + } + try { + return update(position, Wrappers.update().eq("updateTime", old.getUpdateTime())); + } finally { + if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) { + EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory(); + history.setRecord(JsonUtils.toJSONString(position)); + history.setJob(old.getJobID()); + history.setAddress(old.getAddress()); + log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save job [{}] mysql position reporter changed history fail, now reporter [{}], old " + + "[{}]", position.getJobID(), position.getAddress(), old.getAddress(), e); + } + } + } + } + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java index 5555c59f6a..a3bfa4770b 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java @@ -1,15 +1,10 @@ package com.apache.eventmesh.admin.server.web.db.service.impl; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper; import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; -import org.apache.eventmesh.common.utils.JsonUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** @@ -21,40 +16,6 @@ @Slf4j public class EventMeshMysqlPositionServiceImpl extends ServiceImpl implements EventMeshMysqlPositionService{ - - @Autowired - EventMeshPositionReporterHistoryService historyService; - - @Override - public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { - EventMeshMysqlPosition old = getOne(Wrappers.query().eq("jobId", position.getJobID())); - if (old == null) { - return save(position); - } else { - if (old.getPosition() >= position.getPosition()) { - log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] by [{}]", - position.getJobID(), position.getPosition(), position.getAddress(), old.getPosition(), old.getAddress()); - return true; - } - try { - return update(position, Wrappers.update().eq("updateTime", old.getUpdateTime())); - } finally { - if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) { - EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory(); - history.setRecord(JsonUtils.toJSONString(position)); - history.setJob(old.getJobID()); - history.setAddress(old.getAddress()); - log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position); - try { - historyService.save(history); - } catch (Exception e) { - log.warn("save job [{}] mysql position reporter changed history fail, now reporter [{}], old " + - "[{}]", position.getJobID(), position.getAddress(), old.getAddress(), e); - } - } - } - } - } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceExtImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceExtImpl.java new file mode 100644 index 0000000000..609d78dfba --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceExtImpl.java @@ -0,0 +1,62 @@ +package com.apache.eventmesh.admin.server.web.db.service.impl; + +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; +import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory; +import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatExtService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** +* @author sodafang +* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service实现 +* @createDate 2024-05-14 17:15:03 +*/ +@Service +@Slf4j +public class EventMeshRuntimeHeartbeatServiceExtImpl extends ServiceImpl + implements EventMeshRuntimeHeartbeatExtService { + + @Autowired + EventMeshRuntimeHistoryService historyService; + + @Override + public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) { + EventMeshRuntimeHeartbeat old = getOne(Wrappers.query().eq("runtimeAddr", + entity.getRuntimeAddr())); + if (old == null) { + return save(entity); + } else { + if (Long.parseLong(old.getReportTime()) >= Long.parseLong(entity.getReportTime())) { + log.info("update heartbeat record ignore, current report time late than db, job " + + "[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr()); + return true; + } + try { + return update(entity, Wrappers.update().eq("updateTime", old.getUpdateTime())); + } finally { + if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) { + EventMeshRuntimeHistory history = new EventMeshRuntimeHistory(); + history.setAddress(old.getAdminAddr()); + history.setJob(old.getJobID()); + try { + historyService.save(history); + } catch (Exception e) { + log.warn("save runtime job changed history fail", e); + } + + log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(), + entity.getJobID()); + } + } + } + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java index a76cac13c1..98cfba05a9 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java @@ -20,41 +20,6 @@ @Slf4j public class EventMeshRuntimeHeartbeatServiceImpl extends ServiceImpl implements EventMeshRuntimeHeartbeatService{ - - @Autowired - EventMeshRuntimeHistoryService historyService; - - @Override - public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) { - EventMeshRuntimeHeartbeat old = getOne(Wrappers.query().eq("runtimeAddr", - entity.getRuntimeAddr())); - if (old == null) { - return save(entity); - } else { - if (Long.parseLong(old.getReportTime()) >= Long.parseLong(entity.getReportTime())) { - log.info("update heartbeat record ignore, current report time late than db, job " + - "[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr()); - return true; - } - try { - return update(entity, Wrappers.update().eq("updateTime", old.getUpdateTime())); - } finally { - if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) { - EventMeshRuntimeHistory history = new EventMeshRuntimeHistory(); - history.setAddress(old.getAdminAddr()); - history.setJob(old.getJobID()); - try { - historyService.save(history); - } catch (Exception e) { - log.warn("save runtime job changed history fail", e); - } - - log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(), - entity.getJobID()); - } - } - } - } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportHeartBeatHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportHeartBeatHandler.java index 590cd5477a..375af373c7 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportHeartBeatHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportHeartBeatHandler.java @@ -3,7 +3,7 @@ import com.apache.eventmesh.admin.server.AdminServerException; import com.apache.eventmesh.admin.server.web.db.DBThreadPool; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatExtService; import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; @@ -18,7 +18,7 @@ @Slf4j public class ReportHeartBeatHandler extends BaseRequestHandler { @Autowired - EventMeshRuntimeHeartbeatService heartbeatService; + EventMeshRuntimeHeartbeatExtService heartbeatExtService; @Autowired DBThreadPool executor; @@ -39,7 +39,7 @@ protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata meta heartbeat.setAdminAddr(IPUtils.getLocalAddress()); heartbeat.setRuntimeAddr(request.getAddress()); try { - if (!heartbeatService.saveOrUpdateByRuntimeAddress(heartbeat)) { + if (!heartbeatExtService.saveOrUpdateByRuntimeAddress(heartbeat)) { log.warn("save or update heartbeat request [{}] fail", request); } } catch (Exception e) { diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java index 3346ec2f68..7dcad59896 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java @@ -3,7 +3,7 @@ import com.apache.eventmesh.admin.server.AdminServerException; import com.apache.eventmesh.admin.server.web.db.DBThreadPool; import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; -import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService; import com.apache.eventmesh.admin.server.web.handler.position.IReportPositionHandler; import com.apache.eventmesh.admin.server.web.handler.position.PositionHandlerFactory; import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; @@ -20,7 +20,7 @@ @Slf4j public class ReportPositionHandler extends BaseRequestHandler { @Autowired - EventMeshJobInfoService jobInfoService; + EventMeshJobInfoExtService jobInfoExtService; @Autowired EventMeshDataSourceService dataSourceService; @@ -61,7 +61,7 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad executor.getExecutors().execute(() -> { try { - jobInfoService.updateJobState(jobID, request.getState()); + jobInfoExtService.updateJobState(jobID, request.getState()); } catch (Exception e) { log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(), request.getDataSourceType(), request.getState(), e);