From c615f97332a370f5d255da9d556d072831609b97 Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Tue, 2 Jul 2024 09:58:46 +0800 Subject: [PATCH] Merge EventMesh Function branch to master (#5019) * EventMesh function admin (#4851) * own * dependency * finish registry * EventMesh function admin (#4853) * own * dependency * finish registry * init * Eventmesh function admin (#4854) * own * dependency * finish registry * init * 0419 * 0419 * more discovery and move gRPC * fix dependency * EventMesh function connector runtime (#4858) * [ISSUE #4812] Set up Admin Endpoints v2 (#4813) * Remove redundant overloaded methods * Simplify write() result param * Add writeJson(); Add PUT; Add JavaDoc * Rename EventHttpHandler to EventMeshHttpHandler * Correct server thread name * Clean up messy & non-hierarchical overloading * No need to set headers manually any more * Set up v1&v2 endpoints * Set up v1&v2 response dto * Introduce fastjson2 * Fix fastjson2 "level too large : 2048" error caused by IPAddress * Correct @ConfigField naming * Return properties format json key * Add format option to query string * Introduce Result * Reduce duplicate builder code * Fix all checkstyle warnings in eventmesh-runtime * Add known dependency * [ISSUE #4814] Migrate from fastjson 1.2.83 to fastjson2 (#4819) * [Enhancement] Migrate from fastjson 1.2.83 to fastjson2 #4814 * fix_dependencies_problem * fix_check * [ISSUE #4551] modify the logic of time-consumption statistics (#4822) * init connector runtime v2 * [ISSUE #4804] Fix SubStreamHandler exception loop by closeOnError (#4807) * Handle exception loop by closeOnError * Lombok optimization * some format optimization * Avoid closing multiple times * Remove redundant set null * Revert "Avoid closing multiple times" This reverts commit 774397fcae4905dc0ec8230eab18bf655f202e8f. * Use synchronized latch to keep senderOnComplete called once * Use boolean to prevent latch called by somebody else * Remove the unique callee/caller close() of onCompleted() * [ISSUE #4838] Deprecate unused `eventMesh.connector.plugin.type` etc. properties (#4839) * Remove all references of `eventMesh.connector.plugin.type` * Deprecate `eventMesh.connector.plugin.type` and sort properties * Remove misconfigured & not-used `registerIntervalInMills`, `fetchRegistryAddrIntervalInMills` * Remove 'defibus' related un-used usages * Supplement https://github.com/apache/eventmesh/pull/4809 for `null != object` * [ISSUE #4832] Downgrade stale bot to v8 to resolve state cache reserving error (#4833) * Revert stale bot to v8 to resolve state cache reserving error * Reduce operations-per-run to default value to ease pressure * Unify yaml to yml * [ISSUE #4820] Bug fix EventHandler not return json (#4821) * bug fix * bug fix * bug fix * update runtime v2 * update connector runtime * update connector runtime * update connector runtime * update connector runtime * update connector runtime --------- Co-authored-by: Pil0tXia Co-authored-by: Zaki <91261012+cnzakii@users.noreply.github.com> Co-authored-by: Karson * [ISSUE #4931]Add Registry Module for Discovery AdminServer * [ISSUES #4933]Add Admin Module * [ISSUE #4935] Add and Move the Pojo Used By Both Runtime and Admin to Common * [ISSUE #4937]fix gradle dependecy and add runtime v2 * [ISSUES #4939]add canal connector * fix missing apache header * fix missing apache header * fix missing apache header * update gradle dependencies * fix admin server ci check error * fix admin server ci check error * fix ci checkStyle error * fix ci check error * [ISSUE #4979]Canal Connector supports bidirectional data synchronization * add bash files for admin & runtime-v2 * fix ack offset read & persist * fix checkStyle error * [ISSUE #4979] Canal Connector supports bidirectional data synchronization (#5011) * [ISSUE #4979]Canal Connector supports bidirectional data synchronization * add bash files for admin & runtime-v2 * fix ack offset read & persist * fix checkStyle error * fix http source connector test error --------- Co-authored-by: sodaRyCN <35725024+sodaRyCN@users.noreply.github.com> Co-authored-by: Pil0tXia Co-authored-by: Zaki <91261012+cnzakii@users.noreply.github.com> Co-authored-by: Karson --- build.gradle | 6 + eventmesh-admin-server/bin/start-admin.sh | 201 ++++++++++++++++++ .../web/db/entity/EventMeshJobDetail.java | 3 +- .../position/EventMeshPositionBizService.java | 6 +- .../position/IFetchPositionHandler.java | 4 +- .../position/impl/MysqlPositionHandler.java | 16 +- .../src/main/resources/application.yaml | 4 +- .../connector/rdb/canal/CanalSinkConfig.java | 15 +- .../rdb/canal/CanalSourceConfig.java | 28 +-- .../remote/response/FetchJobResponse.java | 3 +- .../response/FetchPositionResponse.java | 6 +- .../eventmesh-connector-canal/build.gradle | 3 +- .../connector/canal/dialect/MysqlDialect.java | 22 +- .../SqlBuilderLoadInterceptor.java | 13 +- .../connector/canal/source/EntryParser.java | 110 +++++----- .../connector/CanalSourceConnector.java | 67 ++++-- .../canal/template/MysqlSqlTemplate.java | 2 +- .../connector/HttpSourceConnectorTest.java | 3 + .../api/connector/SourceConnectorContext.java | 6 + .../offsetmgmt/admin/AdminOffsetService.java | 21 +- .../gradle.properties | 18 ++ eventmesh-runtime-v2/bin/start-v2.sh | 200 +++++++++++++++++ eventmesh-runtime-v2/bin/stop-v2.sh | 88 ++++++++ .../runtime/connector/ConnectorRuntime.java | 5 + 24 files changed, 701 insertions(+), 149 deletions(-) create mode 100644 eventmesh-admin-server/bin/start-admin.sh create mode 100644 eventmesh-registry/eventmesh-registry-nacos/gradle.properties create mode 100644 eventmesh-runtime-v2/bin/start-v2.sh create mode 100644 eventmesh-runtime-v2/bin/stop-v2.sh diff --git a/build.gradle b/build.gradle index bbda2423f2..321b7fb7e9 100644 --- a/build.gradle +++ b/build.gradle @@ -163,9 +163,13 @@ tasks.register('dist') { ["eventmesh-common", "eventmesh-meta:eventmesh-meta-api", "eventmesh-metrics-plugin:eventmesh-metrics-api", + "eventmesh-openconnect:eventmesh-openconnect-java", + "eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api", "eventmesh-protocol-plugin:eventmesh-protocol-api", + "eventmesh-registry:eventmesh-registry-api", "eventmesh-retry:eventmesh-retry-api", "eventmesh-runtime", + "eventmesh-runtime-v2", "eventmesh-security-plugin:eventmesh-security-api", "eventmesh-spi", "eventmesh-starter", @@ -750,8 +754,10 @@ subprojects { dependency "software.amazon.awssdk:s3:2.26.3" dependency "com.github.rholder:guava-retrying:2.0.0" + dependency "org.mybatis.spring.boot:mybatis-spring-boot-starter:2.3.1" dependency "com.alibaba:druid-spring-boot-starter:1.2.23" dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.18" + dependency "com.baomidou:mybatis-plus:3.5.6" dependency "com.mysql:mysql-connector-j:8.4.0" } } diff --git a/eventmesh-admin-server/bin/start-admin.sh b/eventmesh-admin-server/bin/start-admin.sh new file mode 100644 index 0000000000..93c3644397 --- /dev/null +++ b/eventmesh-admin-server/bin/start-admin.sh @@ -0,0 +1,201 @@ +#!/bin/bash +# +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +# Server configuration may be inconsistent, add these configurations to avoid garbled code problems +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/customize/your/java/home/here" + +# Detect operating system. +OS=$(uname) + +function is_java8_or_11 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] || return 2 + return 0 +} + +function extract_java_version { + local _java="$1" + local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print "11"; else print "unknown"}') + echo "$version" +} + +# 0(not running), 1(is running) +#function is_proxyRunning { +# local _pid="$1" +# local pid=`ps ax | grep -i 'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep | awk '{print $1}'|grep $_pid` +# if [ -z "$pid" ] ; then +# return 0 +# else +# return 1 +# fi +#} + +function get_pid { + local ppid="" + if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then + ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file) + # If the process does not exist, it indicates that the previous process terminated abnormally. + if [ ! -d /proc/$ppid ]; then + # Remove the residual file. + rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file + echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output." + ppid="" + fi + else + if [[ $OS =~ Msys ]]; then + # There is a Bug on Msys that may not be able to kill the identified process + ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # Known problem: grep Java may not be able to accurately identify Java processes + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + if [ $DOCKER ]; then + # No need to exclude root user in Docker containers. + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'}) + else + # It is required to identify the process as accurately as possible on Linux. + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'}) + fi + fi + fi + echo "$ppid"; +} + +#=========================================================================================== +# Locate Java Executable +#=========================================================================================== + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" + JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java") +elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" + JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java") +elif is_java8_or_11 "$(which java)"; then + JAVA="$(which java)" + JAVA_VERSION=$(extract_java_version "$(which java)") +else + echo -e "ERROR\t Java 8 or 11 not found, operation abort." + exit 9; +fi + +echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA" + +EVENTMESH_ADMIN_HOME=$(cd "$(dirname "$0")/.." && pwd) +export EVENTMESH_ADMIN_HOME + +EVENTMESH_ADMIN_LOG_HOME="${EVENTMESH_ADMIN_HOME}/logs" +export EVENTMESH_ADMIN_LOG_HOME + +echo -e "EVENTMESH_ADMIN_HOME : ${EVENTMESH_ADMIN_HOME}\nEVENTMESH_ADMIN_LOG_HOME : ${EVENTMESH_ADMIN_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${EVENTMESH_ADMIN_LOG_HOME}" ]; then mkdir -p "${EVENTMESH_ADMIN_LOG_HOME}"; fi +} + +error_exit () +{ + echo -e "ERROR\t $1 !!" + exit 1 +} + +export JAVA_HOME + +#=========================================================================================== +# JVM Configuration +#=========================================================================================== +#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4" +#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M -Xmn256m -XX:SurvivorRatio=4" +#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M -Xmn128m -XX:SurvivorRatio=4" +#fi + +GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log" + +#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4" +JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc" +if [[ "$JAVA_VERSION" == "8" ]]; then + # Set JAVA_OPT for Java 8 + JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" + JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +elif [[ "$JAVA_VERSION" == "11" ]]; then + # Set JAVA_OPT for Java 11 + XLOG_PARAM="time,level,tags:filecount=5,filesize=30m" + JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}" + JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} -Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}" +fi +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${EVENTMESH_ADMIN_LOG_HOME} -XX:ErrorFile=${EVENTMESH_ADMIN_LOG_HOME}/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${EVENTMESH_ADMIN_HOME}/conf/log4j2.xml" +JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_ADMIN_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_ADMIN_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -DconfigurationPath=${EVENTMESH_ADMIN_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" +JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin" + +#if [ -f "pid.file" ]; then +# pid=`cat pid.file` +# if ! is_proxyRunning "$pid"; then +# echo "proxy is running already" +# exit 9; +# else +# echo "err pid$pid, rm pid.file" +# rm pid.file +# fi +#fi + +pid=$(get_pid) +if [[ $pid == "ERROR"* ]]; then + echo -e "${pid}" + exit 9 +fi +if [ -n "$pid" ]; then + echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9 +fi + +make_logs_dir + +echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out + +EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer +if [ $DOCKER ]; then + $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out +else + $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 & +echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file +fi +exit 0 diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java index b4a836e8be..849a90a883 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java @@ -21,6 +21,7 @@ import org.apache.eventmesh.common.remote.job.JobTransportType; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import java.util.List; import java.util.Map; import lombok.Data; @@ -42,7 +43,7 @@ public class EventMeshJobDetail { private String sinkConnectorDesc; - private RecordPosition position; + private List position; private JobState state; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java index 7d6febdf4d..d3b6ff555e 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java @@ -25,6 +25,8 @@ import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import java.util.List; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -38,7 +40,7 @@ public class EventMeshPositionBizService { PositionHandlerFactory factory; // called isValidateReportRequest before call this - public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) { + public List getPosition(FetchPositionRequest request, Metadata metadata) { if (request == null) { return null; } @@ -68,7 +70,7 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata) return handler.handler(request, metadata); } - public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) { + public List getPositionByJobID(Integer jobID, DataSourceType type) { if (jobID == null || type == null) { return null; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java index 9a4c324dc1..2c039062f3 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java @@ -21,10 +21,12 @@ import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import java.util.List; + /** * IFetchPositionHandler */ public interface IFetchPositionHandler { - RecordPosition handler(FetchPositionRequest request, Metadata metadata); + List handler(FetchPositionRequest request, Metadata metadata); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index 623864fa69..525fe02c0d 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -31,6 +31,7 @@ import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.utils.JsonUtils; +import java.util.ArrayList; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -142,20 +143,21 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) { } @Override - public RecordPosition handler(FetchPositionRequest request, Metadata metadata) { - EventMeshMysqlPosition position = positionService.getOne(Wrappers.query().eq("jobID", + public List handler(FetchPositionRequest request, Metadata metadata) { + List positionList = positionService.list(Wrappers.query().eq("jobID", request.getJobID())); - RecordPosition recordPosition = null; - if (position != null) { + List recordPositionList = new ArrayList<>(); + for (EventMeshMysqlPosition position : positionList) { + RecordPosition recordPosition = new RecordPosition(); CanalRecordPartition partition = new CanalRecordPartition(); partition.setTimeStamp(position.getTimestamp()); partition.setJournalName(position.getJournalName()); + recordPosition.setRecordPartition(partition); CanalRecordOffset offset = new CanalRecordOffset(); offset.setOffset(position.getPosition()); - recordPosition = new RecordPosition(); - recordPosition.setRecordPartition(partition); recordPosition.setRecordOffset(offset); + recordPositionList.add(recordPosition); } - return recordPosition; + return recordPositionList; } } diff --git a/eventmesh-admin-server/src/main/resources/application.yaml b/eventmesh-admin-server/src/main/resources/application.yaml index ce396a09fa..54795057cb 100644 --- a/eventmesh-admin-server/src/main/resources/application.yaml +++ b/eventmesh-admin-server/src/main/resources/application.yaml @@ -18,8 +18,8 @@ spring: datasource: url: jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true - username: root - password: mike920830 + username: //db_username + password: //db_password driver-class-name: com.mysql.cj.jdbc.Driver mybatis-plus: mapper-locations: classpath:mapper/*.xml diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index f7a697625c..85484b2ce9 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -27,15 +27,20 @@ @EqualsAndHashCode(callSuper = true) public class CanalSinkConfig extends SinkConfig { - private Integer batchSize = 50; // batchSize + // batchSize + private Integer batchSize = 50; - private Boolean useBatch = true; // enable batch + // enable batch + private Boolean useBatch = true; - private Integer poolSize = 5; // sink thread size for single channel + // sink thread size for single channel + private Integer poolSize = 5; - private SyncMode syncMode; // sync mode: field/row + // sync mode: field/row + private SyncMode syncMode; - private Boolean skipException = false; // skip sink process exception + // skip sink process exception + private Boolean skipException = false; public SinkConnectorConfig sinkConnectorConfig; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index e5edc5a78e..d75ceb6b58 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -49,32 +49,32 @@ public class CanalSourceConfig extends SourceConfig { private Long batchTimeout = -1L; + private String tableFilter; + + private String fieldFilter; + private List recordPositions; // ================================= channel parameter // ================================ - private Boolean enableRemedy = false; // enable remedy + // enable remedy + private Boolean enableRemedy = false; - private SyncMode syncMode; // sync mode: field/row + // sync mode: field/row + private SyncMode syncMode; - private SyncConsistency syncConsistency; // sync consistency + // sync consistency + private SyncConsistency syncConsistency; // ================================= system parameter // ================================ - private String systemSchema; // Default is retl - - private String systemMarkTable; // Bidirectional synchronization mark table - - private String systemMarkTableColumn; // Column name of the bidirectional synchronization mark - - private String systemMarkTableInfo; - // nfo information of the bidirectional synchronization mark, similar to BI_SYNC - - private String systemBufferTable; // sync buffer table + // Column name of the bidirectional synchronization mark + private String needSyncMarkTableColumnName = "needSync"; - private String systemDualTable; // sync heartbeat table + // Column value of the bidirectional synchronization mark + private String needSyncMarkTableColumnValue = "needSync"; private SourceConnectorConfig sourceConnectorConfig; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index 137e49bdcc..a51cb32b9c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.common.remote.job.JobTransportType; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import java.util.List; import java.util.Map; import lombok.Data; @@ -45,7 +46,7 @@ public class FetchJobResponse extends BaseRemoteResponse { private String sinkConnectorDesc; - private RecordPosition position; + private List position; private JobState state; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java index e9a7a38289..613623d654 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java @@ -20,6 +20,8 @@ import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import java.util.List; + import lombok.Data; import lombok.EqualsAndHashCode; @@ -27,7 +29,7 @@ @EqualsAndHashCode(callSuper = true) public class FetchPositionResponse extends BaseRemoteResponse { - private RecordPosition recordPosition; + private List recordPosition; public static FetchPositionResponse successResponse() { FetchPositionResponse response = new FetchPositionResponse(); @@ -36,7 +38,7 @@ public static FetchPositionResponse successResponse() { return response; } - public static FetchPositionResponse successResponse(RecordPosition recordPosition) { + public static FetchPositionResponse successResponse(List recordPosition) { FetchPositionResponse response = successResponse(); response.setRecordPosition(recordPosition); return response; diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle b/eventmesh-connectors/eventmesh-connector-canal/build.gradle index 08623edf15..640cb5ce42 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle @@ -25,8 +25,7 @@ dependencies { api project(":eventmesh-openconnect:eventmesh-openconnect-java") implementation project(":eventmesh-common") implementation canal - implementation "com.alibaba:druid:1.2.23" -// implementation "org.apache.ddlutils:ddlutils:1.0" + implementation "com.alibaba:druid:1.2.6" compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation "org.mockito:mockito-core" diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java index 32bb79b54e..acd491ba64 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java @@ -19,29 +19,17 @@ import org.apache.eventmesh.connector.canal.template.MysqlSqlTemplate; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.support.lob.LobHandler; public class MysqlDialect extends AbstractDbDialect { - private Map, String> shardColumns; - public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) { super(jdbcTemplate, lobHandler); sqlTemplate = new MysqlSqlTemplate(); } - public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String name, String databaseVersion, - int majorVersion, int minorVersion) { - super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion); - sqlTemplate = new MysqlSqlTemplate(); - } - public boolean isCharSpacePadded() { return false; } @@ -66,16 +54,8 @@ public boolean isDRDS() { return false; } - public String getShardColumns(String schema, String table) { - if (isDRDS()) { - return shardColumns.get(Arrays.asList(schema, table)); - } else { - return null; - } - } - public String getDefaultCatalog() { - return (String) jdbcTemplate.queryForObject("select database()", String.class); + return jdbcTemplate.queryForObject("select database()", String.class); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java index ab0776c17d..24d6b42f8b 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java @@ -28,11 +28,16 @@ import org.springframework.util.CollectionUtils; +import lombok.Getter; +import lombok.Setter; + /** * compute latest sql */ public class SqlBuilderLoadInterceptor { + @Getter + @Setter private DbDialect dbDialect; public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) { @@ -128,12 +133,4 @@ private String[] buildColumnNames(List columns1, List } return result; } - - public DbDialect getDbDialect() { - return dbDialect; - } - - public void setDbDialect(DbDialect dbDialect) { - this.dbDialect = dbDialect; - } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 3031a15df0..32c55ec42c 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -23,9 +23,10 @@ import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable; import org.apache.eventmesh.connector.canal.model.EventType; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -46,32 +47,26 @@ @Slf4j public class EntryParser { - public List parse(CanalSourceConfig sourceConfig, List datas) { + public Map> parse(CanalSourceConfig sourceConfig, List datas) { List recordList = new ArrayList<>(); List transactionDataBuffer = new ArrayList<>(); + // need check weather the entry is loopback + boolean needSync; + Map> recordMap = new HashMap<>(); try { for (Entry entry : datas) { switch (entry.getEntryType()) { - case TRANSACTIONBEGIN: - break; case ROWDATA: - transactionDataBuffer.add(entry); + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0)); + if (needSync) { + transactionDataBuffer.add(entry); + } break; case TRANSACTIONEND: - for (Entry bufferEntry : transactionDataBuffer) { - List recordParsedList = internParse(sourceConfig, bufferEntry); - if (CollectionUtils.isEmpty(recordParsedList)) { - continue; - } - long totalSize = bufferEntry.getHeader().getEventLength(); - long eachSize = totalSize / recordParsedList.size(); - for (CanalConnectRecord record : recordParsedList) { - if (record == null) { - continue; - } - record.setSize(eachSize); - recordList.add(record); - } + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); + if (!recordList.isEmpty()) { + recordMap.put(entry.getHeader().getLogfileOffset(), recordList); } transactionDataBuffer.clear(); break; @@ -79,27 +74,46 @@ public List parse(CanalSourceConfig sourceConfig, List recordParsedList = internParse(sourceConfig, bufferEntry); - if (CollectionUtils.isEmpty(recordParsedList)) { + private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List recordList, + List transactionDataBuffer) { + for (Entry bufferEntry : transactionDataBuffer) { + List recordParsedList = internParse(sourceConfig, bufferEntry); + if (CollectionUtils.isEmpty(recordParsedList)) { + continue; + } + long totalSize = bufferEntry.getHeader().getEventLength(); + long eachSize = totalSize / recordParsedList.size(); + for (CanalConnectRecord record : recordParsedList) { + if (record == null) { continue; } + record.setSize(eachSize); + recordList.add(record); + } + } + } - long totalSize = bufferEntry.getHeader().getEventLength(); - long eachSize = totalSize / recordParsedList.size(); - for (CanalConnectRecord record : recordParsedList) { - if (record == null) { - continue; - } - record.setSize(eachSize); - recordList.add(record); - } + private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) { + Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), sourceConfig.getNeedSyncMarkTableColumnName()); + if (markedColumn != null) { + return StringUtils.equalsIgnoreCase(markedColumn.getValue(), sourceConfig.getNeedSyncMarkTableColumnValue()); + } + return false; + } + + private Column getColumnIgnoreCase(List columns, String columName) { + for (Column column : columns) { + if (column.getName().equalsIgnoreCase(columName)) { + return column; } - } catch (Exception e) { - throw new RuntimeException(e); } - return recordList; + return null; } private List internParse(CanalSourceConfig sourceConfig, Entry entry) { @@ -127,20 +141,9 @@ private List internParse(CanalSourceConfig sourceConfig, Ent return null; } - if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemSchema(), schemaName)) { - // do noting - if (eventType.isDdl()) { - return null; - } - - if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemDualTable(), tableName)) { - return null; - } - } else { - if (eventType.isDdl()) { - log.warn("unsupported ddl event type: {}", eventType); - return null; - } + if (eventType.isDdl()) { + log.warn("unsupported ddl event type: {}", eventType); + return null; } List recordList = new ArrayList<>(); @@ -164,13 +167,12 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr List beforeColumns = rowData.getBeforeColumnsList(); List afterColumns = rowData.getAfterColumnsList(); - String tableName = canalConnectRecord.getSchemaName() + "." + canalConnectRecord.getTableName(); boolean isRowMode = canalSourceConfig.getSyncMode().isRow(); - Map keyColumns = new LinkedHashMap(); - Map oldKeyColumns = new LinkedHashMap(); - Map notKeyColumns = new LinkedHashMap(); + Map keyColumns = new LinkedHashMap<>(); + Map oldKeyColumns = new LinkedHashMap<>(); + Map notKeyColumns = new LinkedHashMap<>(); if (eventType.isInsert()) { for (Column column : afterColumns) { @@ -195,7 +197,7 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr keyColumns.put(column.getName(), copyEventColumn(column, true)); } else { if (isRowMode && entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE) { - notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode)); + notKeyColumns.put(column.getName(), copyEventColumn(column, true)); } } } @@ -233,7 +235,7 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr } canalConnectRecord.setColumns(columns); } else { - throw new RuntimeException("this row data has no pks , entry: " + entry.toString() + " and rowData: " + throw new RuntimeException("this row data has no pks , entry: " + entry + " and rowData: " + rowData); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index c179124ce4..577142e00c 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -31,16 +31,18 @@ import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.commons.lang3.StringUtils; + import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; - import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager; @@ -74,7 +76,9 @@ public class CanalSourceConnector implements Source, ConnectorCreateService poll() { EntryParser entryParser = new EntryParser(); List result = new ArrayList<>(); - - List connectRecordList = entryParser.parse(sourceConfig, entries); - - if (connectRecordList != null && !connectRecordList.isEmpty()) { - CanalConnectRecord lastRecord = connectRecordList.get(connectRecordList.size() - 1); - - CanalRecordPartition canalRecordPartition = new CanalRecordPartition(); - canalRecordPartition.setJournalName(lastRecord.getJournalName()); - canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime()); - - CanalRecordOffset canalRecordOffset = new CanalRecordOffset(); - canalRecordOffset.setOffset(lastRecord.getBinLogOffset()); - - ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis()); - connectRecord.addExtension("messageId", String.valueOf(message.getId())); - connectRecord.setData(connectRecordList); - result.add(connectRecord); + // key: Xid offset + Map> connectorRecordMap = entryParser.parse(sourceConfig, entries); + + if (!connectorRecordMap.isEmpty()) { + Set>> entrySet = connectorRecordMap.entrySet(); + for (Map.Entry> entry : entrySet) { + // Xid offset + Long binLogOffset = entry.getKey(); + List connectRecordList = entry.getValue(); + CanalConnectRecord lastRecord = entry.getValue().get(connectRecordList.size() - 1); + CanalRecordPartition canalRecordPartition = new CanalRecordPartition(); + canalRecordPartition.setJournalName(lastRecord.getJournalName()); + canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime()); + + CanalRecordOffset canalRecordOffset = new CanalRecordOffset(); + canalRecordOffset.setOffset(binLogOffset); + + ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis()); + connectRecord.addExtension("messageId", String.valueOf(message.getId())); + connectRecord.setData(connectRecordList); + result.add(connectRecord); + } + } else { + // for the message has been filtered need ack message + canalServer.ack(clientIdentity, message.getId()); } return result; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java index a169ed20f1..37b45c746f 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java @@ -47,7 +47,7 @@ public String getMergeSql(String schemaName, String tableName, String[] pkNames, size = columnNames.length; for (int i = 0; i < size; i++) { - if (!includePks && shardColumn != null && columnNames[i].equals(shardColumn)) { + if (!includePks && columnNames[i].equals(shardColumn)) { continue; } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java index b764b4a989..b4cad1426d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java @@ -57,6 +57,9 @@ void setUp() throws Exception { connector.init(sourceConfig); connector.start(); + // Add delay to ensure the server is fully started before the tests begin + Thread.sleep(2000); + url = new URL("http", "127.0.0.1", config.getPort(), config.getPath()).toString(); httpClient = new OkHttpClient(); } diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java index 76800d9c2f..55c88ce55a 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java @@ -18,8 +18,11 @@ package org.apache.eventmesh.openconnect.api.connector; import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; +import java.util.List; + import lombok.Data; /** @@ -32,4 +35,7 @@ public class SourceConnectorContext implements ConnectorContext { public SourceConfig sourceConfig; + // initial record position + public List recordPositionList; + } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index c011a1520c..c784069f97 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -106,6 +106,7 @@ public void persist() { ReportPositionRequest reportPositionRequest = new ReportPositionRequest(); reportPositionRequest.setJobID(jobId); reportPositionRequest.setState(jobState); + reportPositionRequest.setDataSourceType(dataSourceType); reportPositionRequest.setAddress(IPUtils.getLocalAddress()); reportPositionRequest.setRecordPositionList(recordToSyncList); @@ -119,6 +120,10 @@ public void persist() { .build()) .build(); requestObserver.onNext(payload); + + for (Map.Entry entry : recordMap.entrySet()) { + positionStore.remove(entry.getKey()); + } } @Override @@ -157,8 +162,9 @@ public Map getPositionMap() { JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), - fetchPositionResponse.getRecordPosition().getRecordOffset()); + for (RecordPosition recordPosition : fetchPositionResponse.getRecordPosition()) { + positionStore.put(recordPosition.getRecordPartition(), recordPosition.getRecordOffset()); + } } } } @@ -175,9 +181,9 @@ public RecordOffset getPosition(RecordPartition partition) { fetchPositionRequest.setJobID(jobId); fetchPositionRequest.setAddress(IPUtils.getLocalAddress()); fetchPositionRequest.setDataSourceType(dataSourceType); - RecordPosition recordPosition = new RecordPosition(); - recordPosition.setRecordPartition(partition); - fetchPositionRequest.setRecordPosition(recordPosition); + RecordPosition fetchRecordPosition = new RecordPosition(); + fetchRecordPosition.setRecordPartition(partition); + fetchPositionRequest.setRecordPosition(fetchRecordPosition); Metadata metadata = Metadata.newBuilder() .setType(FetchPositionRequest.class.getSimpleName()) @@ -195,8 +201,9 @@ public RecordOffset getPosition(RecordPartition partition) { JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchPositionResponse.class); assert fetchPositionResponse != null; if (fetchPositionResponse.isSuccess()) { - positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(), - fetchPositionResponse.getRecordPosition().getRecordOffset()); + for (RecordPosition recordPosition : fetchPositionResponse.getRecordPosition()) { + positionStore.put(recordPosition.getRecordPartition(), recordPosition.getRecordOffset()); + } } } } diff --git a/eventmesh-registry/eventmesh-registry-nacos/gradle.properties b/eventmesh-registry/eventmesh-registry-nacos/gradle.properties new file mode 100644 index 0000000000..cf067e20bf --- /dev/null +++ b/eventmesh-registry/eventmesh-registry-nacos/gradle.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pluginType=registryCenter +pluginName=nacos \ No newline at end of file diff --git a/eventmesh-runtime-v2/bin/start-v2.sh b/eventmesh-runtime-v2/bin/start-v2.sh new file mode 100644 index 0000000000..fc67c29d3e --- /dev/null +++ b/eventmesh-runtime-v2/bin/start-v2.sh @@ -0,0 +1,200 @@ +#!/bin/bash +# +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +# Server configuration may be inconsistent, add these configurations to avoid garbled code problems +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/customize/your/java/home/here" + +# Detect operating system. +OS=$(uname) + +function is_java8_or_11 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] || return 2 + return 0 +} + +function extract_java_version { + local _java="$1" + local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print "11"; else print "unknown"}') + echo "$version" +} + +# 0(not running), 1(is running) +#function is_proxyRunning { +# local _pid="$1" +# local pid=`ps ax | grep -i 'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep | awk '{print $1}'|grep $_pid` +# if [ -z "$pid" ] ; then +# return 0 +# else +# return 1 +# fi +#} + +function get_pid { + local ppid="" + if [ -f ${EVENTMESH_HOME}/bin/pid.file ]; then + ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file) + # If the process does not exist, it indicates that the previous process terminated abnormally. + if [ ! -d /proc/$ppid ]; then + # Remove the residual file. + rm ${EVENTMESH_HOME}/bin/pid.file + echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output." + ppid="" + fi + else + if [[ $OS =~ Msys ]]; then + # There is a Bug on Msys that may not be able to kill the identified process + ppid=`jps -v | grep -i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # Known problem: grep Java may not be able to accurately identify Java processes + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + if [ $DOCKER ]; then + # No need to exclude root user in Docker containers. + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_HOME | grep -i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | awk -F ' ' {'print $2'}) + else + # It is required to identify the process as accurately as possible on Linux. + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_HOME | grep -i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root" | awk -F ' ' {'print $2'}) + fi + fi + fi + echo "$ppid"; +} + +#=========================================================================================== +# Locate Java Executable +#=========================================================================================== + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" + JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java") +elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" + JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java") +elif is_java8_or_11 "$(which java)"; then + JAVA="$(which java)" + JAVA_VERSION=$(extract_java_version "$(which java)") +else + echo -e "ERROR\t Java 8 or 11 not found, operation abort." + exit 9; +fi + +echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA" + +EVENTMESH_HOME=$(cd "$(dirname "$0")/.." && pwd) +export EVENTMESH_HOME + +EVENTMESH_LOG_HOME="${EVENTMESH_HOME}/logs" +export EVENTMESH_LOG_HOME + +echo -e "EVENTMESH_HOME : ${EVENTMESH_HOME}\nEVENTMESH_LOG_HOME : ${EVENTMESH_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${EVENTMESH_LOG_HOME}" ]; then mkdir -p "${EVENTMESH_LOG_HOME}"; fi +} + +error_exit () +{ + echo -e "ERROR\t $1 !!" + exit 1 +} + +export JAVA_HOME + +#=========================================================================================== +# JVM Configuration +#=========================================================================================== +#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4" +#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M -Xmn256m -XX:SurvivorRatio=4" +#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M -Xmn128m -XX:SurvivorRatio=4" +#fi + +GC_LOG_FILE="${EVENTMESH_LOG_HOME}/eventmesh_gc_%p.log" + +#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4" +JAVA_OPT=`cat ${EVENTMESH_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc" +if [[ "$JAVA_VERSION" == "8" ]]; then + # Set JAVA_OPT for Java 8 + JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" + JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +elif [[ "$JAVA_VERSION" == "11" ]]; then + # Set JAVA_OPT for Java 11 + XLOG_PARAM="time,level,tags:filecount=5,filesize=30m" + JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}" + JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} -Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}" +fi +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${EVENTMESH_LOG_HOME} -XX:ErrorFile=${EVENTMESH_LOG_HOME}/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${EVENTMESH_HOME}/conf/log4j2.xml" +JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" +JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_HOME}/plugin" + +#if [ -f "pid.file" ]; then +# pid=`cat pid.file` +# if ! is_proxyRunning "$pid"; then +# echo "proxy is running already" +# exit 9; +# else +# echo "err pid$pid, rm pid.file" +# rm pid.file +# fi +#fi + +pid=$(get_pid) +if [[ $pid == "ERROR"* ]]; then + echo -e "${pid}" + exit 9 +fi +if [ -n "$pid" ]; then + echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9 +fi + +make_logs_dir + +echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_LOG_HOME}/eventmesh.out + +EVENTMESH_MAIN=org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter +if [ $DOCKER ]; then + $JAVA $JAVA_OPT -classpath ${EVENTMESH_HOME}/conf:${EVENTMESH_HOME}/apps/*:${EVENTMESH_HOME}/lib/* $EVENTMESH_MAIN >> ${EVENTMESH_LOG_HOME}/eventmesh.out +else + $JAVA $JAVA_OPT -classpath ${EVENTMESH_HOME}/conf:${EVENTMESH_HOME}/apps/*:${EVENTMESH_HOME}/lib/* $EVENTMESH_MAIN >> ${EVENTMESH_LOG_HOME}/eventmesh.out 2>&1 & +echo $!>${EVENTMESH_HOME}/bin/pid.file +fi +exit 0 diff --git a/eventmesh-runtime-v2/bin/stop-v2.sh b/eventmesh-runtime-v2/bin/stop-v2.sh new file mode 100644 index 0000000000..177ae1e129 --- /dev/null +++ b/eventmesh-runtime-v2/bin/stop-v2.sh @@ -0,0 +1,88 @@ +#!/bin/bash +# +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Detect operating system +OS=$(uname) + +EVENTMESH_HOME=`cd $(dirname $0)/.. && pwd` + +export EVENTMESH_HOME + +function get_pid { + local ppid="" + if [ -f ${EVENTMESH_HOME}/bin/pid.file ]; then + ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file) + # If the process does not exist, it indicates that the previous process terminated abnormally. + if [ ! -d /proc/$ppid ]; then + # Remove the residual file and return an error status. + rm ${EVENTMESH_HOME}/bin/pid.file + echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output." + ppid="" + fi + else + if [[ $OS =~ Msys ]]; then + # There is a Bug on Msys that may not be able to kill the identified process + ppid=`jps -v | grep -i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # Known problem: grep Java may not be able to accurately identify Java processes + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + # It is required to identify the process as accurately as possible on Linux + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_HOME | grep -i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "$ppid"; +} + +pid=$(get_pid) +if [[ $pid == "ERROR"* ]]; then + echo -e "${pid}" + exit 9 +fi +if [ -z "$pid" ];then + echo -e "ERROR\t No EventMesh server running." + exit 9 +fi + +kill ${pid} +echo "Send shutdown request to EventMesh(${pid}) OK" + +[[ $OS =~ Msys ]] && PS_PARAM=" -W " +stop_timeout=60 +for no in $(seq 1 $stop_timeout); do + if ps $PS_PARAM -p "$pid" 2>&1 > /dev/null; then + if [ $no -lt $stop_timeout ]; then + echo "[$no] server shutting down ..." + sleep 1 + continue + fi + + echo "shutdown server timeout, kill process: $pid" + kill -9 $pid; sleep 1; break; + echo "`date +'%Y-%m-%-d %H:%M:%S'` , pid : [$pid] , error message : abnormal shutdown which can not be closed within 60s" > ../logs/shutdown.error + else + echo "shutdown server ok!"; break; + fi +done + +if [ -f "pid.file" ]; then + rm pid.file +fi + + diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 2f16834b4e..65676903dd 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -53,6 +53,8 @@ import org.apache.eventmesh.runtime.RuntimeInstanceConfig; import org.apache.eventmesh.spi.EventMeshExtensionFactory; +import org.apache.commons.collections4.CollectionUtils; + import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -201,6 +203,9 @@ private void initConnectorService() throws Exception { SourceConnectorContext sourceConnectorContext = new SourceConnectorContext(); sourceConnectorContext.setSourceConfig(sourceConfig); sourceConnectorContext.setOffsetStorageReader(offsetStorageReader); + if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) { + sourceConnectorContext.setRecordPositionList(jobResponse.getPosition()); + } // spi load offsetMgmtService this.offsetManagement = new RecordOffsetManagement();