Skip to content

Commit

Permalink
[ISSUE apache#630] RocketMQProducerImpl cannot load config properties…
Browse files Browse the repository at this point in the history
… from classpath (apache#631)

* Fix RocketMqConsumer cannot load properties in classpath
close apache#630
  • Loading branch information
ruanwenjun authored Dec 13, 2021
1 parent d2370d1 commit 3d48165
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,10 @@ dependencies {
testImplementation "org.powermock:powermock-api-mockito2"

testImplementation rocketmq

compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'

testCompileOnly 'org.projectlombok:lombok:1.18.22'
testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@

package org.apache.eventmesh.connector.rocketmq.admin.command;

import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

import java.io.File;
import java.util.UUID;

public abstract class Command {
Expand All @@ -36,12 +33,7 @@ public abstract class Command {
protected String clusterName;

public void init() {
ConfigurationWrapper configurationWrapper =
new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME
+ File.separator
+ EventMeshConstants.EVENTMESH_CONF_FILE, false);
final ClientConfiguration clientConfiguration =
new ClientConfiguration(configurationWrapper);
final ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.init();

nameServerAddr = clientConfiguration.namesrvAddr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,126 +17,137 @@

package org.apache.eventmesh.connector.rocketmq.config;

import com.google.common.base.Preconditions;

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Preconditions;

public class ClientConfiguration {

public String namesrvAddr = "";
public String clientUserName = "username";
public String clientPass = "password";
public Integer consumeThreadMin = 2;
public Integer consumeThreadMax = 2;
public Integer consumeQueueSize = 10000;
public Integer pullBatchSize = 32;
public Integer ackWindow = 1000;
public Integer pubWindow = 100;
public long consumeTimeout = 0L;
public Integer pollNameServerInterval = 10 * 1000;
public String namesrvAddr = "";
public String clientUserName = "username";
public String clientPass = "password";
public Integer consumeThreadMin = 2;
public Integer consumeThreadMax = 2;
public Integer consumeQueueSize = 10000;
public Integer pullBatchSize = 32;
public Integer ackWindow = 1000;
public Integer pubWindow = 100;
public long consumeTimeout = 0L;
public Integer pollNameServerInterval = 10 * 1000;
public Integer heartbeatBrokerInterval = 30 * 1000;
public Integer rebalanceInterval = 20 * 1000;
public String clusterName = "";
public String accessKey = "";
public String secretKey = "";

protected ConfigurationWrapper configurationWrapper;

public ClientConfiguration(ConfigurationWrapper configurationWrapper) {
this.configurationWrapper = configurationWrapper;
}
public Integer rebalanceInterval = 20 * 1000;
public String clusterName = "";
public String accessKey = "";
public String secretKey = "";

public void init() {

String clientUserNameStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_USERNAME);
String clientUserNameStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_USERNAME);
if (StringUtils.isNotBlank(clientUserNameStr)) {
clientUserName = StringUtils.trim(clientUserNameStr);
}

String clientPassStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_PASSWORD);
String clientPassStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_PASSWORD);
if (StringUtils.isNotBlank(clientPassStr)) {
clientPass = StringUtils.trim(clientPassStr);
}

String namesrvAddrStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR);
Preconditions.checkState(StringUtils.isNotEmpty(namesrvAddrStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR));
String namesrvAddrStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR);
Preconditions.checkState(StringUtils.isNotEmpty(namesrvAddrStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR));
namesrvAddr = StringUtils.trim(namesrvAddrStr);

String consumeThreadPoolMinStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN);
String consumeThreadPoolMinStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN);
if (StringUtils.isNotEmpty(consumeThreadPoolMinStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMinStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN));
Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMinStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN));
consumeThreadMin = Integer.valueOf(consumeThreadPoolMinStr);
}

String consumeThreadPoolMaxStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX);
String consumeThreadPoolMaxStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX);
if (StringUtils.isNotEmpty(consumeThreadPoolMaxStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMaxStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX));
Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMaxStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX));
consumeThreadMax = Integer.valueOf(consumeThreadPoolMaxStr);
}

String consumerThreadPoolQueueSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE);
String consumerThreadPoolQueueSizeStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE);
if (StringUtils.isNotEmpty(consumerThreadPoolQueueSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumerThreadPoolQueueSizeStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE));
Preconditions.checkState(StringUtils.isNumeric(consumerThreadPoolQueueSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE));
consumeQueueSize = Integer.valueOf(consumerThreadPoolQueueSizeStr);
}

String clientAckWindowStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW);
String clientAckWindowStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW);
if (StringUtils.isNotEmpty(clientAckWindowStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientAckWindowStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW));
Preconditions.checkState(StringUtils.isNumeric(clientAckWindowStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW));
ackWindow = Integer.valueOf(clientAckWindowStr);
}

String clientPubWindowStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW);
String clientPubWindowStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW);
if (StringUtils.isNotEmpty(clientPubWindowStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPubWindowStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW));
Preconditions.checkState(StringUtils.isNumeric(clientPubWindowStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW));
pubWindow = Integer.valueOf(clientPubWindowStr);
}

String consumeTimeoutStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT);
String consumeTimeoutStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT);
if (StringUtils.isNotBlank(consumeTimeoutStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumeTimeoutStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT));
Preconditions.checkState(StringUtils.isNumeric(consumeTimeoutStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT));
consumeTimeout = Long.valueOf(consumeTimeoutStr);
}

String clientPullBatchSizeStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE);
String clientPullBatchSizeStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE);
if (StringUtils.isNotEmpty(clientPullBatchSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPullBatchSizeStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE));
Preconditions.checkState(StringUtils.isNumeric(clientPullBatchSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE));
pullBatchSize = Integer.valueOf(clientPullBatchSizeStr);
}

String clientPollNamesrvIntervalStr =
configurationWrapper.getProp(
ConfigurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL);
if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL));
Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL));
pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr);
}

String clientHeartbeatBrokerIntervalStr =
configurationWrapper.getProp(
ConfigurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL);
if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL));
Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL));
heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr);
}

String clientRebalanceIntervalIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL);
String clientRebalanceIntervalIntervalStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL);
if (StringUtils.isNotEmpty(clientRebalanceIntervalIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL));
Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL));
rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr);
}
String cluster = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER);

String cluster = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER);
if (StringUtils.isNotBlank(cluster)) {
clusterName = cluster;
}

String ak = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY);
String ak = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY);
if (StringUtils.isNotBlank(ak)) {
accessKey = ak;
}

String sk = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY);
String sk = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY);
if (StringUtils.isNotBlank(sk)) {
secretKey = sk;
}
Expand All @@ -150,32 +161,40 @@ static class ConfKeys {

public static String KEYS_EVENTMESH_ROCKETMQ_PASSWORD = "eventMesh.server.rocketmq.password";

public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN = "eventMesh.server.rocketmq.client.consumeThreadMin";
public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN =
"eventMesh.server.rocketmq.client.consumeThreadMin";

public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX = "eventMesh.server.rocketmq.client.consumeThreadMax";
public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX =
"eventMesh.server.rocketmq.client.consumeThreadMax";

public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE = "eventMesh.server.rocketmq.client.consumeThreadPoolQueueSize";
public static String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE =
"eventMesh.server.rocketmq.client.consumeThreadPoolQueueSize";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW = "eventMesh.server.rocketmq.client.ackwindow";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW = "eventMesh.server.rocketmq.client.pubwindow";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT = "eventMesh.server.rocketmq.client.comsumeTimeoutInMin";
public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT =
"eventMesh.server.rocketmq.client.comsumeTimeoutInMin";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE =
"eventMesh.server.rocketmq.client.pullBatchSize";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE = "eventMesh.server.rocketmq.client.pullBatchSize";
public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL =
"eventMesh.server.rocketmq.client.pollNameServerInterval";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL = "eventMesh.server.rocketmq.client.pollNameServerInterval";
public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL =
"eventMesh.server.rocketmq.client.heartbeatBrokerInterval";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval";
public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL =
"eventMesh.server.rocketmq.client.rebalanceInterval";

public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval";

public static String KEYS_EVENTMESH_ROCKETMQ_CLUSTER = "eventMesh.server.rocketmq.cluster";

public static String KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY =
"eventMesh.server.rocketmq.accessKey";

public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY =
public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY =
"eventMesh.server.rocketmq.secretKey";

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,48 @@

package org.apache.eventmesh.connector.rocketmq.config;

import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;

import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@UtilityClass
public class ConfigurationWrapper {

public Logger logger = LoggerFactory.getLogger(this.getClass());

private String file;

private Properties properties = new Properties();

private boolean reload = true;

private ScheduledExecutorService configLoader = ThreadPoolFactory.createSingleScheduledExecutor("eventMesh-configloader-");
private static final Properties properties = new Properties();

public ConfigurationWrapper(String file, boolean reload) {
this.file = file;
this.reload = reload;
init();
}

private void init() {
load();
if (this.reload) {
configLoader.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
load();
}
}, 30 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
}
}

private void load() {
static {
String configFile = getConfigFilePath();
log.info("loading config: {}", configFile);
try {
logger.info("loading config: {}", file);
properties.load(new BufferedReader(new FileReader(
new File(file))));
properties.load(new BufferedReader(new FileReader(configFile)));
} catch (IOException e) {
logger.error("loading properties [{}] error", file, e);
throw new IllegalArgumentException(
String.format("Cannot load RocketMQ configuration file from :%s", configFile));
}
}

public String getProp(String key) {
return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null);
}

private static String getConfigFilePath() {
// get from classpath
URL resource = ConfigurationWrapper.class.getClassLoader().getResource(EventMeshConstants.EVENTMESH_CONF_FILE);
if (resource != null && new File(resource.getPath()).exists()) {
return resource.getPath();
}
// get from config home
return EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE;
}
}
Loading

0 comments on commit 3d48165

Please sign in to comment.