From 3d48165f6bc04dbce0350048c54c64f83447e157 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 13 Dec 2021 20:43:40 +0800 Subject: [PATCH] [ISSUE #630] RocketMQProducerImpl cannot load config properties from classpath (#631) * Fix RocketMqConsumer cannot load properties in classpath close #630 --- .../eventmesh-connector-rocketmq/build.gradle | 6 + .../rocketmq/admin/command/Command.java | 10 +- .../rocketmq/config/ClientConfiguration.java | 141 ++++++++++-------- .../rocketmq/config/ConfigurationWrapper.java | 64 ++++---- .../consumer/RocketMQConsumerImpl.java | 26 +--- .../rocketmq/producer/ProducerImpl.java | 41 +++-- .../producer/RocketMQProducerImpl.java | 28 +--- .../config/ConfigurationWrapperTest.java | 30 ++++ .../rocketmq/producer/ProducerImplTest.java | 30 +--- .../test/resources/rocketmq-client.properties | 18 +++ eventmesh-runtime/build.gradle | 2 - style/checkStyle.xml | 20 +-- 12 files changed, 197 insertions(+), 219 deletions(-) create mode 100644 eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapperTest.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/rocketmq-client.properties diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle index eb722cc890..e7b807b984 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle @@ -49,4 +49,10 @@ dependencies { testImplementation "org.powermock:powermock-api-mockito2" testImplementation rocketmq + + compileOnly 'org.projectlombok:lombok:1.18.22' + annotationProcessor 'org.projectlombok:lombok:1.18.22' + + testCompileOnly 'org.projectlombok:lombok:1.18.22' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.22' } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java index 82ca6a514e..9f3b5e9e60 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java @@ -17,16 +17,13 @@ package org.apache.eventmesh.connector.rocketmq.admin.command; -import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; -import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import java.io.File; import java.util.UUID; public abstract class Command { @@ -36,12 +33,7 @@ public abstract class Command { protected String clusterName; public void init() { - ConfigurationWrapper configurationWrapper = - new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME - + File.separator - + EventMeshConstants.EVENTMESH_CONF_FILE, false); - final ClientConfiguration clientConfiguration = - new ClientConfiguration(configurationWrapper); + final ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.init(); nameServerAddr = clientConfiguration.namesrvAddr; diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java index feadca6319..20e6d87b61 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java @@ -17,126 +17,137 @@ package org.apache.eventmesh.connector.rocketmq.config; -import com.google.common.base.Preconditions; - import org.apache.commons.lang3.StringUtils; +import com.google.common.base.Preconditions; + public class ClientConfiguration { - public String namesrvAddr = ""; - public String clientUserName = "username"; - public String clientPass = "password"; - public Integer consumeThreadMin = 2; - public Integer consumeThreadMax = 2; - public Integer consumeQueueSize = 10000; - public Integer pullBatchSize = 32; - public Integer ackWindow = 1000; - public Integer pubWindow = 100; - public long consumeTimeout = 0L; - public Integer pollNameServerInterval = 10 * 1000; + public String namesrvAddr = ""; + public String clientUserName = "username"; + public String clientPass = "password"; + public Integer consumeThreadMin = 2; + public Integer consumeThreadMax = 2; + public Integer consumeQueueSize = 10000; + public Integer pullBatchSize = 32; + public Integer ackWindow = 1000; + public Integer pubWindow = 100; + public long consumeTimeout = 0L; + public Integer pollNameServerInterval = 10 * 1000; public Integer heartbeatBrokerInterval = 30 * 1000; - public Integer rebalanceInterval = 20 * 1000; - public String clusterName = ""; - public String accessKey = ""; - public String secretKey = ""; - - protected ConfigurationWrapper configurationWrapper; - - public ClientConfiguration(ConfigurationWrapper configurationWrapper) { - this.configurationWrapper = configurationWrapper; - } + public Integer rebalanceInterval = 20 * 1000; + public String clusterName = ""; + public String accessKey = ""; + public String secretKey = ""; public void init() { - String clientUserNameStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_USERNAME); + String clientUserNameStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_USERNAME); if (StringUtils.isNotBlank(clientUserNameStr)) { clientUserName = StringUtils.trim(clientUserNameStr); } - String clientPassStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_PASSWORD); + String clientPassStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_PASSWORD); if (StringUtils.isNotBlank(clientPassStr)) { clientPass = StringUtils.trim(clientPassStr); } - String namesrvAddrStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR); - Preconditions.checkState(StringUtils.isNotEmpty(namesrvAddrStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR)); + String namesrvAddrStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR); + Preconditions.checkState(StringUtils.isNotEmpty(namesrvAddrStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR)); namesrvAddr = StringUtils.trim(namesrvAddrStr); - String consumeThreadPoolMinStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN); + String consumeThreadPoolMinStr = + ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN); if (StringUtils.isNotEmpty(consumeThreadPoolMinStr)) { - Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMinStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN)); + Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMinStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN)); consumeThreadMin = Integer.valueOf(consumeThreadPoolMinStr); } - String consumeThreadPoolMaxStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX); + String consumeThreadPoolMaxStr = + ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX); if (StringUtils.isNotEmpty(consumeThreadPoolMaxStr)) { - Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMaxStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX)); + Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMaxStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX)); consumeThreadMax = Integer.valueOf(consumeThreadPoolMaxStr); } - String consumerThreadPoolQueueSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE); + String consumerThreadPoolQueueSizeStr = + ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE); if (StringUtils.isNotEmpty(consumerThreadPoolQueueSizeStr)) { - Preconditions.checkState(StringUtils.isNumeric(consumerThreadPoolQueueSizeStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE)); + Preconditions.checkState(StringUtils.isNumeric(consumerThreadPoolQueueSizeStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE)); consumeQueueSize = Integer.valueOf(consumerThreadPoolQueueSizeStr); } - String clientAckWindowStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW); + String clientAckWindowStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW); if (StringUtils.isNotEmpty(clientAckWindowStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientAckWindowStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW)); + Preconditions.checkState(StringUtils.isNumeric(clientAckWindowStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW)); ackWindow = Integer.valueOf(clientAckWindowStr); } - String clientPubWindowStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW); + String clientPubWindowStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW); if (StringUtils.isNotEmpty(clientPubWindowStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientPubWindowStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW)); + Preconditions.checkState(StringUtils.isNumeric(clientPubWindowStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW)); pubWindow = Integer.valueOf(clientPubWindowStr); } - String consumeTimeoutStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT); + String consumeTimeoutStr = + ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT); if (StringUtils.isNotBlank(consumeTimeoutStr)) { - Preconditions.checkState(StringUtils.isNumeric(consumeTimeoutStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT)); + Preconditions.checkState(StringUtils.isNumeric(consumeTimeoutStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT)); consumeTimeout = Long.valueOf(consumeTimeoutStr); } - String clientPullBatchSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE); + String clientPullBatchSizeStr = + ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE); if (StringUtils.isNotEmpty(clientPullBatchSizeStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientPullBatchSizeStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE)); + Preconditions.checkState(StringUtils.isNumeric(clientPullBatchSizeStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE)); pullBatchSize = Integer.valueOf(clientPullBatchSizeStr); } String clientPollNamesrvIntervalStr = - configurationWrapper.getProp( + ConfigurationWrapper.getProp( ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL)); + Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL)); pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr); } String clientHeartbeatBrokerIntervalStr = - configurationWrapper.getProp( + ConfigurationWrapper.getProp( ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL)); + Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL)); heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr); } - String clientRebalanceIntervalIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL); + String clientRebalanceIntervalIntervalStr = + ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL); if (StringUtils.isNotEmpty(clientRebalanceIntervalIntervalStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL)); + Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), + String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL)); rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr); } - - String cluster = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER); + + String cluster = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER); if (StringUtils.isNotBlank(cluster)) { clusterName = cluster; } - String ak = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY); + String ak = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY); if (StringUtils.isNotBlank(ak)) { accessKey = ak; } - String sk = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY); + String sk = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY); if (StringUtils.isNotBlank(sk)) { secretKey = sk; } @@ -150,32 +161,40 @@ static class ConfKeys { public static String KEYS_EVENTMESH_ROCKETMQ_PASSWORD = "eventMesh.server.rocketmq.password"; - public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN = "eventMesh.server.rocketmq.client.consumeThreadMin"; + public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN = + "eventMesh.server.rocketmq.client.consumeThreadMin"; - public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX = "eventMesh.server.rocketmq.client.consumeThreadMax"; + public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX = + "eventMesh.server.rocketmq.client.consumeThreadMax"; - public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE = "eventMesh.server.rocketmq.client.consumeThreadPoolQueueSize"; + public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE = + "eventMesh.server.rocketmq.client.consumeThreadPoolQueueSize"; public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW = "eventMesh.server.rocketmq.client.ackwindow"; public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW = "eventMesh.server.rocketmq.client.pubwindow"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT = "eventMesh.server.rocketmq.client.comsumeTimeoutInMin"; + public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT = + "eventMesh.server.rocketmq.client.comsumeTimeoutInMin"; + + public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE = + "eventMesh.server.rocketmq.client.pullBatchSize"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE = "eventMesh.server.rocketmq.client.pullBatchSize"; + public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL = + "eventMesh.server.rocketmq.client.pollNameServerInterval"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL = "eventMesh.server.rocketmq.client.pollNameServerInterval"; + public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = + "eventMesh.server.rocketmq.client.heartbeatBrokerInterval"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval"; + public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = + "eventMesh.server.rocketmq.client.rebalanceInterval"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLUSTER = "eventMesh.server.rocketmq.cluster"; public static String KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY = "eventMesh.server.rocketmq.accessKey"; - public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY = + public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY = "eventMesh.server.rocketmq.secretKey"; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java index 9334b5fc98..9ff3861ad6 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java @@ -17,60 +17,48 @@ package org.apache.eventmesh.connector.rocketmq.config; +import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; + +import org.apache.commons.lang3.StringUtils; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.net.URL; import java.util.Properties; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.common.ThreadPoolFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; +@Slf4j +@UtilityClass public class ConfigurationWrapper { - public Logger logger = LoggerFactory.getLogger(this.getClass()); - - private String file; - - private Properties properties = new Properties(); - - private boolean reload = true; - - private ScheduledExecutorService configLoader = ThreadPoolFactory.createSingleScheduledExecutor("eventMesh-configloader-"); + private static final Properties properties = new Properties(); - public ConfigurationWrapper(String file, boolean reload) { - this.file = file; - this.reload = reload; - init(); - } - - private void init() { - load(); - if (this.reload) { - configLoader.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - load(); - } - }, 30 * 1000, 30 * 1000, TimeUnit.MILLISECONDS); - } - } - - private void load() { + static { + String configFile = getConfigFilePath(); + log.info("loading config: {}", configFile); try { - logger.info("loading config: {}", file); - properties.load(new BufferedReader(new FileReader( - new File(file)))); + properties.load(new BufferedReader(new FileReader(configFile))); } catch (IOException e) { - logger.error("loading properties [{}] error", file, e); + throw new IllegalArgumentException( + String.format("Cannot load RocketMQ configuration file from :%s", configFile)); } } public String getProp(String key) { return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null); } + + private static String getConfigFilePath() { + // get from classpath + URL resource = ConfigurationWrapper.class.getClassLoader().getResource(EventMeshConstants.EVENTMESH_CONF_FILE); + if (resource != null && new File(resource.getPath()).exists()) { + return resource.getPath(); + } + // get from config home + return EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE; + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java index 1b3fe482ce..5e4d4d3c95 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java @@ -21,12 +21,10 @@ import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.api.consumer.Consumer; import org.apache.eventmesh.connector.rocketmq.common.Constants; -import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; -import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; + import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.io.File; import java.util.List; import java.util.Properties; @@ -34,27 +32,23 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class RocketMQConsumerImpl implements Consumer { - public Logger logger = LoggerFactory.getLogger(this.getClass()); - public Logger messageLogger = LoggerFactory.getLogger("message"); private PushConsumerImpl pushConsumer; @Override public synchronized void init(Properties keyValue) throws Exception { - ConfigurationWrapper configurationWrapper = - new ConfigurationWrapper(getRocketMqConfigFile(), false); - final ClientConfiguration clientConfiguration = - new ClientConfiguration(configurationWrapper); + final ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.init(); boolean isBroadcast = Boolean.parseBoolean(keyValue.getProperty("isBroadcast")); String consumerGroup = keyValue.getProperty("consumerGroup"); String instanceName = keyValue.getProperty("instanceName"); - if (isBroadcast) { consumerGroup = Constants.BROADCAST_PREFIX + consumerGroup; } @@ -113,16 +107,4 @@ public Properties attributes() { return pushConsumer.attributes(); } - private String getRocketMqConfigFile() { - // get from classpath - String configFile = RocketMQConsumerImpl.class.getClassLoader() - .getResource(EventMeshConstants.EVENTMESH_CONF_FILE).getPath(); - if (new File(configFile).exists()) { - return configFile; - } - // get from config home - configFile = EventMeshConstants.EVENTMESH_CONF_HOME + File.separator - + EventMeshConstants.EVENTMESH_CONF_FILE; - return configFile; - } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java index 619fe63578..675fb4973e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java @@ -23,7 +23,6 @@ import org.apache.eventmesh.api.exception.ConnectorRuntimeException; import org.apache.eventmesh.api.exception.OnExceptionContext; import org.apache.eventmesh.connector.rocketmq.cloudevent.RocketMQMessageFactory; -import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil; import org.apache.eventmesh.connector.rocketmq.utils.CloudEventUtils; import org.apache.commons.lang3.StringUtils; @@ -35,7 +34,6 @@ import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.exception.RemotingException; import java.util.Properties; @@ -44,13 +42,14 @@ import org.slf4j.LoggerFactory; import io.cloudevents.CloudEvent; +import lombok.extern.slf4j.Slf4j; +@Slf4j +@SuppressWarnings("deprecation") public class ProducerImpl extends AbstractProducer { public static final int eventMeshServerAsyncAccumulationThreshold = 1000; - private final Logger logger = LoggerFactory.getLogger(ProducerImpl.class); - public ProducerImpl(final Properties properties) { super(properties); } @@ -64,8 +63,7 @@ public void setExtFields() { super.getRocketmqProducer().setRetryTimesWhenSendAsyncFailed(0); super.getRocketmqProducer().setPollNameServerInterval(60000); - super.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory() - .getNettyClientConfig() + super.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getNettyClientConfig() .setClientAsyncSemaphoreValue(eventMeshServerAsyncAccumulationThreshold); super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10); } @@ -77,8 +75,7 @@ public SendResult send(CloudEvent cloudEvent) { RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); String messageId = null; try { - org.apache.rocketmq.client.producer.SendResult sendResultRmq = - this.rocketmqProducer.send(msg); + org.apache.rocketmq.client.producer.SendResult sendResultRmq = this.rocketmqProducer.send(msg); SendResult sendResult = new SendResult(); sendResult.setTopic(sendResultRmq.getMessageQueue().getTopic()); messageId = sendResultRmq.getMsgId(); @@ -99,8 +96,7 @@ public void sendOneway(CloudEvent cloudEvent) { this.rocketmqProducer.sendOneway(msg); } catch (Exception e) { log.error(String.format("Send message oneway Exception, %s", msg), e); - throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), - e); + throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e); } } @@ -114,8 +110,7 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback)); } catch (Exception e) { log.error(String.format("Send message async Exception, %s", msg), e); - throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), - e); + throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e); } } @@ -134,21 +129,23 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); if (StringUtils.isNotEmpty(cloudEvent.getExtension("cluster").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CLUSTER, cloudEvent.getExtension("cluster").toString()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CLUSTER, + cloudEvent.getExtension("cluster").toString()); } if (StringUtils.isNotEmpty(cloudEvent.getExtension("replytoclient").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, cloudEvent.getExtension("replytoclient").toString()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, + cloudEvent.getExtension("replytoclient").toString()); } if (StringUtils.isNotEmpty(cloudEvent.getExtension("correlationid").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, cloudEvent.getExtension("correlationid").toString()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, + cloudEvent.getExtension("correlationid").toString()); } try { this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback)); } catch (Exception e) { log.error(String.format("Send message async Exception, %s", msg), e); - throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), - e); + throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e); } return true; @@ -165,8 +162,7 @@ public void onSuccess(org.apache.rocketmq.common.message.Message message) { @Override public void onException(Throwable e) { String topic = message.getTopic(); - ConnectorRuntimeException onsEx = - ProducerImpl.this.checkProducerException(topic, null, e); + ConnectorRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, null, e); OnExceptionContext context = new OnExceptionContext(); context.setTopic(topic); context.setException(onsEx); @@ -176,8 +172,8 @@ public void onException(Throwable e) { }; } - private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert( - final Message message, final SendCallback sendCallback) { + private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message, + final SendCallback sendCallback) { org.apache.rocketmq.client.producer.SendCallback rmqSendCallback = new org.apache.rocketmq.client.producer.SendCallback() { @Override @@ -188,8 +184,7 @@ public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) @Override public void onException(Throwable e) { String topic = message.getTopic(); - ConnectorRuntimeException onsEx = - ProducerImpl.this.checkProducerException(topic, null, e); + ConnectorRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, null, e); OnExceptionContext context = new OnExceptionContext(); context.setTopic(topic); context.setException(onsEx); diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java index 8ab3d969e4..47ed2b1d83 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java @@ -23,33 +23,25 @@ import org.apache.eventmesh.api.producer.Producer; import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; -import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.io.File; import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.cloudevents.CloudEvent; +import lombok.extern.slf4j.Slf4j; +@Slf4j +@SuppressWarnings("deprecation") public class RocketMQProducerImpl implements Producer { - public Logger logger = LoggerFactory.getLogger(this.getClass()); - private ProducerImpl producer; @Override public synchronized void init(Properties keyValue) { - ConfigurationWrapper configurationWrapper = - new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME - + File.separator - + EventMeshConstants.EVENTMESH_CONF_FILE, false); - final ClientConfiguration clientConfiguration = new ClientConfiguration(configurationWrapper); + final ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.init(); String producerGroup = keyValue.getProperty("producerGroup"); @@ -92,15 +84,10 @@ public void publish(CloudEvent message, SendCallback sendCallback) throws Except @Override public void request(CloudEvent message, RequestReplyCallback rrCallback, long timeout) - throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + throws InterruptedException, RemotingException, MQClientException, MQBrokerException { producer.request(message, rrCallback, timeout); } -// @Override -// public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { -// -// } - @Override public boolean reply(final CloudEvent message, final SendCallback sendCallback) throws Exception { producer.reply(message, sendCallback); @@ -109,7 +96,8 @@ public boolean reply(final CloudEvent message, final SendCallback sendCallback) @Override public void checkTopicExist(String topic) throws Exception { - this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic, EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); + this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl() + .getDefaultTopicRouteInfoFromNameServer(topic, EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS); } @Override @@ -133,6 +121,4 @@ public void sendAsync(CloudEvent message, SendCallback sendCallback) { } - - } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapperTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapperTest.java new file mode 100644 index 0000000000..6d25baf7cf --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapperTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.config; + +import org.junit.Assert; +import org.junit.Test; + +public class ConfigurationWrapperTest { + + @Test + public void getProp() { + String namesrcAddr = ConfigurationWrapper.getProp("eventMesh.server.rocketmq.namesrvAddr"); + Assert.assertNotNull(namesrcAddr); + } +} \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java index a761cd13d4..27e0c69f29 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java @@ -61,7 +61,6 @@ public class ProducerImplTest { @Before public void before() throws NoSuchFieldException, IllegalAccessException { Properties config = new Properties(); -// config.setProperty(OMSBuiltinKeys.DRIVER_IMPL, "org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl"); config.setProperty("access_points", "IP1:9876,IP2:9876"); producer = new ProducerImpl(config); @@ -69,17 +68,13 @@ public void before() throws NoSuchFieldException, IllegalAccessException { field.setAccessible(true); field.set(producer, rocketmqProducer); -// messagingAccessPoint.startup(); producer.start(); } - @After - public void after() throws NoSuchFieldException, IllegalAccessException { - + public void after() { producer.shutdown(); - } @Test @@ -114,29 +109,6 @@ public void testSend_OK() throws InterruptedException, RemotingException, MQClie } -// @Test -// public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { -// SendResult sendResult = new SendResult(); -// sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT); -// MessageQueue messageQueue = new MessageQueue("HELLO_TOPIC", "testBroker", 0); -// sendResult.setMessageQueue(messageQueue); -// -// when(rocketmqProducer.send(any(Message.class))).thenReturn(sendResult); -// -// DefaultMQProducer defaultMQProducer =new DefaultMQProducer("testGroup"); -// DefaultMQProducerImpl defaultMQProducerImpl = new DefaultMQProducerImpl(defaultMQProducer); -// defaultMQProducerImpl.setServiceState(ServiceState.RUNNING); -// when(rocketmqProducer.getDefaultMQProducerImpl()).thenReturn(defaultMQProducerImpl); -// -// try { -// io.openmessaging.api.Message message = new io.openmessaging.api.Message("HELLO_TOPIC", "", new byte[] {'a'}); -// producer.send(message); -// failBecauseExceptionWasNotThrown(OMSRuntimeException.class); -// } catch (Exception e) { -// assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); -// } -// } - @Test public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { DefaultMQProducer defaultMQProducer = new DefaultMQProducer("testGroup"); diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/rocketmq-client.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/rocketmq-client.properties new file mode 100644 index 0000000000..1261f30e2c --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/rocketmq-client.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +#######################rocketmq-client################## +eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876 diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index e4e19031a7..fdca886ccb 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -34,7 +34,6 @@ dependencies { implementation project(":eventmesh-security-plugin:eventmesh-security-api") implementation project(":eventmesh-security-plugin:eventmesh-security-acl") implementation project(":eventmesh-registry-plugin:eventmesh-registry-api") - implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv") implementation project(":eventmesh-admin:eventmesh-admin-rocketmq") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") @@ -51,7 +50,6 @@ dependencies { testImplementation project(":eventmesh-security-plugin:eventmesh-security-api") testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl") testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api") - testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv") testImplementation project(":eventmesh-admin:eventmesh-admin-rocketmq") testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") diff --git a/style/checkStyle.xml b/style/checkStyle.xml index 17f5d40e7f..410df51a73 100644 --- a/style/checkStyle.xml +++ b/style/checkStyle.xml @@ -73,7 +73,6 @@ - @@ -265,14 +263,13 @@ - - + - - - - - - +