1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

format code use eclipse

This commit is contained in:
xiejiashuai 2019-08-07 16:25:49 +08:00
parent aa4b8790e2
commit a1a215f6e7
2 changed files with 398 additions and 389 deletions

View File

@ -24,21 +24,21 @@ import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
*/ */
public interface RocketMQBinderConstants { public interface RocketMQBinderConstants {
/** /**
* Header key * Header key
*/ */
String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG"; String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
/** /**
* Default value * Default value
*/ */
String DEFAULT_NAME_SERVER = "127.0.0.1:9876"; String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
String DEFAULT_GROUP = PREFIX + "binder_default_group_name"; String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
/** /**
* RocketMQ re-consume times * RocketMQ re-consume times
*/ */
String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES"; String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
} }

View File

@ -16,9 +16,11 @@
package com.alibaba.cloud.stream.binder.rocketmq.consuming; package com.alibaba.cloud.stream.binder.rocketmq.consuming;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import java.util.List;
import java.util.Objects;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@ -44,388 +46,395 @@ import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.List; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import java.util.Objects; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
/** /**
* A class that Listen on rocketmq message * A class that Listen on rocketmq message
* <p> * <p>
* this class will delegate {@link RocketMQListener} to handle message * this class will delegate {@link RocketMQListener} to handle message
* *
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a> * @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
* @see RocketMQListener * @see RocketMQListener
*/ */
public class RocketMQListenerBindingContainer public class RocketMQListenerBindingContainer
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
private final static Logger log = LoggerFactory private final static Logger log = LoggerFactory
.getLogger(RocketMQListenerBindingContainer.class); .getLogger(RocketMQListenerBindingContainer.class);
private long suspendCurrentQueueTimeMillis = 1000; private long suspendCurrentQueueTimeMillis = 1000;
/** /**
* Message consume retry strategy<br> * Message consume retry strategy<br>
* -1,no retry,put into DLQ directly<br> * -1,no retry,put into DLQ directly<br>
* 0,broker control retry frequency<br> * 0,broker control retry frequency<br>
* >0,client control retry frequency. * >0,client control retry frequency.
*/ */
private int delayLevelWhenNextConsume = 0; private int delayLevelWhenNextConsume = 0;
private String nameServer; private String nameServer;
private String consumerGroup; private String consumerGroup;
private String topic; private String topic;
private int consumeThreadMax = 64; private int consumeThreadMax = 64;
private String charset = "UTF-8"; private String charset = "UTF-8";
private RocketMQListener rocketMQListener; private RocketMQListener rocketMQListener;
private DefaultMQPushConsumer consumer; private DefaultMQPushConsumer consumer;
private boolean running; private boolean running;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties; private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder; private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
// The following properties came from RocketMQConsumerProperties. // The following properties came from RocketMQConsumerProperties.
private ConsumeMode consumeMode; private ConsumeMode consumeMode;
private SelectorType selectorType; private SelectorType selectorType;
private String selectorExpression; private String selectorExpression;
private MessageModel messageModel; private MessageModel messageModel;
public RocketMQListenerBindingContainer( public RocketMQListenerBindingContainer(
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties, ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQMessageChannelBinder rocketMQMessageChannelBinder) { RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
this.rocketMQConsumerProperties = rocketMQConsumerProperties; this.rocketMQConsumerProperties = rocketMQConsumerProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder; this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly() this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
? ConsumeMode.ORDERLY ? ConsumeMode.ORDERLY
: ConsumeMode.CONCURRENTLY; : ConsumeMode.CONCURRENTLY;
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) { if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
this.selectorType = SelectorType.TAG; this.selectorType = SelectorType.TAG;
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags(); this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
} else { }
this.selectorType = SelectorType.SQL92; else {
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql(); this.selectorType = SelectorType.SQL92;
} this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting() }
? MessageModel.BROADCASTING this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
: MessageModel.CLUSTERING; ? MessageModel.BROADCASTING
} : MessageModel.CLUSTERING;
}
@Override
public void setupMessageListener(RocketMQListener<?> rocketMQListener) { @Override
this.rocketMQListener = rocketMQListener; public void setupMessageListener(RocketMQListener<?> rocketMQListener) {
} this.rocketMQListener = rocketMQListener;
}
@Override
public void destroy() throws Exception { @Override
this.setRunning(false); public void destroy() throws Exception {
if (Objects.nonNull(consumer)) { this.setRunning(false);
consumer.shutdown(); if (Objects.nonNull(consumer)) {
} consumer.shutdown();
log.info("container destroyed, {}", this.toString()); }
} log.info("container destroyed, {}", this.toString());
}
@Override
public void afterPropertiesSet() throws Exception { @Override
initRocketMQPushConsumer(); public void afterPropertiesSet() throws Exception {
} initRocketMQPushConsumer();
}
@Override
public boolean isAutoStartup() { @Override
return true; public boolean isAutoStartup() {
} return true;
}
@Override
public void stop(Runnable callback) { @Override
stop(); public void stop(Runnable callback) {
callback.run(); stop();
} callback.run();
}
@Override
public void start() { @Override
if (this.isRunning()) { public void start() {
throw new IllegalStateException( if (this.isRunning()) {
"container already running. " + this.toString()); throw new IllegalStateException(
} "container already running. " + this.toString());
}
try {
consumer.start(); try {
} catch (MQClientException e) { consumer.start();
throw new IllegalStateException("Failed to start RocketMQ push consumer", e); }
} catch (MQClientException e) {
this.setRunning(true); throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
log.info("running container: {}", this.toString()); this.setRunning(true);
}
log.info("running container: {}", this.toString());
@Override }
public void stop() {
if (this.isRunning()) { @Override
if (Objects.nonNull(consumer)) { public void stop() {
consumer.shutdown(); if (this.isRunning()) {
} if (Objects.nonNull(consumer)) {
setRunning(false); consumer.shutdown();
} }
} setRunning(false);
}
@Override }
public boolean isRunning() {
return running; @Override
} public boolean isRunning() {
return running;
private void setRunning(boolean running) { }
this.running = running;
} private void setRunning(boolean running) {
this.running = running;
@Override }
public int getPhase() {
return Integer.MAX_VALUE; @Override
} public int getPhase() {
return Integer.MAX_VALUE;
private void initRocketMQPushConsumer() throws MQClientException { }
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); private void initRocketMQPushConsumer() throws MQClientException {
Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
Assert.notNull(topic, "Property 'topic' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
String ak = rocketBinderConfigurationProperties.getAccessKey(); Assert.notNull(topic, "Property 'topic' is required");
String sk = rocketBinderConfigurationProperties.getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { String ak = rocketBinderConfigurationProperties.getAccessKey();
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk)); String sk = rocketBinderConfigurationProperties.getSecretKey();
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
new AllocateMessageQueueAveragely(), RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
rocketBinderConfigurationProperties.isEnableMsgTrace(), consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
rocketBinderConfigurationProperties.getCustomizedTraceTopic()); new AllocateMessageQueueAveragely(),
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, rocketBinderConfigurationProperties.isEnableMsgTrace(),
topic + "|" + UtilAll.getPid())); rocketBinderConfigurationProperties.getCustomizedTraceTopic());
consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
} else { topic + "|" + UtilAll.getPid()));
consumer = new DefaultMQPushConsumer(consumerGroup, consumer.setVipChannelEnabled(false);
rocketBinderConfigurationProperties.isEnableMsgTrace(), }
rocketBinderConfigurationProperties.getCustomizedTraceTopic()); else {
} consumer = new DefaultMQPushConsumer(consumerGroup,
rocketBinderConfigurationProperties.isEnableMsgTrace(),
consumer.setNamesrvAddr(nameServer); rocketBinderConfigurationProperties.getCustomizedTraceTopic());
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); }
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
consumer.setNamesrvAddr(nameServer);
switch (messageModel) { consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
case BROADCASTING: consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
consumer.setMessageModel(
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); switch (messageModel) {
break; case BROADCASTING:
case CLUSTERING: consumer.setMessageModel(
consumer.setMessageModel( org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break;
break; case CLUSTERING:
default: consumer.setMessageModel(
throw new IllegalArgumentException("Property 'messageModel' was wrong."); org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
} break;
default:
switch (selectorType) { throw new IllegalArgumentException("Property 'messageModel' was wrong.");
case TAG: }
consumer.subscribe(topic, selectorExpression);
break; switch (selectorType) {
case SQL92: case TAG:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); consumer.subscribe(topic, selectorExpression);
break; break;
default: case SQL92:
throw new IllegalArgumentException("Property 'selectorType' was wrong."); consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
} break;
default:
switch (consumeMode) { throw new IllegalArgumentException("Property 'selectorType' was wrong.");
case ORDERLY: }
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break; switch (consumeMode) {
case CONCURRENTLY: case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently()); consumer.setMessageListener(new DefaultMessageListenerOrderly());
break; break;
default: case CONCURRENTLY:
throw new IllegalArgumentException("Property 'consumeMode' was wrong."); consumer.setMessageListener(new DefaultMessageListenerConcurrently());
} break;
default:
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
((RocketMQPushConsumerLifecycleListener) rocketMQListener) }
.prepareStart(consumer);
} if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener)
} .prepareStart(consumer);
}
@Override
public String toString() { }
return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
+ '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' @Override
+ ", consumeMode=" + consumeMode + ", selectorType=" + selectorType public String toString() {
+ ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
+ messageModel + '}'; + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
} + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
+ ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
public long getSuspendCurrentQueueTimeMillis() { + messageModel + '}';
return suspendCurrentQueueTimeMillis; }
}
public long getSuspendCurrentQueueTimeMillis() {
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { return suspendCurrentQueueTimeMillis;
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; }
}
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
public int getDelayLevelWhenNextConsume() { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
return delayLevelWhenNextConsume; }
}
public int getDelayLevelWhenNextConsume() {
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { return delayLevelWhenNextConsume;
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; }
}
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
public String getNameServer() { this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
return nameServer; }
}
public String getNameServer() {
public void setNameServer(String nameServer) { return nameServer;
this.nameServer = nameServer; }
}
public void setNameServer(String nameServer) {
public String getConsumerGroup() { this.nameServer = nameServer;
return consumerGroup; }
}
public String getConsumerGroup() {
public void setConsumerGroup(String consumerGroup) { return consumerGroup;
this.consumerGroup = consumerGroup; }
}
public void setConsumerGroup(String consumerGroup) {
public String getTopic() { this.consumerGroup = consumerGroup;
return topic; }
}
public String getTopic() {
public void setTopic(String topic) { return topic;
this.topic = topic; }
}
public void setTopic(String topic) {
public int getConsumeThreadMax() { this.topic = topic;
return consumeThreadMax; }
}
public int getConsumeThreadMax() {
public void setConsumeThreadMax(int consumeThreadMax) { return consumeThreadMax;
this.consumeThreadMax = consumeThreadMax; }
}
public void setConsumeThreadMax(int consumeThreadMax) {
public String getCharset() { this.consumeThreadMax = consumeThreadMax;
return charset; }
}
public String getCharset() {
public void setCharset(String charset) { return charset;
this.charset = charset; }
}
public void setCharset(String charset) {
public RocketMQListener getRocketMQListener() { this.charset = charset;
return rocketMQListener; }
}
public RocketMQListener getRocketMQListener() {
public void setRocketMQListener(RocketMQListener rocketMQListener) { return rocketMQListener;
this.rocketMQListener = rocketMQListener; }
}
public void setRocketMQListener(RocketMQListener rocketMQListener) {
public DefaultMQPushConsumer getConsumer() { this.rocketMQListener = rocketMQListener;
return consumer; }
}
public DefaultMQPushConsumer getConsumer() {
public void setConsumer(DefaultMQPushConsumer consumer) { return consumer;
this.consumer = consumer; }
}
public void setConsumer(DefaultMQPushConsumer consumer) {
public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() { this.consumer = consumer;
return rocketMQConsumerProperties; }
}
public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() {
public ConsumeMode getConsumeMode() { return rocketMQConsumerProperties;
return consumeMode; }
}
public ConsumeMode getConsumeMode() {
public SelectorType getSelectorType() { return consumeMode;
return selectorType; }
}
public SelectorType getSelectorType() {
public String getSelectorExpression() { return selectorType;
return selectorExpression; }
}
public String getSelectorExpression() {
public MessageModel getMessageModel() { return selectorExpression;
return messageModel; }
}
public MessageModel getMessageModel() {
public class DefaultMessageListenerConcurrently return messageModel;
implements MessageListenerConcurrently { }
@SuppressWarnings({"unchecked", "Duplicates"}) public class DefaultMessageListenerConcurrently
@Override implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) { @SuppressWarnings({ "unchecked", "Duplicates" })
for (MessageExt messageExt : msgs) { @Override
log.debug("received msg: {}", messageExt); public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
try { ConsumeConcurrentlyContext context) {
long now = System.currentTimeMillis(); for (MessageExt messageExt : msgs) {
preOnMessage(messageExt); log.debug("received msg: {}", messageExt);
rocketMQListener try {
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); long now = System.currentTimeMillis();
long costTime = System.currentTimeMillis() - now; preOnMessage(messageExt);
log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime); rocketMQListener
} catch (Exception e) { .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
log.warn("consume message failed. messageExt:{}", messageExt, e); long costTime = System.currentTimeMillis() - now;
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); log.debug("consume {} message key:[{}] cost: {} ms",
return ConsumeConcurrentlyStatus.RECONSUME_LATER; messageExt.getMsgId(), messageExt.getKeys(), costTime);
} }
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", messageExt, e);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
} return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} }
}
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
@SuppressWarnings({"unchecked", "Duplicates"}) }
@Override }
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) { public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt); @SuppressWarnings({ "unchecked", "Duplicates" })
try { @Override
long now = System.currentTimeMillis(); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
preOnMessage(messageExt); ConsumeOrderlyContext context) {
rocketMQListener for (MessageExt messageExt : msgs) {
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); log.debug("received msg: {}", messageExt);
long costTime = System.currentTimeMillis() - now; try {
log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime); long now = System.currentTimeMillis();
} catch (Exception e) { preOnMessage(messageExt);
log.warn("consume message failed. messageExt:{}", messageExt, e); rocketMQListener
context.setSuspendCurrentQueueTimeMillis( .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
suspendCurrentQueueTimeMillis); long costTime = System.currentTimeMillis() - now;
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; log.debug("consume {} message key:[{}] cost: {} ms",
} messageExt.getMsgId(), messageExt.getKeys(), costTime);
} }
catch (Exception e) {
return ConsumeOrderlyStatus.SUCCESS; log.warn("consume message failed. messageExt:{}", messageExt, e);
} context.setSuspendCurrentQueueTimeMillis(
} suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
/** }
* pre handle message before the consumer handle message }
*
* @param messageExt the rocketmq message return ConsumeOrderlyStatus.SUCCESS;
*/ }
private void preOnMessage(MessageExt messageExt) { }
int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes)); /**
} * pre handle message before the consumer handle message
*
* @param messageExt the rocketmq message
*/
private void preOnMessage(MessageExt messageExt) {
int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
String.valueOf(reconsumeTimes));
}
} }