Skip to content

Commit

Permalink
Merge branch 'eventmesh-function' of https://github.com/xwm1992/Event…
Browse files Browse the repository at this point in the history
…Mesh into eventmesh-function

� Conflicts:
�	eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
  • Loading branch information
xwm1992 committed May 20, 2024
2 parents 8d6616a + 0b3c583 commit c8894cf
Show file tree
Hide file tree
Showing 36 changed files with 319 additions and 168 deletions.
2 changes: 1 addition & 1 deletion eventmesh-admin-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {

// https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter
implementation group: 'com.baomidou', name: 'mybatis-plus-boot-starter', version: '3.5.5'

implementation "org.reflections:reflections:0.10.2"

// https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter
implementation "com.alibaba:druid-spring-boot-starter"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.apache.eventmesh.admin.server;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.PagedList;

import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition;

public interface Admin extends ComponentLifeCycle {
/**
Expand All @@ -21,5 +25,29 @@ public interface Admin extends ComponentLifeCycle {
void reportHeartbeat(ReportHeartBeatRequest heartBeat);


static void main(String[] args) {
// ReportPositionRequest request = new ReportPositionRequest();
// request.setJobID("1");
// request.setAddress("1");
// request.setState(JobState.RUNNING);
// request.setDataSourceType(DataSourceType.MYSQL);
RecordPosition recordPosition = new RecordPosition();
CanalRecordOffset recordOffset = new CanalRecordOffset();
recordOffset.setOffset(12345L);
recordPosition.setRecordOffset(recordOffset);
CanalRecordPartition partition = new CanalRecordPartition();
partition.setJournalName("demo-binary-log-01");
partition.setTimeStamp(System.currentTimeMillis());
recordPosition.setRecordPartition(partition);
// ArrayList<RecordPosition> list = new ArrayList<>();
// list.add(recordPosition);
// request.setRecordPositionList(list);
String bytes = JsonUtils.toJSONString(recordPosition);

RecordPosition object1 = JsonUtils.parseTypeReferenceObject(bytes, new TypeReference<RecordPosition>() {});
RecordPosition object2 = JsonUtils.parseObject(bytes, RecordPosition.class);
System.out.println(object1);
System.out.println(object2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.apache.eventmesh.admin.server.AdminServerException;
import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler;
import com.apache.eventmesh.admin.server.web.handler.request.impl.RequestHandlerFactory;
import com.apache.eventmesh.admin.server.web.handler.request.RequestHandlerFactory;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -11,9 +11,9 @@
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.payload.PayloadUtil;
import org.apache.eventmesh.common.remote.request.BaseGrpcRequest;
import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
import org.apache.eventmesh.common.remote.response.BaseGrpcResponse;
import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
import org.apache.eventmesh.common.remote.response.FailResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -30,13 +30,13 @@ private Payload process(Payload value) {
"exists"));
}
try {
BaseRequestHandler<BaseGrpcRequest, BaseGrpcResponse> handler =
BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> handler =
handlerFactory.getHandler(value.getMetadata().getType());
if (handler == null) {
return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN,
return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN,
"not match any request handler"));
}
BaseGrpcResponse response = handler.handlerRequest((BaseGrpcRequest) PayloadUtil.parse(value), value.getMetadata());
BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata());
if (response == null || response instanceof EmptyAckResponse) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.baomidou.mybatisplus.extension.service.IService;
import org.apache.eventmesh.common.remote.JobState;

/**
* @author sodafang
* @description 针对表【event_mesh_job_info】的数据库操作Service
* @createDate 2024-05-09 15:51:45
*/
public interface EventMeshJobInfoService extends IService<EventMeshJobInfo> {

boolean updateJobState(Integer jobID, JobState state);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.remote.JobState;
import org.springframework.stereotype.Service;

/**
Expand All @@ -12,9 +15,22 @@
* @createDate 2024-05-09 15:51:45
*/
@Service
@Slf4j
public class EventMeshJobInfoServiceImpl extends ServiceImpl<EventMeshJobInfoMapper, EventMeshJobInfo>
implements EventMeshJobInfoService{

@Override
public boolean updateJobState(Integer jobID, JobState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setJobID(jobID);
jobInfo.setState(state.ordinal());
update(jobInfo, Wrappers.<EventMeshJobInfo>update().notIn("state",JobState.DELETE.ordinal(),
JobState.COMPLETE.ordinal()));
return true;
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,27 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) {
if (old == null) {
return save(position);
} else {
if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) {
EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory();
history.setRecord(JsonUtils.toJSONString(position));
history.setJob(old.getJobID());
history.setAddress(old.getAddress());
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save mysql position reporter changed history fail", e);
}

log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position);
}
if (old.getPosition() > position.getPosition()) {
if (old.getPosition() >= position.getPosition()) {
log.info("job [{}] report position [{}] less than db [{}]", position.getJobID(), position.getPosition(),
old.getPosition());
return true;
}
return update(position, Wrappers.<EventMeshMysqlPosition>update().eq("updateTime", old.getUpdateTime()));
try {
return update(position, Wrappers.<EventMeshMysqlPosition>update().eq("updateTime", old.getUpdateTime()));
} finally {
if (old.getAddress()!= null && !old.getAddress().equals(position.getAddress())) {
EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory();
history.setRecord(JsonUtils.toJSONString(position));
history.setJob(old.getJobID());
history.setAddress(old.getAddress());
log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position);
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save mysql position reporter changed history fail", e);
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -31,25 +31,28 @@ public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) {
if (old == null) {
return save(entity);
} else {
if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) {
EventMeshRuntimeHistory history = new EventMeshRuntimeHistory();
history.setAddress(old.getAdminAddr());
history.setJob(old.getJobID());
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save runtime job changed history fail", e);
}

log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(),
entity.getJobID());
}
if (Long.parseLong(old.getReportTime()) > Long.parseLong(entity.getReportTime())) {
if (Long.parseLong(old.getReportTime()) >= Long.parseLong(entity.getReportTime())) {
log.info("update heartbeat record ignore, current report time late than db, job " +
"[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr());
return true;
}
return update(entity, Wrappers.<EventMeshRuntimeHeartbeat>update().eq("updateTime", old.getUpdateTime()));
try {
return update(entity, Wrappers.<EventMeshRuntimeHeartbeat>update().eq("updateTime", old.getUpdateTime()));
} finally {
if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) {
EventMeshRuntimeHistory history = new EventMeshRuntimeHistory();
history.setAddress(old.getAdminAddr());
history.setJob(old.getJobID());
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save runtime job changed history fail", e);
}

log.info("runtime [{}] changed job, old job [{}], now [{}]",entity.getRuntimeAddr(),old.getJobID(),
entity.getJobID());
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.apache.eventmesh.admin.server.web.handler.position;

import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.response.FetchPositionResponse;

public interface IFetchPositionHandler {
FetchPositionResponse handler(FetchPositionRequest request, Metadata metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;

public interface IPositionHandler {
public interface IReportPositionHandler {
boolean handler(ReportPositionRequest request, Metadata metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import org.apache.eventmesh.common.remote.job.DataSourceType;

public abstract class PositionHandler implements IPositionHandler {
public abstract class PositionHandler implements IReportPositionHandler,IFetchPositionHandler {
protected abstract DataSourceType getSourceType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
@Component
@Slf4j
public class PositionHandlerFactory implements ApplicationListener<ContextRefreshedEvent> {
private final Map<DataSourceType, IPositionHandler> handlers =
private final Map<DataSourceType, PositionHandler> handlers =
new ConcurrentHashMap<>();
public IPositionHandler getHandler(DataSourceType type) {
public PositionHandler getHandler(DataSourceType type) {
return handlers.get(type);
}

Expand Down

This file was deleted.

Loading

0 comments on commit c8894cf

Please sign in to comment.