Skip to content

Commit

Permalink
Merge branch 'eventmesh-function' of https://github.com/xwm1992/Event…
Browse files Browse the repository at this point in the history
…Mesh into eventmesh-function
  • Loading branch information
xwm1992 committed May 22, 2024
2 parents 5cd3381 + 0c3bdd7 commit 9f8c71d
Show file tree
Hide file tree
Showing 20 changed files with 114 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,25 +1,50 @@
package com.apache.eventmesh.admin.server;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.PagedList;
import org.apache.eventmesh.registry.RegisterServerInfo;
import org.apache.eventmesh.registry.RegistryFactory;
import org.apache.eventmesh.registry.RegistryService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

public class AdminServer implements Admin {
@Service
@Slf4j
public class AdminServer implements Admin, ApplicationListener<ApplicationReadyEvent> {

private final RegistryService registryService;

private final RegisterServerInfo adminServeInfo;

public AdminServer(RegistryService registryService, AdminServerProperties properties) {
this.registryService = registryService;
private final CommonConfiguration configuration;

public AdminServer(AdminServerProperties properties) {
configuration =
ConfigService.getInstance().buildConfigInstance(CommonConfiguration.class);
if (configuration == null) {
throw new AdminServerException(ErrorCode.STARTUP_CONFIG_MISS, "common configuration file miss");
}
this.adminServeInfo = new RegisterServerInfo();
adminServeInfo.setServiceName(Constants.ADMIN_SERVER_REGISTRY_NAME);

adminServeInfo.setHealth(true);
adminServeInfo.setAddress(IPUtils.getLocalAddress() + ":" + properties.getPort());
String name = Constants.ADMIN_SERVER_REGISTRY_NAME;
if (StringUtils.isNotBlank(properties.getServiceName())) {
name = properties.getServiceName();
}
adminServeInfo.setServiceName(name);
registryService = RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType());
}


Expand Down Expand Up @@ -49,13 +74,24 @@ public void reportHeartbeat(ReportHeartBeatRequest heartBeat) {
}

@Override
@PostConstruct
public void start() {
registryService.register(adminServeInfo);

registryService.init();
}

@Override
public void destroy() {
registryService.unRegister(adminServeInfo);
try {
Thread.sleep(3000);
}catch (InterruptedException ignore){}
registryService.shutdown();
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("application is started, it's will register admin self");
registryService.register(adminServeInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
@Setter
public class AdminServerProperties {
private int port;
private boolean enable;
private boolean enableSSL;
private String configurationPath;
private String configurationFile;
private String serviceName;
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.apache.eventmesh.admin.server;

import com.apache.eventmesh.admin.server.constatns.AdminServerConstants;
import org.apache.eventmesh.common.config.ConfigService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ExampleAdminServer {
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
ConfigService.getInstance().setConfigPath(AdminServerConstants.EVENTMESH_CONF_HOME).setRootConfig(AdminServerConstants.EVENTMESH_CONF_FILE);
SpringApplication.run(ExampleAdminServer.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.apache.eventmesh.admin.server.constatns;

public class AdminServerConstants {
public static final String CONF_ENV = "configurationPath";

public static final String EVENTMESH_CONF_HOME = System.getProperty(CONF_ENV, System.getenv(CONF_ENV));

public static final String EVENTMESH_CONF_FILE = "eventmesh-admin.properties";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class GrpcServer extends BaseServer {
@Override
public void start() throws Exception {
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(getPort()).addService(adminGrpcServer);
if (properties.isEnable()) {
if (properties.isEnableSSL()) {
serverBuilder.sslContext(null);
}
server = serverBuilder.build();
Expand All @@ -41,7 +41,7 @@ public void destroy() {
server.shutdownNow();
}
}
} catch (Exception e) {
} catch (InterruptedException e) {
log.warn("destroy [{}] server fail", this.getClass().getSimpleName(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ public FetchPositionResponse handler(FetchPositionRequest request, Metadata meta
offset.setOffset(position.getPosition());
RecordPosition recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
recordPosition.setRecordPartitionClazz(partition.getClass());
recordPosition.setRecordOffset(offset);
recordPosition.setRecordOffsetClazz(offset.getClass());
response.setRecordPosition(recordPosition);
}
return response;
Expand Down
1 change: 1 addition & 0 deletions eventmesh-admin-server/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
event-mesh:
admin-server:
service-name: DEFAULT_GROUP@@em_adm_server
port: 8081
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
eventMesh.server.retry.plugin.type=nacos
eventMesh.registry.plugin.server-addr=localhost:8848
Original file line number Diff line number Diff line change
Expand Up @@ -206,5 +206,5 @@ public class Constants {

public static final String DEFAULT = "default";

public static final String ADMIN_SERVER_REGISTRY_NAME = "em_adm_server@@DEFAULT_GROUP";
public static final String ADMIN_SERVER_REGISTRY_NAME = "DEFAULT_GROUP@@em_adm_server";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.eventmesh.common.config;

import static org.apache.eventmesh.common.utils.ReflectUtils.lookUpFieldByParentClass;

import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.util.Strings;

import java.io.File;
import java.io.IOException;
Expand All @@ -29,9 +29,7 @@
import java.util.Objects;
import java.util.Properties;

import org.assertj.core.util.Strings;

import lombok.Getter;
import static org.apache.eventmesh.common.utils.ReflectUtils.lookUpFieldByParentClass;

public class ConfigService {

Expand Down Expand Up @@ -60,6 +58,9 @@ private ConfigService() {
}

public ConfigService setConfigPath(String configPath) {
if (StringUtils.isNotBlank(configPath) && !configPath.endsWith(File.separator)) {
configPath = configPath + File.separator;
}
this.configPath = configPath;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ public class ErrorCode {
public static final int BAD_DB_DATA = 4002;

public static final int INTERNAL_ERR = 5000;
public static final int STARTUP_CONFIG_MISS = 5001;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.eventmesh.connector.wecom.connector;

import org.apache.eventmesh.common.remote.offset.RecordOffset;

public class MockRecordOffset extends RecordOffset {
@Override
public Class<? extends RecordOffset> getRecordOffsetClass() {
return MockRecordOffset.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.eventmesh.connector.wecom.connector;

import org.apache.eventmesh.common.remote.offset.RecordPartition;

public class MockRecordPartition extends RecordPartition {
@Override
public Class<? extends RecordPartition> getRecordPartitionClass() {
return MockRecordPartition.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.eventmesh.connector.wecom.connector;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.apache.eventmesh.common.config.connector.wecom.WeComSinkConfig;
import org.apache.eventmesh.common.remote.offset.RecordOffset;
import org.apache.eventmesh.common.remote.offset.RecordPartition;
Expand All @@ -31,20 +27,11 @@
import org.apache.eventmesh.connector.wecom.sink.connector.WeComSinkConnector;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -56,6 +43,17 @@
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class WeComSinkConnectorTest {

Expand Down Expand Up @@ -89,8 +87,8 @@ public void testSendMessageToWeCom() throws IOException {
final int times = 3;
List<ConnectRecord> records = new ArrayList<>();
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
RecordPartition partition = new MockRecordPartition();
RecordOffset offset = new MockRecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
connectRecord.addExtension(ConnectRecordExtensionKeys.WECOM_MESSAGE_TEMPLATE_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
Long timestamp, Object data) {
this.position = new RecordPosition(recordPartition, recordOffset);
if (recordPartition != null) {
this.position.setRecordPartitionClazz(recordPartition.getRecordPartitionClass());
}
if (recordOffset != null) {
this.position.setRecordOffsetClazz(recordOffset.getRecordOffsetClass());
}
this.timestamp = timestamp;
this.data = data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ private static RegistryService registryBuilder(String registryPluginType) {
log.error(errorMsg);
throw new RuntimeException(errorMsg);
}

log.info("build registry plugin [{}] by type [{}] success", registryServiceExt.getClass().getSimpleName(),
registryPluginType);
return registryServiceExt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
public interface RegistryService {
String ConfigurationKey = "registry";
void init() throws RegistryException;

void shutdown() throws RegistryException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.registry.NotifyEvent;
import org.apache.eventmesh.registry.QueryInstances;
import org.apache.eventmesh.registry.RegisterServerInfo;
Expand Down Expand Up @@ -48,8 +44,6 @@ public class NacosDiscoveryService implements RegistryService {

private final AtomicBoolean initFlag = new AtomicBoolean(false);

private CommonConfiguration configuration;

private NacosRegistryConfiguration nacosConf;

private NamingService namingService;
Expand All @@ -73,10 +67,6 @@ public void init() throws RegistryException {
if (!initFlag.compareAndSet(false, true)) {
return;
}
configuration = ConfigurationContextUtil.get(RegistryService.ConfigurationKey);
if (configuration == null) {
throw new RegistryException("registry config instance is null");
}
nacosConf = ConfigService.getInstance().buildConfigInstance(NacosRegistryConfiguration.class);
if (nacosConf == null) {
log.info("nacos registry configuration is null");
Expand All @@ -93,12 +83,13 @@ public void init() throws RegistryException {

private Properties buildProperties() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, configuration.getRegistryAddr());
properties.setProperty(PropertyKeyConst.USERNAME, configuration.getEventMeshRegistryPluginUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, configuration.getEventMeshRegistryPluginPassword());
if (nacosConf == null) {
return properties;
}
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosConf.getRegistryAddr());
properties.setProperty(PropertyKeyConst.USERNAME, nacosConf.getEventMeshRegistryPluginUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, nacosConf.getEventMeshRegistryPluginPassword());

String endpoint = nacosConf.getEndpoint();
if (Objects.nonNull(endpoint) && endpoint.contains(":")) {
int index = endpoint.indexOf(":");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigField;

@Data
@NoArgsConstructor
@Config(prefix = "eventMesh.registry.nacos")
public class NacosRegistryConfiguration {
public class NacosRegistryConfiguration extends CommonConfiguration {

@ConfigField(field = PropertyKeyConst.ENDPOINT)
private String endpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

nacos=com.apache.eventmesh.admin.server.registry.NacosDiscoveryService
nacos=org.apache.eventmesh.registry.nacos.NacosDiscoveryService

0 comments on commit 9f8c71d

Please sign in to comment.