Skip to content

Commit

Permalink
ext db
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN committed May 24, 2024
1 parent d971f1f commit 0e0164b
Show file tree
Hide file tree
Showing 15 changed files with 247 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.apache.eventmesh.admin.server.web.db.entity;

import lombok.Data;
import org.apache.eventmesh.common.remote.JobState;
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.Map;

@Data
public class EventMeshJobDetail {
private Integer id;

private String name;

private JobTransportType transportType;

private Map<String, Object> sourceConnectorConfig;

private String sourceConnectorDesc;

private Map<String, Object> sinkConnectorConfig;

private String sinkConnectorDesc;

private RecordPosition position;

private JobState state;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.apache.eventmesh.admin.server.web.db.service;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.baomidou.mybatisplus.extension.service.IService;
import org.apache.eventmesh.common.remote.JobState;

/**
* @author sodafang
* @description 针对表【event_mesh_job_info】的数据库操作Service
* @createDate 2024-05-09 15:51:45
*/
public interface EventMeshJobInfoExtService extends IService<EventMeshJobInfo> {
boolean updateJobState(Integer jobID, JobState state);
EventMeshJobDetail getJobDetail(Integer jobID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.baomidou.mybatisplus.extension.service.IService;
import org.apache.eventmesh.common.remote.JobState;

/**
* @author sodafang
* @description 针对表【event_mesh_job_info】的数据库操作Service
* @createDate 2024-05-09 15:51:45
*/
public interface EventMeshJobInfoService extends IService<EventMeshJobInfo> {
boolean updateJobState(Integer jobID, JobState state);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.apache.eventmesh.admin.server.web.db.service;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition;
import com.baomidou.mybatisplus.extension.service.IService;

/**
* @author sodafang
* @description 针对表【event_mesh_mysql_position】的数据库操作Service
* @createDate 2024-05-14 17:15:03
*/
public interface EventMeshMysqlPositionExtService extends IService<EventMeshMysqlPosition> {
boolean saveOrUpdateByJob(EventMeshMysqlPosition position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@
* @createDate 2024-05-14 17:15:03
*/
public interface EventMeshMysqlPositionService extends IService<EventMeshMysqlPosition> {
boolean saveOrUpdateByJob(EventMeshMysqlPosition position);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.apache.eventmesh.admin.server.web.db.service;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
import com.baomidou.mybatisplus.extension.service.IService;

/**
* @author sodafang
* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service
* @createDate 2024-05-14 17:15:03
*/
public interface EventMeshRuntimeHeartbeatExtService extends IService<EventMeshRuntimeHeartbeat> {
boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@
* @createDate 2024-05-14 17:15:03
*/
public interface EventMeshRuntimeHeartbeatService extends IService<EventMeshRuntimeHeartbeat> {
boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.remote.JobState;
import org.springframework.stereotype.Service;

/**
* @author sodafang
* @description 针对表【event_mesh_job_info】的数据库操作Service实现
* @createDate 2024-05-09 15:51:45
*/
@Service
@Slf4j
public class EventMeshJobInfoServiceExtImpl extends ServiceImpl<EventMeshJobInfoMapper, EventMeshJobInfo>
implements EventMeshJobInfoExtService {

@Override
public boolean updateJobState(Integer jobID, JobState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setJobID(jobID);
jobInfo.setState(state.ordinal());
update(jobInfo, Wrappers.<EventMeshJobInfo>update().notIn("state",JobState.DELETE.ordinal(),
JobState.COMPLETE.ordinal()));
return true;
}

@Override
public EventMeshJobDetail getJobDetail(Integer jobID) {
return null;
}
}




Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.remote.JobState;
import org.springframework.stereotype.Service;

/**
Expand All @@ -18,19 +16,6 @@
@Slf4j
public class EventMeshJobInfoServiceImpl extends ServiceImpl<EventMeshJobInfoMapper, EventMeshJobInfo>
implements EventMeshJobInfoService{

@Override
public boolean updateJobState(Integer jobID, JobState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setJobID(jobID);
jobInfo.setState(state.ordinal());
update(jobInfo, Wrappers.<EventMeshJobInfo>update().notIn("state",JobState.DELETE.ordinal(),
JobState.COMPLETE.ordinal()));
return true;
}
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionExtService;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* @author sodafang
* @description 针对表【event_mesh_mysql_position】的数据库操作Service实现
* @createDate 2024-05-14 17:15:03
*/
@Service
@Slf4j
public class EventMeshMysqlPositionServiceExtImpl extends ServiceImpl<EventMeshMysqlPositionMapper, EventMeshMysqlPosition>
implements EventMeshMysqlPositionExtService {

@Autowired
EventMeshPositionReporterHistoryService historyService;

@Override
public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) {
EventMeshMysqlPosition old = getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobId", position.getJobID()));
if (old == null) {
return save(position);
} else {
if (old.getPosition() >= position.getPosition()) {
log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] by [{}]",
position.getJobID(), position.getPosition(), position.getAddress(), old.getPosition(), old.getAddress());
return true;
}
try {
return update(position, Wrappers.<EventMeshMysqlPosition>update().eq("updateTime", old.getUpdateTime()));
} finally {
if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) {
EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory();
history.setRecord(JsonUtils.toJSONString(position));
history.setJob(old.getJobID());
history.setAddress(old.getAddress());
log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position);
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save job [{}] mysql position reporter changed history fail, now reporter [{}], old " +
"[{}]", position.getJobID(), position.getAddress(), old.getAddress(), e);
}
}
}
}
}
}




Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
Expand All @@ -21,40 +16,6 @@
@Slf4j
public class EventMeshMysqlPositionServiceImpl extends ServiceImpl<EventMeshMysqlPositionMapper, EventMeshMysqlPosition>
implements EventMeshMysqlPositionService{

@Autowired
EventMeshPositionReporterHistoryService historyService;

@Override
public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) {
EventMeshMysqlPosition old = getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobId", position.getJobID()));
if (old == null) {
return save(position);
} else {
if (old.getPosition() >= position.getPosition()) {
log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] by [{}]",
position.getJobID(), position.getPosition(), position.getAddress(), old.getPosition(), old.getAddress());
return true;
}
try {
return update(position, Wrappers.<EventMeshMysqlPosition>update().eq("updateTime", old.getUpdateTime()));
} finally {
if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) {
EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory();
history.setRecord(JsonUtils.toJSONString(position));
history.setJob(old.getJobID());
history.setAddress(old.getAddress());
log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position);
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save job [{}] mysql position reporter changed history fail, now reporter [{}], old " +
"[{}]", position.getJobID(), position.getAddress(), old.getAddress(), e);
}
}
}
}
}
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatExtService;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* @author sodafang
* @description 针对表【event_mesh_runtime_heartbeat】的数据库操作Service实现
* @createDate 2024-05-14 17:15:03
*/
@Service
@Slf4j
public class EventMeshRuntimeHeartbeatServiceExtImpl extends ServiceImpl<EventMeshRuntimeHeartbeatMapper, EventMeshRuntimeHeartbeat>
implements EventMeshRuntimeHeartbeatExtService {

@Autowired
EventMeshRuntimeHistoryService historyService;

@Override
public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) {
EventMeshRuntimeHeartbeat old = getOne(Wrappers.<EventMeshRuntimeHeartbeat>query().eq("runtimeAddr",
entity.getRuntimeAddr()));
if (old == null) {
return save(entity);
} else {
if (Long.parseLong(old.getReportTime()) >= Long.parseLong(entity.getReportTime())) {
log.info("update heartbeat record ignore, current report time late than db, job " +
"[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr());
return true;
}
try {
return update(entity, Wrappers.<EventMeshRuntimeHeartbeat>update().eq("updateTime", old.getUpdateTime()));
} finally {
if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) {
EventMeshRuntimeHistory history = new EventMeshRuntimeHistory();
history.setAddress(old.getAdminAddr());
history.setJob(old.getJobID());
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save runtime job changed history fail", e);
}

log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(),
entity.getJobID());
}
}
}
}
}




Loading

0 comments on commit 0e0164b

Please sign in to comment.