Skip to content

Commit

Permalink
[Bug apache#646] Missing the rocketmq message properties during proto…
Browse files Browse the repository at this point in the history
…col conversion (apache#647)

* [Bug apache#646] Missing the rocketmq message properties during protocol conversion

* fix checkstyle and gradle module dependency

* fix conflicts

close apache#646
  • Loading branch information
xwm1992 authored Dec 13, 2021
1 parent 3d48165 commit d501fcb
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,6 @@ public class Constants {

public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "storetimestamp";

public static final String MESSAGE_PROP_SEPARATOR = "99";

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ protected boolean isContentTypeHeader(String key) {

@Override
protected boolean isCloudEventsHeader(String key) {
return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length())
.startsWith(RocketMQHeaders.CE_PREFIX);
// return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length())
//.startsWith(RocketMQHeaders.CE_PREFIX);
return true;
}

@Override
protected String toCloudEventsKey(String key) {
return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
//return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
return key.toLowerCase();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class RocketMQHeaders {
public static final String CE_PREFIX = "CE_";

protected static final Map<String, String> ATTRIBUTES_TO_HEADERS =
MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v);
MessageUtils.generateAttributesToHeadersMapping(v -> v);

public static final String CONTENT_TYPE =
ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ public RocketMQMessageWriter(String topic, String keys, String tags) {
public CloudEventContextWriter withContextAttribute(String name, String value)
throws CloudEventRWException {

String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
if (propName == null) {
propName = RocketMQHeaders.CE_PREFIX + name;
}
message.putUserProperty(propName, value);
//String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
//if (propName == null) {
//propName = RocketMQHeaders.CE_PREFIX + name;
//}
//message.putUserProperty(propName, value);
message.putUserProperty(name, value);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,18 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
String.valueOf(msg.getStoreTimestamp()));

//for rr request/reply
String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

CloudEvent cloudEvent =
RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();

CloudEventBuilder cloudEventBuilder;
if (StringUtils.isNotEmpty(cluster)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster);
cloudEvent = cloudEventBuilder.build();
}
if (StringUtils.isNotEmpty(replyClient)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient);
cloudEvent = cloudEventBuilder.build();
CloudEventBuilder cloudEventBuilder = null;
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) {
String prop = msg.getProperty(sysPropKey);
sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop);
}
}
if (StringUtils.isNotEmpty(correlationId)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId);
if (cloudEventBuilder != null) {
cloudEvent = cloudEventBuilder.build();
}

Expand Down Expand Up @@ -259,25 +253,19 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP,
String.valueOf(msg.getStoreTimestamp()));

//for rr request/reply
String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

CloudEvent cloudEvent =
RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();

CloudEventBuilder cloudEventBuilder;
if (StringUtils.isNotEmpty(cluster)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster);
cloudEvent = cloudEventBuilder.build();
}
if (StringUtils.isNotEmpty(replyClient)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient);
cloudEvent = cloudEventBuilder.build();
CloudEventBuilder cloudEventBuilder = null;

for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) {
String prop = msg.getProperty(sysPropKey);
sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop);
}
}
if (StringUtils.isNotEmpty(correlationId)) {
cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId);
if (cloudEventBuilder != null) {
cloudEvent = cloudEventBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.cloudevent.RocketMQMessageFactory;
import org.apache.eventmesh.connector.rocketmq.utils.CloudEventUtils;

Expand All @@ -38,9 +39,6 @@

import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.cloudevents.CloudEvent;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -73,6 +71,7 @@ public SendResult send(CloudEvent cloudEvent) {
this.checkProducerServiceState(rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
msg = supplySysProp(msg, cloudEvent);
String messageId = null;
try {
org.apache.rocketmq.client.producer.SendResult sendResultRmq = this.rocketmqProducer.send(msg);
Expand All @@ -92,6 +91,7 @@ public void sendOneway(CloudEvent cloudEvent) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
msg = supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.sendOneway(msg);
} catch (Exception e) {
Expand All @@ -105,7 +105,7 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);

msg = supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
} catch (Exception e) {
Expand All @@ -120,6 +120,9 @@ public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);

msg = supplySysProp(msg, cloudEvent);

rocketmqProducer.request(msg, rrCallbackConvert(msg, rrCallback), timeout);
}

Expand All @@ -128,18 +131,7 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
if (StringUtils.isNotEmpty(cloudEvent.getExtension("cluster").toString())) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CLUSTER,
cloudEvent.getExtension("cluster").toString());
}
if (StringUtils.isNotEmpty(cloudEvent.getExtension("replytoclient").toString())) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT,
cloudEvent.getExtension("replytoclient").toString());
}
if (StringUtils.isNotEmpty(cloudEvent.getExtension("correlationid").toString())) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID,
cloudEvent.getExtension("correlationid").toString());
}
msg = supplySysProp(msg, cloudEvent);

