Skip to content

Commit

Permalink
update source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 16, 2024
1 parent 76ead29 commit 1ec65e5
Show file tree
Hide file tree
Showing 18 changed files with 1,365 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;

import org.apache.eventmesh.common.config.connector.SourceConfig;
import org.apache.eventmesh.common.remote.job.SyncConsistency;
import org.apache.eventmesh.common.remote.job.SyncMode;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;
Expand Down Expand Up @@ -32,5 +34,33 @@ public class CanalSourceConfig extends SourceConfig {

private List<RecordPosition> recordPositions;

// ================================= channel parameter
// ================================

private Boolean enableRemedy = false; // 是否启用冲突补救算法

// private RemedyAlgorithm remedyAlgorithm; // 冲突补救算法

// private Integer remedyDelayThresoldForMedia; // 针对回环补救,如果反查速度过快,容易查到旧版本的数据记录,导致中美不一致,所以设置一个阀值,低于这个阀值的延迟不进行反查

private SyncMode syncMode; // 同步模式:字段/整条记录

private SyncConsistency syncConsistency; // 同步一致性要求

// ================================= system parameter
// ================================

private String systemSchema; // 默认为retl,不允许为空

private String systemMarkTable; // 双向同步标记表

private String systemMarkTableColumn; // 双向同步标记的列名

private String systemMarkTableInfo; // 双向同步标记的info信息,比如类似BI_SYNC

private String systemBufferTable; // otter同步buffer表

private String systemDualTable; // otter同步心跳表

private SourceConnectorConfig sourceConnectorConfig;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.eventmesh.common.remote.job;

public enum SyncConsistency {
/** 基于当前介质最新数据 */
MEDIA("M"),
/** 基于当前的store记录的数据 */
STORE("S"),
/** 基于当前的变更value,最终一致性 */
BASE("B");

private String value;

SyncConsistency(String value){
this.value = value;
}

public static SyncConsistency valuesOf(String value) {
SyncConsistency[] modes = values();
for (SyncConsistency mode : modes) {
if (mode.value.equalsIgnoreCase(value)) {
return mode;
}
}
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public boolean isMedia() {
return this.equals(SyncConsistency.MEDIA);
}

public boolean isStore() {
return this.equals(SyncConsistency.STORE);
}

public boolean isBase() {
return this.equals(SyncConsistency.BASE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.apache.eventmesh.common.remote.job;

public enum SyncMode {
/** 行记录 */
ROW("R"),
/** 字段记录 */
FIELD("F");

private String value;

SyncMode(String value){
this.value = value;
}

public static SyncMode valuesOf(String value) {
SyncMode[] modes = values();
for (SyncMode mode : modes) {
if (mode.value.equalsIgnoreCase(value)) {
return mode;
}
}
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public boolean isRow() {
return this.equals(SyncMode.ROW);
}

public boolean isField() {
return this.equals(SyncMode.FIELD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,12 @@

public class RecordOffset {

/**
* if pull message from mq key=queueOffset,
* value=queueOffset value
*/
// private Map<String, ?> offset = new HashMap<>();
private Class<? extends RecordOffset> clazz;

public RecordOffset() {

}

// public RecordOffset(Map<String, ?> offset) {
// this.offset = offset;
// }
//
// public Map<String, ?> getOffset() {
// return offset;
// }

public Class<? extends RecordOffset> getRecordOffsetClass() {
return RecordOffset.class;
}
Expand Down
Loading

0 comments on commit 1ec65e5

Please sign in to comment.