Skip to content

Commit

Permalink
apply filters under http processor
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Nov 23, 2023
1 parent a8e00a8 commit 1bc96bd
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public enum EventMeshRetCode {
EVENTMESH_HEARTBEAT_ERR(21, "eventMesh heartbeat error"),
EVENTMESH_ACL_ERR(22, "eventMesh acl error"),
EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(23, "eventMesh http msg send over the limit"),

EVENTMESH_FILTER_MSG_ERR(24, "eventMesh filter async msg error"),
EVENTMESH_OPERATE_FAIL(100, "operate fail");

private final Integer retCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void registerMetadata(Map<String, String> metadataMap) {
}

@Override
public List<Map<String, String>> getMetaData(String key, boolean fuzzyEnabled) {
public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void registerMetadata(Map<String, String> metadataMap) {
}

@Override
public List<Map<String, String>> getMetaData(String key, boolean fuzzyEnabled) {
public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void registerMetadata(Map<String, String> metadataMap) {
}

@Override
public List<Map<String, String>> getMetaData(String key, boolean fuzzyEnabled) {
public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ public RateLimiter getBatchRateLimiter() {
return batchRateLimiter;
}

public FilterEngine getFilterEngine() {
return filterEngine;
}

public MetaStorage getMetaStorage() {
return metaStorage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.filter.pattern.Pattern;
import org.apache.eventmesh.filter.patternbuild.PatternBuilder;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
Expand All @@ -39,12 +40,15 @@

import com.fasterxml.jackson.databind.JsonNode;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FilterEngine {

/**
* key:group-topic
**/
public Map<String, Pattern> filterPatternMap = new HashMap<>();
private final Map<String, Pattern> filterPatternMap = new HashMap<>();

private final String FILTER_PREIX = "filter-" ;

Expand Down Expand Up @@ -82,6 +86,7 @@ public void init() {
for (String filterKey : filterPatternMap.keySet()) {
if (!StringUtils.contains(filterKey, producerGroup)) {
addFilterListener(producerGroup);
LogUtils.info(log, "addFilterListener for producer group: " + producerGroup);
}
}
}
Expand All @@ -90,6 +95,7 @@ public void init() {
for (String filterKey : filterPatternMap.keySet()) {
if (!StringUtils.contains(filterKey, consumerGroup)) {
addFilterListener(consumerGroup);
LogUtils.info(log, "addFilterListener for consumer group: " + consumerGroup);
}
}
}
Expand Down Expand Up @@ -124,4 +130,8 @@ public void addFilterListener(String group) {
public void shutdown() {
scheduledExecutorService.shutdown();
}

public Pattern getFilterPattern(String key) {
return filterPatternMap.get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestURI;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.filter.pattern.Pattern;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
Expand Down Expand Up @@ -158,6 +160,8 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
event.getExtension(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey())).toString();
final String topic = event.getSubject();

Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + topic);

// validate body
if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
|| event.getData() == null) {
Expand Down Expand Up @@ -237,41 +241,52 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsg();

final long startTime = System.currentTimeMillis();

boolean isFiltered = true;
try {
event = CloudEventBuilder.from(sendMessageContext.getEvent())
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
handlerSpecific.getTraceOperation().createClientTraceOperation(EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false);
if (filterPattern != null) {
isFiltered = filterPattern.filter(JsonUtils.toJSONString(event));
}

eventMeshProducer.send(sendMessageContext, new SendCallback() {

@Override
public void onSuccess(final SendResult sendResult) {
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);

LogUtils.info(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
}
if (isFiltered) {
eventMeshProducer.send(sendMessageContext, new SendCallback() {

@Override
public void onSuccess(final SendResult sendResult) {
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);

LogUtils.info(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
}

@Override
public void onException(final OnExceptionContext context) {
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+ EventMeshUtil.stackTrace(context.getException(), 2));
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), sendMessageContext.getEvent()));

handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
LogUtils.error(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, context.getException());
}
});
} else {
LogUtils.error(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}|apply filter failed",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_FILTER_MSG_ERR, responseHeaderMap, responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
}

@Override
public void onException(final OnExceptionContext context) {
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+ EventMeshUtil.stackTrace(context.getException(), 2));
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), sendMessageContext.getEvent()));

handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
LogUtils.error(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, context.getException());
}
});
} catch (Exception ex) {
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, responseHeaderMap, responseBodyMap, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.filter.pattern.Pattern;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand Down Expand Up @@ -126,6 +127,16 @@ public void tryHTTPRequest() {
.withExtension(EventMeshConstants.RSP_URL, currPushUrl)
.withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
.build();

Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
if (filterPattern != null) {
if (!filterPattern.filter(JsonUtils.toJSONString(event))) {
LOGGER.error("apply filter failed, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId());
return;
}
}
handleMsgContext.setEvent(event);

String content = "";
Expand Down

0 comments on commit 1bc96bd

Please sign in to comment.