try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
Expand All @@ -151,10 +143,30 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac

}

private Message supplySysProp(Message msg, CloudEvent cloudEvent) {
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
String ceKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
if (cloudEvent.getExtension(ceKey) != null && StringUtils.isNotEmpty(cloudEvent.getExtension(ceKey).toString())) {
MessageAccessor.putProperty(msg, sysPropKey, cloudEvent.getExtension(ceKey).toString());
msg.getProperties().remove(ceKey);
}
}
return msg;
}

private RequestCallback rrCallbackConvert(final Message message, final RequestReplyCallback rrCallback) {
return new RequestCallback() {
@Override
public void onSuccess(org.apache.rocketmq.common.message.Message message) {
// clean the message property to lowercase
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) {
String prop = message.getProperty(sysPropKey);
String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
MessageAccessor.putProperty(message, tmpPropKey, prop);
message.getProperties().remove(sysPropKey);
}
}
CloudEvent event = RocketMQMessageFactory.createReader(message).toEvent();
rrCallback.onSuccess(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.eventmesh.connector.rocketmq.utils;


import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.cloudevent.impl.RocketMQHeaders;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Map;
Expand Down Expand Up @@ -90,13 +90,23 @@ public static Message msgConvert(MessageExt rmqMsg) {
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET),
String.valueOf(rmqMsg.getQueueOffset()));

for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) {
String prop = message.getProperty(sysPropKey);
String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
MessageAccessor.putProperty(message, tmpPropKey, prop);
message.getProperties().remove(sysPropKey);
}
}

return message;
}



private static String buildCloudEventPropertyKey(String propName) {
return RocketMQHeaders.CE_PREFIX + propName;
//return RocketMQHeaders.CE_PREFIX + propName;
return propName;
}

public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message message) {
Expand All @@ -121,8 +131,8 @@ public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Messag
rmqMessageExt.setTopic(message.getTopic());

int queueId =
(int) Integer.valueOf(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
long queueOffset = (long) Long.valueOf(
Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
long queueOffset = Long.parseLong(
message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)));
//use in manual ack
rmqMessageExt.setQueueId(queueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.eventmesh.runtime.constants;

import org.apache.eventmesh.common.Constants;

public class EventMeshConstants {

public static final String EVENT_STORE_PROPERTIES = "eventstore";
Expand Down Expand Up @@ -111,8 +113,10 @@ public class EventMeshConstants {

public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID";

public static final String LEAVE_TIME = "leavetime"; //leaveBrokerTime
public static final String ARRIVE_TIME = "arrivetime";
public static final String STORE_TIME = "storetime";
public static final String LEAVE_TIME = "leave" + Constants.MESSAGE_PROP_SEPARATOR + "time"; //leaveBrokerTime
public static final String ARRIVE_TIME = "arrive" + Constants.MESSAGE_PROP_SEPARATOR + "time";
public static final String STORE_TIME = "store" + Constants.MESSAGE_PROP_SEPARATOR + "time";



}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
client.registerSubBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
if (msg.getHeader().getCommand() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
logger.error("receive message-------------------------------------" + msg.toString());
}
}
Expand Down

0 comments on commit d501fcb

Please sign in to comment.