mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
Polish #259
This commit is contained in:
parent
e7b4a08f44
commit
62ad2836a1
@ -27,7 +27,7 @@
|
||||
<aliyun.sdk.version>4.0.1</aliyun.sdk.version>
|
||||
<alicloud.context.version>1.0.5</alicloud.context.version>
|
||||
<aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version>
|
||||
<rocketmq.version>4.3.1</rocketmq.version>
|
||||
<rocketmq.starter.version>2.0.2-SNAPSHOT</rocketmq.starter.version>
|
||||
<schedulerX.client.version>2.1.6</schedulerX.client.version>
|
||||
<dubbo.version>2.6.5</dubbo.version>
|
||||
<dubbo-spring-boot.version>0.2.1.RELEASE</dubbo-spring-boot.version>
|
||||
@ -108,8 +108,8 @@
|
||||
<!-- Apache RocketMQ -->
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
<version>${rocketmq.version}</version>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>${rocketmq.starter.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Sentinel -->
|
||||
|
@ -21,51 +21,44 @@
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>4.0.3</version>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-actuator</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-actuator-autoconfigure</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.micrometer</groupId>
|
||||
<artifactId>micrometer-core</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
|
@ -21,43 +21,16 @@ package org.springframework.cloud.stream.binder.rocketmq;
|
||||
*/
|
||||
public interface RocketMQBinderConstants {
|
||||
|
||||
String ENDPOINT_ID = "rocketmq-binder";
|
||||
|
||||
/**
|
||||
* Header key
|
||||
*/
|
||||
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE";
|
||||
|
||||
String ROCKET_FLAG = "ROCKETMQ_FLAG";
|
||||
|
||||
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
|
||||
|
||||
String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG";
|
||||
|
||||
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
|
||||
String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
|
||||
|
||||
/**
|
||||
* Instrumentation
|
||||
* Default value
|
||||
*/
|
||||
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
|
||||
String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
|
||||
|
||||
interface Metrics {
|
||||
interface Producer {
|
||||
String PREFIX = "scs-rocketmq.producer.";
|
||||
String TOTAL_SENT = "totalSent";
|
||||
String TOTAL_SENT_FAILURES = "totalSentFailures";
|
||||
String SENT_PER_SECOND = "sentPerSecond";
|
||||
String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond";
|
||||
}
|
||||
|
||||
interface Consumer {
|
||||
String GROUP_PREFIX = "scs-rocketmq.consumerGroup.";
|
||||
String PREFIX = "scs-rocketmq.consumer.";
|
||||
String TOTAL_CONSUMED = "totalConsumed";
|
||||
String CONSUMED_PER_SECOND = "consumedPerSecond";
|
||||
String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures";
|
||||
String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond";
|
||||
}
|
||||
}
|
||||
String DEFAULT_GROUP = "rocketmq_binder_default_group_name";
|
||||
|
||||
}
|
||||
|
@ -16,16 +16,18 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
@ -39,10 +41,10 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQMessageChannelBinder extends
|
||||
@ -50,21 +52,19 @@ public class RocketMQMessageChannelBinder extends
|
||||
implements
|
||||
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(RocketMQMessageChannelBinder.class);
|
||||
|
||||
private final RocketMQExtendedBindingProperties extendedBindingProperties;
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
private final ConsumersManager consumersManager;
|
||||
|
||||
public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
|
||||
private Set<String> clientConfigId = new HashSet<>();
|
||||
private Map<String, String> topicInUse = new HashMap<>();
|
||||
|
||||
public RocketMQMessageChannelBinder(
|
||||
RocketMQExtendedBindingProperties extendedBindingProperties,
|
||||
RocketMQTopicProvisioner provisioningProvider,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
super(null, provisioningProvider);
|
||||
this.consumersManager = consumersManager;
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
@ -75,22 +75,53 @@ public class RocketMQMessageChannelBinder extends
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||
MessageChannel errorChannel) throws Exception {
|
||||
if (producerProperties.getExtension().getEnabled()) {
|
||||
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||
destination.getName(), producerProperties,
|
||||
rocketBinderConfigurationProperties, instrumentationManager);
|
||||
|
||||
RocketMQTemplate rocketMQTemplate;
|
||||
if (producerProperties.getExtension().getTransactional()) {
|
||||
// transaction message check LocalTransactionExecuter
|
||||
messageHandler.setLocalTransactionExecuter(
|
||||
getClassConfiguration(destination.getName(),
|
||||
producerProperties.getExtension().getExecuter(),
|
||||
LocalTransactionExecuter.class));
|
||||
// transaction message check TransactionCheckListener
|
||||
messageHandler.setTransactionCheckListener(
|
||||
getClassConfiguration(destination.getName(),
|
||||
producerProperties.getExtension()
|
||||
.getTransactionCheckListener(),
|
||||
TransactionCheckListener.class));
|
||||
Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
|
||||
.getBeansOfType(RocketMQTemplate.class);
|
||||
if (rocketMQTemplates.size() == 0) {
|
||||
throw new IllegalStateException(
|
||||
"there is no RocketMQTemplate in Spring BeanFactory");
|
||||
}
|
||||
else if (rocketMQTemplates.size() > 1) {
|
||||
throw new IllegalStateException(
|
||||
"there is more than 1 RocketMQTemplates in Spring BeanFactory");
|
||||
}
|
||||
rocketMQTemplate = rocketMQTemplates.values().iterator().next();
|
||||
clientConfigId.add(rocketMQTemplate.getProducer().buildMQClientId());
|
||||
}
|
||||
else {
|
||||
rocketMQTemplate = new RocketMQTemplate();
|
||||
rocketMQTemplate.setObjectMapper(this.getApplicationContext()
|
||||
.getBeansOfType(ObjectMapper.class).values().iterator().next());
|
||||
DefaultMQProducer producer = new DefaultMQProducer(destination.getName());
|
||||
producer.setNamesrvAddr(
|
||||
rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||
producer.setSendMsgTimeout(
|
||||
producerProperties.getExtension().getSendMessageTimeout());
|
||||
producer.setRetryTimesWhenSendFailed(
|
||||
producerProperties.getExtension().getRetryTimesWhenSendFailed());
|
||||
producer.setRetryTimesWhenSendAsyncFailed(producerProperties
|
||||
.getExtension().getRetryTimesWhenSendAsyncFailed());
|
||||
producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
|
||||
.getCompressMessageBodyThreshold());
|
||||
producer.setRetryAnotherBrokerWhenNotStoreOK(
|
||||
producerProperties.getExtension().isRetryNextServer());
|
||||
producer.setMaxMessageSize(
|
||||
producerProperties.getExtension().getMaxMessageSize());
|
||||
producer.setVipChannelEnabled(
|
||||
producerProperties.getExtension().getVipChannelEnabled());
|
||||
rocketMQTemplate.setProducer(producer);
|
||||
clientConfigId.add(producer.buildMQClientId());
|
||||
}
|
||||
|
||||
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||
rocketMQTemplate, destination.getName(),
|
||||
producerProperties.getExtension().getTransactional(),
|
||||
instrumentationManager);
|
||||
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
|
||||
messageHandler.setSync(producerProperties.getExtension().getSync());
|
||||
|
||||
if (errorChannel != null) {
|
||||
messageHandler.setSendFailureChannel(errorChannel);
|
||||
@ -113,9 +144,22 @@ public class RocketMQMessageChannelBinder extends
|
||||
"'group must be configured for channel " + destination.getName());
|
||||
}
|
||||
|
||||
RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
|
||||
consumerProperties, this);
|
||||
listenerContainer.setConsumerGroup(group);
|
||||
listenerContainer.setTopic(destination.getName());
|
||||
listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
|
||||
listenerContainer.setSuspendCurrentQueueTimeMillis(
|
||||
consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
|
||||
listenerContainer.setDelayLevelWhenNextConsume(
|
||||
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
|
||||
listenerContainer
|
||||
.setNameServer(rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||
|
||||
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
|
||||
consumersManager, consumerProperties, destination.getName(), group,
|
||||
instrumentationManager);
|
||||
listenerContainer, consumerProperties, instrumentationManager);
|
||||
|
||||
topicInUse.put(destination.getName(), group);
|
||||
|
||||
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
|
||||
group, consumerProperties);
|
||||
@ -143,45 +187,11 @@ public class RocketMQMessageChannelBinder extends
|
||||
return extendedBindingProperties.getExtendedProducerProperties(channelName);
|
||||
}
|
||||
|
||||
private <T> T getClassConfiguration(String destName, String className,
|
||||
Class<T> interfaceClass) {
|
||||
if (StringUtils.isEmpty(className)) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, should set "
|
||||
+ interfaceClass.getSimpleName() + " configuration"
|
||||
+ interfaceClass.getSimpleName() + " should be set, like "
|
||||
+ "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour"
|
||||
+ interfaceClass.getSimpleName() + "'");
|
||||
}
|
||||
else if (StringUtils.isNotEmpty(className)) {
|
||||
Class fieldClass;
|
||||
// check class exists
|
||||
try {
|
||||
fieldClass = ClassUtils.forName(className,
|
||||
RocketMQMessageChannelBinder.class.getClassLoader());
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, but " + className
|
||||
+ " class is not found");
|
||||
}
|
||||
// check interface incompatible
|
||||
if (!interfaceClass.isAssignableFrom(fieldClass)) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, but " + className
|
||||
+ " is incompatible with " + interfaceClass.getSimpleName()
|
||||
+ " interface");
|
||||
}
|
||||
try {
|
||||
return (T) fieldClass.newInstance();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException("Binding for channel " + destName
|
||||
+ " using transactional message, but " + className
|
||||
+ " instance error", e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
public Set<String> getClientConfigId() {
|
||||
return clientConfigId;
|
||||
}
|
||||
|
||||
public Map<String, String> getTopicInUse() {
|
||||
return topicInUse;
|
||||
}
|
||||
}
|
||||
|
@ -1,135 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq;
|
||||
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
|
||||
import org.springframework.integration.support.MutableMessage;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageHeaderAccessor;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
|
||||
|
||||
public RocketMQMessageHeaderAccessor() {
|
||||
super();
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor(Message<?> message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public Acknowledgement getAcknowledgement(Message message) {
|
||||
return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withAcknowledgment(
|
||||
Acknowledgement acknowledgment) {
|
||||
setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getTags() {
|
||||
return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withTags(String tag) {
|
||||
setHeader(MessageConst.PROPERTY_TAGS, tag);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getKeys() {
|
||||
return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withKeys(String keys) {
|
||||
setHeader(MessageConst.PROPERTY_KEYS, keys);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessageExt getRocketMessage() {
|
||||
return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
|
||||
setHeader(ORIGINAL_ROCKET_MESSAGE, message);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getDelayTimeLevel() {
|
||||
return (Integer) getMessageHeaders()
|
||||
.getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
|
||||
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getFlag() {
|
||||
return (Integer) getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
|
||||
setHeader(ROCKET_FLAG, delayTimeLevel);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Object getTransactionalArg() {
|
||||
return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG);
|
||||
}
|
||||
|
||||
public Object withTransactionalArg(Object arg) {
|
||||
setHeader(ROCKET_TRANSACTIONAL_ARG, arg);
|
||||
return this;
|
||||
}
|
||||
|
||||
public SendResult getSendResult() {
|
||||
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
|
||||
}
|
||||
|
||||
public static void putSendResult(MutableMessage message, SendResult sendResult) {
|
||||
message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
|
||||
}
|
||||
|
||||
public Map<String, String> getUserProperties() {
|
||||
Map<String, String> result = new HashMap<>();
|
||||
for (Map.Entry<String, Object> entry : this.toMap().entrySet()) {
|
||||
if (entry.getValue() instanceof String
|
||||
&& !MessageConst.STRING_HASH_SET.contains(entry.getKey())
|
||||
&& !entry.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
|
||||
result.put(entry.getKey(), (String) entry.getValue());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.actuator;
|
||||
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Endpoint(id = ENDPOINT_ID)
|
||||
public class RocketMQBinderEndpoint {
|
||||
|
||||
@Autowired(required = false)
|
||||
private InstrumentationManager instrumentationManager;
|
||||
|
||||
@ReadOperation
|
||||
public Map<String, Object> invoke() {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
if (instrumentationManager != null) {
|
||||
result.put("metrics",
|
||||
instrumentationManager.getMetricRegistry().getMetrics());
|
||||
result.put("runtime", instrumentationManager.getRuntime());
|
||||
}
|
||||
else {
|
||||
result.put("warning",
|
||||
"please add metrics-core dependency, we use it for metrics");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
@ -28,33 +28,25 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM
|
||||
*/
|
||||
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
@Autowired(required = false)
|
||||
@Autowired
|
||||
private InstrumentationManager instrumentationManager;
|
||||
|
||||
@Override
|
||||
protected void doHealthCheck(Health.Builder builder) throws Exception {
|
||||
if (instrumentationManager != null) {
|
||||
if (instrumentationManager.getHealthInstrumentations().stream()
|
||||
.allMatch(Instrumentation::isUp)) {
|
||||
builder.up();
|
||||
return;
|
||||
}
|
||||
if (instrumentationManager.getHealthInstrumentations().stream()
|
||||
.allMatch(Instrumentation::isOutOfService)) {
|
||||
builder.outOfService();
|
||||
return;
|
||||
}
|
||||
builder.down();
|
||||
instrumentationManager.getHealthInstrumentations().stream()
|
||||
.filter(instrumentation -> !instrumentation.isStarted())
|
||||
.forEach(instrumentation1 -> builder
|
||||
.withException(instrumentation1.getStartException()));
|
||||
if (instrumentationManager.getHealthInstrumentations().stream()
|
||||
.allMatch(Instrumentation::isUp)) {
|
||||
builder.up();
|
||||
return;
|
||||
}
|
||||
else {
|
||||
builder.down();
|
||||
builder.withDetail("warning",
|
||||
"please add metrics-core dependency, we use it for metrics");
|
||||
if (instrumentationManager.getHealthInstrumentations().stream()
|
||||
.allMatch(Instrumentation::isOutOfService)) {
|
||||
builder.outOfService();
|
||||
return;
|
||||
}
|
||||
|
||||
builder.down();
|
||||
instrumentationManager.getHealthInstrumentations().stream()
|
||||
.filter(instrumentation -> !instrumentation.isStarted())
|
||||
.forEach(instrumentation1 -> builder
|
||||
.withException(instrumentation1.getStartException()));
|
||||
}
|
||||
}
|
||||
|
@ -16,23 +16,30 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import org.apache.rocketmq.client.log.ClientLogger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.RocketMQBinderMetrics;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration
|
||||
@Import(RocketMQBinderHealthIndicatorAutoConfiguration.class)
|
||||
@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
|
||||
RocketMQExtendedBindingProperties.class })
|
||||
public class RocketMQBinderAutoConfiguration {
|
||||
@ -41,17 +48,12 @@ public class RocketMQBinderAutoConfiguration {
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private InstrumentationManager instrumentationManager;
|
||||
|
||||
@Autowired
|
||||
public RocketMQBinderAutoConfiguration(
|
||||
RocketMQExtendedBindingProperties extendedBindingProperties,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL,
|
||||
this.rocketBinderConfigurationProperties.getLogLevel());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ -62,17 +64,33 @@ public class RocketMQBinderAutoConfiguration {
|
||||
@Bean
|
||||
public RocketMQMessageChannelBinder rocketMessageChannelBinder(
|
||||
RocketMQTopicProvisioner provisioningProvider,
|
||||
ConsumersManager consumersManager) {
|
||||
InstrumentationManager instrumentationManager) {
|
||||
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
|
||||
consumersManager, extendedBindingProperties, provisioningProvider,
|
||||
extendedBindingProperties, provisioningProvider,
|
||||
rocketBinderConfigurationProperties, instrumentationManager);
|
||||
return binder;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumersManager consumersManager() {
|
||||
return new ConsumersManager(instrumentationManager,
|
||||
rocketBinderConfigurationProperties);
|
||||
public InstrumentationManager instrumentationManager() {
|
||||
return new InstrumentationManager();
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(MeterRegistry.class)
|
||||
@ConditionalOnBean(MeterRegistry.class)
|
||||
protected class RocketMQBinderMetricsConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(RocketMQBinderMetrics.class)
|
||||
public MeterBinder rocketMqBinderMetrics(
|
||||
RocketMQMessageChannelBinder rocketMQMessageChannelBinder,
|
||||
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
|
||||
MeterRegistry meterRegistry) {
|
||||
return new RocketMQBinderMetrics(rocketMQMessageChannelBinder,
|
||||
rocketMQBinderConfigurationProperties, meterRegistry);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -16,13 +16,9 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@ -30,24 +26,12 @@ import org.springframework.context.annotation.Configuration;
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration
|
||||
@AutoConfigureAfter(EndpointAutoConfiguration.class)
|
||||
@ConditionalOnClass(Endpoint.class)
|
||||
public class RocketMQBinderEndpointAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RocketMQBinderEndpoint rocketBinderEndpoint() {
|
||||
return new RocketMQBinderEndpoint();
|
||||
}
|
||||
public class RocketMQBinderHealthIndicatorAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
|
||||
return new RocketMQBinderHealthIndicator();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnClass(name = "com.codahale.metrics.Counter")
|
||||
public InstrumentationManager instrumentationManager() {
|
||||
return new InstrumentationManager();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,114 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer;
|
||||
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
|
||||
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
|
||||
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration
|
||||
@AutoConfigureAfter(RocketMQAutoConfiguration.class)
|
||||
@ConditionalOnMissingBean(DefaultMQProducer.class)
|
||||
public class RocketMQComponent4BinderAutoConfiguration {
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
public RocketMQComponent4BinderAutoConfiguration(Environment environment) {
|
||||
this.environment = environment;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DefaultMQProducer defaultMQProducer() {
|
||||
RocketMQProperties rocketMQProperties = new RocketMQProperties();
|
||||
String configNameServer = environment
|
||||
.getProperty("spring.cloud.stream.rocketmq.binder.namesrv-addr");
|
||||
if (StringUtils.isEmpty(configNameServer)) {
|
||||
rocketMQProperties.setNameServer(RocketMQBinderConstants.DEFAULT_NAME_SERVER);
|
||||
}
|
||||
else {
|
||||
rocketMQProperties.setNameServer(configNameServer);
|
||||
}
|
||||
RocketMQProperties.Producer producerConfig = new Producer();
|
||||
rocketMQProperties.setProducer(producerConfig);
|
||||
producerConfig.setGroup(RocketMQBinderConstants.DEFAULT_GROUP);
|
||||
|
||||
String nameServer = rocketMQProperties.getNameServer();
|
||||
String groupName = producerConfig.getGroup();
|
||||
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
|
||||
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
|
||||
|
||||
DefaultMQProducer producer = new DefaultMQProducer(groupName);
|
||||
producer.setNamesrvAddr(nameServer);
|
||||
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
|
||||
producer.setRetryTimesWhenSendFailed(
|
||||
producerConfig.getRetryTimesWhenSendFailed());
|
||||
producer.setRetryTimesWhenSendAsyncFailed(
|
||||
producerConfig.getRetryTimesWhenSendAsyncFailed());
|
||||
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
|
||||
producer.setCompressMsgBodyOverHowmuch(
|
||||
producerConfig.getCompressMessageBodyThreshold());
|
||||
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
|
||||
|
||||
return producer;
|
||||
}
|
||||
|
||||
@Bean(destroyMethod = "destroy")
|
||||
@ConditionalOnBean(DefaultMQProducer.class)
|
||||
@ConditionalOnMissingBean(RocketMQTemplate.class)
|
||||
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
|
||||
ObjectMapper rocketMQMessageObjectMapper) {
|
||||
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
|
||||
rocketMQTemplate.setProducer(mqProducer);
|
||||
rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
|
||||
return rocketMQTemplate;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnBean(RocketMQTemplate.class)
|
||||
@ConditionalOnMissingBean(TransactionHandlerRegistry.class)
|
||||
public TransactionHandlerRegistry transactionHandlerRegistry(
|
||||
RocketMQTemplate template) {
|
||||
return new TransactionHandlerRegistry(template);
|
||||
}
|
||||
|
||||
@Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME)
|
||||
@ConditionalOnBean(TransactionHandlerRegistry.class)
|
||||
public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(
|
||||
TransactionHandlerRegistry transactionHandlerRegistry) {
|
||||
return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
|
||||
}
|
||||
|
||||
}
|
@ -1,98 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class Acknowledgement {
|
||||
|
||||
/**
|
||||
* for {@link ConsumeConcurrentlyContext} using
|
||||
*/
|
||||
private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
/**
|
||||
* Message consume retry strategy<br>
|
||||
* -1,no retry,put into DLQ directly<br>
|
||||
* 0,broker control retry frequency<br>
|
||||
* >0,client control retry frequency
|
||||
*/
|
||||
private Integer consumeConcurrentlyDelayLevel = 0;
|
||||
|
||||
/**
|
||||
* for {@link ConsumeOrderlyContext} using
|
||||
*/
|
||||
private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
|
||||
private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
|
||||
|
||||
public Acknowledgement setConsumeConcurrentlyStatus(
|
||||
ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
|
||||
this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
|
||||
return consumeConcurrentlyStatus;
|
||||
}
|
||||
|
||||
public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
|
||||
return consumeOrderlyStatus;
|
||||
}
|
||||
|
||||
public Acknowledgement setConsumeOrderlyStatus(
|
||||
ConsumeOrderlyStatus consumeOrderlyStatus) {
|
||||
this.consumeOrderlyStatus = consumeOrderlyStatus;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getConsumeConcurrentlyDelayLevel() {
|
||||
return consumeConcurrentlyDelayLevel;
|
||||
}
|
||||
|
||||
public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
|
||||
this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
|
||||
}
|
||||
|
||||
public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
|
||||
return consumeOrderlySuspendCurrentQueueTimeMill;
|
||||
}
|
||||
|
||||
public void setConsumeOrderlySuspendCurrentQueueTimeMill(
|
||||
Long consumeOrderlySuspendCurrentQueueTimeMill) {
|
||||
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
|
||||
}
|
||||
|
||||
public static Acknowledgement buildOrderlyInstance() {
|
||||
Acknowledgement acknowledgement = new Acknowledgement();
|
||||
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS);
|
||||
return acknowledgement;
|
||||
}
|
||||
|
||||
public static Acknowledgement buildConcurrentlyInstance() {
|
||||
Acknowledgement acknowledgement = new Acknowledgement();
|
||||
acknowledgement
|
||||
.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
|
||||
return acknowledgement;
|
||||
}
|
||||
|
||||
}
|
@ -1,137 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import java.util.AbstractMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ConsumersManager {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
|
||||
private final Map<String, Boolean> started = new HashMap<>();
|
||||
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap = new HashMap<>();
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
private InstrumentationManager instrumentationManager;
|
||||
|
||||
public ConsumersManager(InstrumentationManager instrumentationManager,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
}
|
||||
|
||||
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group,
|
||||
String topic,
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
|
||||
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
|
||||
consumerProperties);
|
||||
|
||||
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
||||
ConsumerGroupInstrumentation instrumentation = manager
|
||||
.getConsumerGroupInstrumentation(group);
|
||||
instrumentationManager.addHealthInstrumentation(instrumentation);
|
||||
});
|
||||
|
||||
if (consumerGroups.containsKey(group)) {
|
||||
return consumerGroups.get(group);
|
||||
}
|
||||
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
|
||||
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||
consumerGroups.put(group, consumer);
|
||||
started.put(group, false);
|
||||
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
|
||||
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
|
||||
if (consumerProperties.getExtension().getBroadcasting()) {
|
||||
consumer.setMessageModel(MessageModel.BROADCASTING);
|
||||
}
|
||||
logger.info("RocketMQ consuming for SCS group {} created", group);
|
||||
return consumer;
|
||||
}
|
||||
|
||||
public synchronized void startConsumers() throws MQClientException {
|
||||
for (String group : getConsumerGroups()) {
|
||||
start(group);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void startConsumer(String group) throws MQClientException {
|
||||
start(group);
|
||||
}
|
||||
|
||||
public synchronized void stopConsumer(String group) {
|
||||
stop(group);
|
||||
}
|
||||
|
||||
private void stop(String group) {
|
||||
if (consumerGroups.get(group) != null) {
|
||||
consumerGroups.get(group).shutdown();
|
||||
started.put(group, false);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void start(String group) throws MQClientException {
|
||||
if (started.get(group)) {
|
||||
return;
|
||||
}
|
||||
|
||||
ConsumerGroupInstrumentation groupInstrumentation = null;
|
||||
if (Optional.ofNullable(instrumentationManager).isPresent()) {
|
||||
groupInstrumentation = instrumentationManager
|
||||
.getConsumerGroupInstrumentation(group);
|
||||
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
|
||||
}
|
||||
|
||||
try {
|
||||
consumerGroups.get(group).start();
|
||||
started.put(group, true);
|
||||
Optional.ofNullable(groupInstrumentation)
|
||||
.ifPresent(g -> g.markStartedSuccessfully());
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
Optional.ofNullable(groupInstrumentation)
|
||||
.ifPresent(g -> g.markStartFailed(e));
|
||||
logger.error("RocketMQ Consumer hasn't been started. Caused by "
|
||||
+ e.getErrorMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Set<String> getConsumerGroups() {
|
||||
return consumerGroups.keySet();
|
||||
}
|
||||
}
|
@ -0,0 +1,388 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.SelectorType;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
||||
import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
|
||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQListenerBindingContainer
|
||||
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
|
||||
|
||||
private final static Logger log = LoggerFactory
|
||||
.getLogger(RocketMQListenerBindingContainer.class);
|
||||
|
||||
private long suspendCurrentQueueTimeMillis = 1000;
|
||||
|
||||
/**
|
||||
* Message consume retry strategy<br>
|
||||
* -1,no retry,put into DLQ directly<br>
|
||||
* 0,broker control retry frequency<br>
|
||||
* >0,client control retry frequency.
|
||||
*/
|
||||
private int delayLevelWhenNextConsume = 0;
|
||||
|
||||
private String nameServer;
|
||||
|
||||
private String consumerGroup;
|
||||
|
||||
private String topic;
|
||||
|
||||
private int consumeThreadMax = 64;
|
||||
|
||||
private String charset = "UTF-8";
|
||||
|
||||
private RocketMQListener rocketMQListener;
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
private boolean running;
|
||||
|
||||
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
|
||||
|
||||
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
|
||||
|
||||
// The following properties came from RocketMQConsumerProperties.
|
||||
private ConsumeMode consumeMode;
|
||||
private SelectorType selectorType;
|
||||
private String selectorExpression;
|
||||
private MessageModel messageModel;
|
||||
|
||||
public RocketMQListenerBindingContainer(
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
|
||||
RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
|
||||
this.rocketMQConsumerProperties = rocketMQConsumerProperties;
|
||||
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
|
||||
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
|
||||
? ConsumeMode.ORDERLY
|
||||
: ConsumeMode.CONCURRENTLY;
|
||||
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
|
||||
this.selectorType = SelectorType.TAG;
|
||||
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
|
||||
}
|
||||
else {
|
||||
this.selectorType = SelectorType.SQL92;
|
||||
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
|
||||
}
|
||||
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
|
||||
? MessageModel.BROADCASTING
|
||||
: MessageModel.CLUSTERING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setupMessageListener(RocketMQListener<?> rocketMQListener) {
|
||||
this.rocketMQListener = rocketMQListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
this.setRunning(false);
|
||||
if (Objects.nonNull(consumer)) {
|
||||
consumer.shutdown();
|
||||
}
|
||||
log.info("container destroyed, {}", this.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
initRocketMQPushConsumer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (this.isRunning()) {
|
||||
throw new IllegalStateException(
|
||||
"container already running. " + this.toString());
|
||||
}
|
||||
|
||||
try {
|
||||
consumer.start();
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
|
||||
}
|
||||
this.setRunning(true);
|
||||
|
||||
log.info("running container: {}", this.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (this.isRunning()) {
|
||||
if (Objects.nonNull(consumer)) {
|
||||
consumer.shutdown();
|
||||
}
|
||||
setRunning(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
private void setRunning(boolean running) {
|
||||
this.running = running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
private void initRocketMQPushConsumer() throws MQClientException {
|
||||
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
|
||||
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
|
||||
Assert.notNull(nameServer, "Property 'nameServer' is required");
|
||||
Assert.notNull(topic, "Property 'topic' is required");
|
||||
|
||||
consumer = new DefaultMQPushConsumer(consumerGroup);
|
||||
consumer.setNamesrvAddr(nameServer);
|
||||
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
|
||||
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
|
||||
|
||||
switch (messageModel) {
|
||||
case BROADCASTING:
|
||||
consumer.setMessageModel(
|
||||
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
|
||||
break;
|
||||
case CLUSTERING:
|
||||
consumer.setMessageModel(
|
||||
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
|
||||
}
|
||||
|
||||
switch (selectorType) {
|
||||
case TAG:
|
||||
consumer.subscribe(topic, selectorExpression);
|
||||
break;
|
||||
case SQL92:
|
||||
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
|
||||
}
|
||||
|
||||
switch (consumeMode) {
|
||||
case ORDERLY:
|
||||
consumer.setMessageListener(new DefaultMessageListenerOrderly());
|
||||
break;
|
||||
case CONCURRENTLY:
|
||||
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
|
||||
}
|
||||
|
||||
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
|
||||
((RocketMQPushConsumerLifecycleListener) rocketMQListener)
|
||||
.prepareStart(consumer);
|
||||
}
|
||||
|
||||
rocketMQMessageChannelBinder.getClientConfigId().add(consumer.buildMQClientId());
|
||||
|
||||
}
|
||||
|
||||
public long getSuspendCurrentQueueTimeMillis() {
|
||||
return suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
|
||||
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public int getDelayLevelWhenNextConsume() {
|
||||
return delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
|
||||
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public String getNameServer() {
|
||||
return nameServer;
|
||||
}
|
||||
|
||||
public void setNameServer(String nameServer) {
|
||||
this.nameServer = nameServer;
|
||||
}
|
||||
|
||||
public String getConsumerGroup() {
|
||||
return consumerGroup;
|
||||
}
|
||||
|
||||
public void setConsumerGroup(String consumerGroup) {
|
||||
this.consumerGroup = consumerGroup;
|
||||
}
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public int getConsumeThreadMax() {
|
||||
return consumeThreadMax;
|
||||
}
|
||||
|
||||
public void setConsumeThreadMax(int consumeThreadMax) {
|
||||
this.consumeThreadMax = consumeThreadMax;
|
||||
}
|
||||
|
||||
public String getCharset() {
|
||||
return charset;
|
||||
}
|
||||
|
||||
public void setCharset(String charset) {
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
public RocketMQListener getRocketMQListener() {
|
||||
return rocketMQListener;
|
||||
}
|
||||
|
||||
public void setRocketMQListener(RocketMQListener rocketMQListener) {
|
||||
this.rocketMQListener = rocketMQListener;
|
||||
}
|
||||
|
||||
public DefaultMQPushConsumer getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
public void setConsumer(DefaultMQPushConsumer consumer) {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() {
|
||||
return rocketMQConsumerProperties;
|
||||
}
|
||||
|
||||
public ConsumeMode getConsumeMode() {
|
||||
return consumeMode;
|
||||
}
|
||||
|
||||
public SelectorType getSelectorType() {
|
||||
return selectorType;
|
||||
}
|
||||
|
||||
public String getSelectorExpression() {
|
||||
return selectorExpression;
|
||||
}
|
||||
|
||||
public MessageModel getMessageModel() {
|
||||
return messageModel;
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerConcurrently
|
||||
implements MessageListenerConcurrently {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeConcurrentlyContext context) {
|
||||
for (MessageExt messageExt : msgs) {
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
rocketMQListener
|
||||
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("consume message failed. messageExt:{}", messageExt, e);
|
||||
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
}
|
||||
}
|
||||
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeOrderlyContext context) {
|
||||
for (MessageExt messageExt : msgs) {
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
rocketMQListener
|
||||
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("consume message failed. messageExt:{}", messageExt, e);
|
||||
context.setSuspendCurrentQueueTimeMillis(
|
||||
suspendCurrentQueueTimeMillis);
|
||||
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||
}
|
||||
}
|
||||
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,47 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.exception;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
|
||||
/**
|
||||
* An exception that is the payload of an {@code ErrorMessage} when occurs send failure.
|
||||
*
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
* @since 0.2.2
|
||||
*/
|
||||
public class RocketMQSendFailureException extends MessagingException {
|
||||
|
||||
private final org.apache.rocketmq.common.message.Message rocketmqMsg;
|
||||
|
||||
public RocketMQSendFailureException(Message<?> message,
|
||||
org.apache.rocketmq.common.message.Message rocketmqMsg, Throwable cause) {
|
||||
super(message, cause);
|
||||
this.rocketmqMsg = rocketmqMsg;
|
||||
}
|
||||
|
||||
public org.apache.rocketmq.common.message.Message getRocketmqMsg() {
|
||||
return rocketmqMsg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString() + " [rocketmqMsg=" + this.rocketmqMsg + "]";
|
||||
}
|
||||
|
||||
}
|
@ -16,45 +16,24 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import org.apache.commons.lang3.ClassUtils;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListener;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import org.springframework.integration.endpoint.MessageProducerSupport;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.retry.RecoveryCallback;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryListener;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
@ -64,34 +43,22 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(RocketMQInboundChannelAdapter.class);
|
||||
|
||||
private ConsumerInstrumentation consumerInstrumentation;
|
||||
|
||||
private InstrumentationManager instrumentationManager;
|
||||
|
||||
private RetryTemplate retryTemplate;
|
||||
|
||||
private RecoveryCallback<? extends Object> recoveryCallback;
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
private CloudStreamMessageListener listener;
|
||||
private RocketMQListenerBindingContainer rocketMQListenerContainer;
|
||||
|
||||
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
|
||||
|
||||
private final String destination;
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
private final String group;
|
||||
|
||||
private final ConsumersManager consumersManager;
|
||||
|
||||
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
|
||||
public RocketMQInboundChannelAdapter(
|
||||
RocketMQListenerBindingContainer rocketMQListenerContainer,
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
|
||||
String destination, String group,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
this.consumersManager = consumersManager;
|
||||
this.rocketMQListenerContainer = rocketMQListenerContainer;
|
||||
this.consumerProperties = consumerProperties;
|
||||
this.destination = destination;
|
||||
this.group = group;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
@ -108,16 +75,27 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
||||
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
|
||||
+ "send an error message when retries are exhausted");
|
||||
}
|
||||
this.consumer = consumersManager.getOrCreateConsumer(group, destination,
|
||||
consumerProperties);
|
||||
|
||||
Boolean isOrderly = consumerProperties.getExtension().getOrderly();
|
||||
this.listener = isOrderly ? new CloudStreamMessageListenerOrderly()
|
||||
: new CloudStreamMessageListenerConcurrently();
|
||||
BindingRocketMQListener listener = new BindingRocketMQListener();
|
||||
rocketMQListenerContainer.setRocketMQListener(listener);
|
||||
|
||||
if (retryTemplate != null) {
|
||||
this.retryTemplate.registerListener(this.listener);
|
||||
this.retryTemplate.registerListener(listener);
|
||||
}
|
||||
|
||||
try {
|
||||
rocketMQListenerContainer.afterPropertiesSet();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
|
||||
throw new IllegalArgumentException(
|
||||
"rocketMQListenerContainer init error: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
instrumentationManager.addHealthInstrumentation(
|
||||
new Instrumentation(rocketMQListenerContainer.getTopic()
|
||||
+ rocketMQListenerContainer.getConsumerGroup()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -126,53 +104,28 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
||||
|| !consumerProperties.getExtension().getEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
String tags = consumerProperties.getExtension().getTags();
|
||||
|
||||
Set<String> tagsSet = tags == null ? new HashSet<>()
|
||||
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
||||
consumerInstrumentation = manager.getConsumerInstrumentation(destination);
|
||||
manager.addHealthInstrumentation(consumerInstrumentation);
|
||||
});
|
||||
|
||||
try {
|
||||
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
|
||||
this.consumer.subscribe(destination, MessageSelector
|
||||
.bySql(consumerProperties.getExtension().getSql()));
|
||||
}
|
||||
else {
|
||||
this.consumer.subscribe(destination, String.join(" || ", tagsSet));
|
||||
}
|
||||
Optional.ofNullable(consumerInstrumentation)
|
||||
.ifPresent(c -> c.markStartedSuccessfully());
|
||||
rocketMQListenerContainer.start();
|
||||
instrumentationManager
|
||||
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
|
||||
+ rocketMQListenerContainer.getConsumerGroup())
|
||||
.markStartedSuccessfully();
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
Optional.ofNullable(consumerInstrumentation)
|
||||
.ifPresent(c -> c.markStartFailed(e));
|
||||
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
|
||||
+ e.getErrorMessage(), e);
|
||||
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
|
||||
}
|
||||
|
||||
this.consumer.registerMessageListener(this.listener);
|
||||
|
||||
try {
|
||||
consumersManager.startConsumer(group);
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
logger.error(
|
||||
"RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(),
|
||||
e);
|
||||
throw new RuntimeException("RocketMQ Consumer startup failed.", e);
|
||||
catch (Exception e) {
|
||||
instrumentationManager
|
||||
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
|
||||
+ rocketMQListenerContainer.getConsumerGroup())
|
||||
.markStartFailed(e);
|
||||
logger.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
|
||||
throw new MessagingException(MessageBuilder.withPayload(
|
||||
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
|
||||
.build(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
consumersManager.stopConsumer(group);
|
||||
rocketMQListenerContainer.stop();
|
||||
}
|
||||
|
||||
public void setRetryTemplate(RetryTemplate retryTemplate) {
|
||||
@ -183,84 +136,21 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
||||
this.recoveryCallback = recoveryCallback;
|
||||
}
|
||||
|
||||
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
|
||||
protected class BindingRocketMQListener
|
||||
implements RocketMQListener<Message>, RetryListener {
|
||||
|
||||
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
|
||||
try {
|
||||
if (enableRetry) {
|
||||
return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
|
||||
(RetryCallback<Acknowledgement, Exception>) context -> doSendMsgs(
|
||||
msgs, context),
|
||||
new RecoveryCallback<Acknowledgement>() {
|
||||
@Override
|
||||
public Acknowledgement recover(RetryContext context)
|
||||
throws Exception {
|
||||
RocketMQInboundChannelAdapter.this.recoveryCallback
|
||||
.recover(context);
|
||||
if (ClassUtils.isAssignable(this.getClass(),
|
||||
MessageListenerConcurrently.class)) {
|
||||
return Acknowledgement
|
||||
.buildConcurrentlyInstance();
|
||||
}
|
||||
else {
|
||||
return Acknowledgement.buildOrderlyInstance();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
Acknowledgement result = doSendMsgs(msgs, null);
|
||||
Optional.ofNullable(
|
||||
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
||||
.ifPresent(manager -> {
|
||||
manager.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumed();
|
||||
});
|
||||
return result;
|
||||
}
|
||||
if (enableRetry) {
|
||||
RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
|
||||
RocketMQInboundChannelAdapter.this.sendMessage(message);
|
||||
return null;
|
||||
}, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error(
|
||||
"RocketMQ Message hasn't been processed successfully. Caused by ",
|
||||
e);
|
||||
Optional.ofNullable(
|
||||
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
||||
.ifPresent(manager -> {
|
||||
manager.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumedFailure();
|
||||
});
|
||||
else {
|
||||
RocketMQInboundChannelAdapter.this.sendMessage(message);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Acknowledgement doSendMsgs(final List<MessageExt> msgs,
|
||||
RetryContext context) {
|
||||
List<Acknowledgement> acknowledgements = new ArrayList<>();
|
||||
msgs.forEach(msg -> {
|
||||
String retryInfo = context == null ? ""
|
||||
: "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
|
||||
logger.debug(retryInfo + "consuming msg:\n" + msg);
|
||||
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
|
||||
Acknowledgement acknowledgement = new Acknowledgement();
|
||||
Message<byte[]> toChannel = convertMessagingFromRocketMQMsg(msg,
|
||||
acknowledgement);
|
||||
acknowledgements.add(acknowledgement);
|
||||
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
|
||||
});
|
||||
return acknowledgements.get(0);
|
||||
}
|
||||
|
||||
private Message convertMessagingFromRocketMQMsg(MessageExt msg,
|
||||
Acknowledgement acknowledgement) {
|
||||
return MessageBuilder.withPayload(msg.getBody())
|
||||
.setHeaders(new RocketMQMessageHeaderAccessor()
|
||||
.withAcknowledgment(acknowledgement).withTags(msg.getTags())
|
||||
.withKeys(msg.getKeys()).withFlag(msg.getFlag())
|
||||
.withRocketMessage(msg))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -272,69 +162,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
||||
@Override
|
||||
public <T, E extends Throwable> void close(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
if (throwable != null) {
|
||||
Optional.ofNullable(
|
||||
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
||||
.ifPresent(manager -> {
|
||||
manager.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumedFailure();
|
||||
});
|
||||
}
|
||||
else {
|
||||
Optional.ofNullable(
|
||||
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
||||
.ifPresent(manager -> {
|
||||
manager.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumed();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> void onError(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
}
|
||||
}
|
||||
|
||||
protected class CloudStreamMessageListenerConcurrently
|
||||
extends CloudStreamMessageListener implements MessageListenerConcurrently {
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
|
||||
ConsumeConcurrentlyContext context) {
|
||||
Acknowledgement acknowledgement = consumeMessage(msgs);
|
||||
if (acknowledgement != null) {
|
||||
context.setDelayLevelWhenNextConsume(
|
||||
acknowledgement.getConsumeConcurrentlyDelayLevel());
|
||||
return acknowledgement.getConsumeConcurrentlyStatus();
|
||||
}
|
||||
else {
|
||||
context.setDelayLevelWhenNextConsume(consumerProperties.getExtension()
|
||||
.getError().getDelayLevelWhenNextConsume());
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
|
||||
implements MessageListenerOrderly {
|
||||
|
||||
@Override
|
||||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeOrderlyContext context) {
|
||||
Acknowledgement acknowledgement = consumeMessage(msgs);
|
||||
if (acknowledgement != null) {
|
||||
context.setSuspendCurrentQueueTimeMillis(
|
||||
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
|
||||
return acknowledgement.getConsumeOrderlyStatus();
|
||||
}
|
||||
else {
|
||||
context.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension()
|
||||
.getError().getSuspendCurrentQueueTimeMillis());
|
||||
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,37 +16,28 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.client.producer.SendStatus;
|
||||
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||
import org.apache.rocketmq.client.producer.TransactionMQProducer;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.exception.RocketMQSendFailureException;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.integration.handler.AbstractMessageHandler;
|
||||
import org.springframework.integration.support.DefaultErrorMessageStrategy;
|
||||
import org.springframework.integration.support.ErrorMessageStrategy;
|
||||
import org.springframework.integration.support.MutableMessage;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.support.ErrorMessage;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
@ -55,83 +46,55 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
|
||||
|
||||
private DefaultMQProducer producer;
|
||||
|
||||
private ProducerInstrumentation producerInstrumentation;
|
||||
|
||||
private InstrumentationManager instrumentationManager;
|
||||
|
||||
private LocalTransactionExecuter localTransactionExecuter;
|
||||
|
||||
private TransactionCheckListener transactionCheckListener;
|
||||
|
||||
private MessageChannel sendFailureChannel;
|
||||
|
||||
private final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
|
||||
private final RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private final Boolean transactional;
|
||||
|
||||
private final String destination;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
private boolean sync = false;
|
||||
|
||||
private volatile boolean running = false;
|
||||
|
||||
public RocketMQMessageHandler(String destination,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
|
||||
Boolean transactional, InstrumentationManager instrumentationManager) {
|
||||
this.rocketMQTemplate = rocketMQTemplate;
|
||||
this.destination = destination;
|
||||
this.producerProperties = producerProperties;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
this.transactional = transactional;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (producerProperties.getExtension().getTransactional()) {
|
||||
producer = new TransactionMQProducer(destination);
|
||||
if (transactionCheckListener != null) {
|
||||
((TransactionMQProducer) producer)
|
||||
.setTransactionCheckListener(transactionCheckListener);
|
||||
if (!transactional) {
|
||||
instrumentationManager
|
||||
.addHealthInstrumentation(new Instrumentation(destination));
|
||||
try {
|
||||
rocketMQTemplate.afterPropertiesSet();
|
||||
instrumentationManager.getHealthInstrumentation(destination)
|
||||
.markStartedSuccessfully();
|
||||
}
|
||||
catch (Exception e) {
|
||||
instrumentationManager.getHealthInstrumentation(destination)
|
||||
.markStartFailed(e);
|
||||
logger.error(
|
||||
"RocketMQTemplate startup failed, Caused by " + e.getMessage());
|
||||
throw new MessagingException(MessageBuilder.withPayload(
|
||||
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
|
||||
.build(), e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
producer = new DefaultMQProducer(destination);
|
||||
}
|
||||
|
||||
producer.setVipChannelEnabled(
|
||||
producerProperties.getExtension().getVipChannelEnabled());
|
||||
|
||||
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
||||
producerInstrumentation = manager.getProducerInstrumentation(destination);
|
||||
manager.addHealthInstrumentation(producerInstrumentation);
|
||||
});
|
||||
|
||||
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||
|
||||
if (producerProperties.getExtension().getMaxMessageSize() > 0) {
|
||||
producer.setMaxMessageSize(
|
||||
producerProperties.getExtension().getMaxMessageSize());
|
||||
}
|
||||
|
||||
try {
|
||||
producer.start();
|
||||
Optional.ofNullable(producerInstrumentation)
|
||||
.ifPresent(p -> p.markStartedSuccessfully());
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
Optional.ofNullable(producerInstrumentation)
|
||||
.ifPresent(p -> p.markStartFailed(e));
|
||||
logger.error(
|
||||
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
|
||||
throw new MessagingException(e.getMessage(), e);
|
||||
}
|
||||
running = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (producer != null) {
|
||||
producer.shutdown();
|
||||
if (!transactional) {
|
||||
rocketMQTemplate.destroy();
|
||||
}
|
||||
running = false;
|
||||
}
|
||||
@ -144,100 +107,90 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
@Override
|
||||
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
|
||||
throws Exception {
|
||||
Message toSend = null;
|
||||
try {
|
||||
if (message.getPayload() instanceof byte[]) {
|
||||
toSend = new Message(destination, (byte[]) message.getPayload());
|
||||
StringBuilder topicWithTags = new StringBuilder(destination);
|
||||
String tags = Optional
|
||||
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
|
||||
.toString();
|
||||
if (!StringUtils.isEmpty(tags)) {
|
||||
topicWithTags = topicWithTags.append(":").append(tags);
|
||||
}
|
||||
else if (message.getPayload() instanceof String) {
|
||||
toSend = new Message(destination,
|
||||
((String) message.getPayload()).getBytes());
|
||||
SendResult sendRes = null;
|
||||
if (transactional) {
|
||||
sendRes = rocketMQTemplate.sendMessageInTransaction(destination,
|
||||
topicWithTags.toString(), message, message.getHeaders()
|
||||
.get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
|
||||
}
|
||||
else {
|
||||
throw new UnsupportedOperationException("Payload class isn't supported: "
|
||||
+ message.getPayload().getClass());
|
||||
}
|
||||
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(
|
||||
message);
|
||||
headerAccessor.setLeaveMutable(true);
|
||||
toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
|
||||
toSend.setTags(headerAccessor.getTags());
|
||||
toSend.setKeys(headerAccessor.getKeys());
|
||||
toSend.setFlag(headerAccessor.getFlag());
|
||||
for (Map.Entry<String, String> entry : headerAccessor.getUserProperties()
|
||||
.entrySet()) {
|
||||
toSend.putUserProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
int delayLevel = 0;
|
||||
try {
|
||||
Object delayLevelObj = message.getHeaders()
|
||||
.getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
|
||||
if (delayLevelObj instanceof Number) {
|
||||
delayLevel = ((Number) delayLevelObj).intValue();
|
||||
}
|
||||
else if (delayLevelObj instanceof String) {
|
||||
delayLevel = Integer.parseInt((String) delayLevelObj);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
if (sync) {
|
||||
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
|
||||
rocketMQTemplate.getProducer().getSendMsgTimeout(),
|
||||
delayLevel);
|
||||
}
|
||||
else {
|
||||
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
|
||||
new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
|
||||
SendResult sendRes;
|
||||
if (producerProperties.getExtension().getTransactional()) {
|
||||
sendRes = producer.sendMessageInTransaction(toSend,
|
||||
localTransactionExecuter, headerAccessor.getTransactionalArg());
|
||||
}
|
||||
else {
|
||||
sendRes = producer.send(toSend);
|
||||
}
|
||||
}
|
||||
|
||||
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||
@Override
|
||||
public void onException(Throwable e) {
|
||||
if (getSendFailureChannel() != null) {
|
||||
getSendFailureChannel().send(
|
||||
RocketMQMessageHandler.this.errorMessageStrategy
|
||||
.buildErrorMessage(
|
||||
new MessagingException(
|
||||
message, e),
|
||||
null));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||
if (getSendFailureChannel() != null) {
|
||||
this.getSendFailureChannel().send(message);
|
||||
}
|
||||
else {
|
||||
throw new RocketMQSendFailureException(message, toSend,
|
||||
throw new MessagingException(message,
|
||||
new MQClientException("message hasn't been sent", null));
|
||||
}
|
||||
}
|
||||
if (message instanceof MutableMessage) {
|
||||
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
|
||||
sendRes);
|
||||
}
|
||||
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
||||
manager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
|
||||
Instant.now().toEpochMilli());
|
||||
});
|
||||
Optional.ofNullable(producerInstrumentation).ifPresent(p -> p.markSent());
|
||||
}
|
||||
catch (MQClientException | RemotingException | MQBrokerException
|
||||
| InterruptedException | UnsupportedOperationException e) {
|
||||
Optional.ofNullable(producerInstrumentation)
|
||||
.ifPresent(p -> p.markSentFailure());
|
||||
catch (Exception e) {
|
||||
logger.error(
|
||||
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
|
||||
if (getSendFailureChannel() != null) {
|
||||
getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage(
|
||||
new RocketMQSendFailureException(message, toSend, e), null));
|
||||
getSendFailureChannel().send(this.errorMessageStrategy
|
||||
.buildErrorMessage(new MessagingException(message, e), null));
|
||||
}
|
||||
else {
|
||||
throw new RocketMQSendFailureException(message, toSend, e);
|
||||
throw new MessagingException(message, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Using in RocketMQ Transactional Mode. Set RocketMQ localTransactionExecuter in
|
||||
* {@link DefaultMQProducer#sendMessageInTransaction}.
|
||||
* @param localTransactionExecuter the executer running when produce msg.
|
||||
*/
|
||||
public void setLocalTransactionExecuter(
|
||||
LocalTransactionExecuter localTransactionExecuter) {
|
||||
this.localTransactionExecuter = localTransactionExecuter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Using in RocketMQ Transactional Mode. Set RocketMQ transactionCheckListener in
|
||||
* {@link TransactionMQProducer#setTransactionCheckListener}.
|
||||
* @param transactionCheckListener the listener set in {@link TransactionMQProducer}.
|
||||
*/
|
||||
public void setTransactionCheckListener(
|
||||
TransactionCheckListener transactionCheckListener) {
|
||||
this.transactionCheckListener = transactionCheckListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
|
||||
* to this channel with a payload of a {@link RocketMQSendFailureException} with the
|
||||
* failed message and cause.
|
||||
* to this channel with a payload of a {@link MessagingException} with the failed
|
||||
* message and cause.
|
||||
* @param sendFailureChannel the failure channel.
|
||||
* @since 0.2.2
|
||||
*/
|
||||
@ -259,4 +212,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
public MessageChannel getSendFailureChannel() {
|
||||
return sendFailureChannel;
|
||||
}
|
||||
|
||||
public void setSync(boolean sync) {
|
||||
this.sync = sync;
|
||||
}
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ConsumerGroupInstrumentation extends Instrumentation {
|
||||
private MetricRegistry metricRegistry;
|
||||
|
||||
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
|
||||
super(name);
|
||||
this.metricRegistry = metricRegistry;
|
||||
}
|
||||
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
/**
|
||||
* @author juven.xuxb
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ConsumerInstrumentation extends Instrumentation {
|
||||
|
||||
private final Counter totalConsumed;
|
||||
private final Counter totalConsumedFailures;
|
||||
private final Meter consumedPerSecond;
|
||||
private final Meter consumedFailuresPerSecond;
|
||||
|
||||
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
|
||||
super(baseMetricName);
|
||||
|
||||
this.totalConsumed = registry
|
||||
.counter(name(baseMetricName, Consumer.TOTAL_CONSUMED));
|
||||
this.consumedPerSecond = registry
|
||||
.meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND));
|
||||
this.totalConsumedFailures = registry
|
||||
.counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES));
|
||||
this.consumedFailuresPerSecond = registry
|
||||
.meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND));
|
||||
}
|
||||
|
||||
public void markConsumed() {
|
||||
totalConsumed.inc();
|
||||
consumedPerSecond.mark();
|
||||
}
|
||||
|
||||
public void markConsumedFailure() {
|
||||
totalConsumedFailures.inc();
|
||||
consumedFailuresPerSecond.mark();
|
||||
}
|
||||
}
|
@ -27,7 +27,7 @@ public class Instrumentation {
|
||||
protected final AtomicBoolean started = new AtomicBoolean(false);
|
||||
protected Exception startException = null;
|
||||
|
||||
Instrumentation(String name) {
|
||||
public Instrumentation(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
|
@ -22,47 +22,16 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class InstrumentationManager {
|
||||
|
||||
private final MetricRegistry metricRegistry = new MetricRegistry();
|
||||
private final Map<String, Object> runtime = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
|
||||
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
|
||||
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
|
||||
|
||||
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
|
||||
|
||||
public ProducerInstrumentation getProducerInstrumentation(String destination) {
|
||||
String key = Producer.PREFIX + destination;
|
||||
producerInstrumentations.putIfAbsent(key,
|
||||
new ProducerInstrumentation(metricRegistry, key));
|
||||
return producerInstrumentations.get(key);
|
||||
}
|
||||
|
||||
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
|
||||
String key = Consumer.PREFIX + destination;
|
||||
consumeInstrumentations.putIfAbsent(key,
|
||||
new ConsumerInstrumentation(metricRegistry, key));
|
||||
return consumeInstrumentations.get(key);
|
||||
}
|
||||
|
||||
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
|
||||
String key = Consumer.GROUP_PREFIX + group;
|
||||
consumerGroupsInstrumentations.putIfAbsent(key,
|
||||
new ConsumerGroupInstrumentation(metricRegistry, key));
|
||||
return consumerGroupsInstrumentations.get(key);
|
||||
}
|
||||
|
||||
public Set<Instrumentation> getHealthInstrumentations() {
|
||||
return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue)
|
||||
.collect(Collectors.toSet());
|
||||
@ -72,11 +41,12 @@ public class InstrumentationManager {
|
||||
healthInstrumentations.put(instrumentation.getName(), instrumentation);
|
||||
}
|
||||
|
||||
public Instrumentation getHealthInstrumentation(String key) {
|
||||
return healthInstrumentations.get(key);
|
||||
}
|
||||
|
||||
public Map<String, Object> getRuntime() {
|
||||
return runtime;
|
||||
}
|
||||
|
||||
public MetricRegistry getMetricRegistry() {
|
||||
return metricRegistry;
|
||||
}
|
||||
}
|
||||
|
@ -1,59 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
/**
|
||||
* @author juven.xuxb
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ProducerInstrumentation extends Instrumentation {
|
||||
|
||||
private final Counter totalSent;
|
||||
private final Counter totalSentFailures;
|
||||
private final Meter sentPerSecond;
|
||||
private final Meter sentFailuresPerSecond;
|
||||
|
||||
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
|
||||
super(baseMetricName);
|
||||
|
||||
this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT));
|
||||
this.totalSentFailures = registry
|
||||
.counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES));
|
||||
this.sentPerSecond = registry
|
||||
.meter(name(baseMetricName, Producer.SENT_PER_SECOND));
|
||||
this.sentFailuresPerSecond = registry
|
||||
.meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND));
|
||||
}
|
||||
|
||||
public void markSent() {
|
||||
totalSent.inc();
|
||||
sentPerSecond.mark();
|
||||
}
|
||||
|
||||
public void markSentFailure() {
|
||||
totalSentFailures.inc();
|
||||
sentFailuresPerSecond.mark();
|
||||
}
|
||||
}
|
@ -0,0 +1,102 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.rocketmq.client.ClientConfig;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.impl.MQClientManager;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
|
||||
import io.micrometer.core.instrument.Gauge;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import io.micrometer.core.lang.NonNull;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQBinderMetrics
|
||||
implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
|
||||
|
||||
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
|
||||
private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties;
|
||||
private final MeterRegistry meterRegistry;
|
||||
|
||||
static final String METRIC_NAME = "spring.cloud.stream.binder.rocketmq";
|
||||
|
||||
public RocketMQBinderMetrics(
|
||||
RocketMQMessageChannelBinder rocketMQMessageChannelBinder,
|
||||
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
|
||||
MeterRegistry meterRegistry) {
|
||||
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
|
||||
this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties;
|
||||
this.meterRegistry = meterRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bindTo(@NonNull MeterRegistry registry) {
|
||||
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer();
|
||||
pushConsumer
|
||||
.setNamesrvAddr(rocketMQBinderConfigurationProperties.getNamesrvAddr());
|
||||
DefaultMQProducer producer = new DefaultMQProducer();
|
||||
producer.setNamesrvAddr(rocketMQBinderConfigurationProperties.getNamesrvAddr());
|
||||
|
||||
rocketMQMessageChannelBinder.getTopicInUse().forEach((topic, group) -> {
|
||||
Gauge.builder(METRIC_NAME, this, o -> calculateMsgQueueOffset(topic, group))
|
||||
.tag("group", group).tag("topic", topic)
|
||||
.description("RocketMQ all messageQueue size").register(registry);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private double calculateMsgQueueOffset(String topic, String group) {
|
||||
for (String clientConfigId : this.rocketMQMessageChannelBinder
|
||||
.getClientConfigId()) {
|
||||
ClientConfig clientConfig = new ClientConfig();
|
||||
String[] clientConfigArr = clientConfigId.split("@", 3);
|
||||
clientConfig.setClientIP(clientConfigArr[0]);
|
||||
clientConfig.setInstanceName(clientConfigArr[1]);
|
||||
if (clientConfigArr.length > 2) {
|
||||
clientConfig.setUnitName(clientConfigArr[2]);
|
||||
}
|
||||
Map<MessageQueue, Long> queueLongMap = MQClientManager.getInstance()
|
||||
.getAndCreateMQClientInstance(clientConfig)
|
||||
.getConsumerStatus(topic, group);
|
||||
if (queueLongMap.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
return queueLongMap.values().stream()
|
||||
.collect(Collectors.summingLong(Long::longValue));
|
||||
}
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(BindingCreatedEvent event) {
|
||||
if (this.meterRegistry != null) {
|
||||
this.bindTo(this.meterRegistry);
|
||||
}
|
||||
}
|
||||
}
|
@ -17,6 +17,7 @@
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
@ -25,9 +26,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
|
||||
public class RocketMQBinderConfigurationProperties {
|
||||
|
||||
private String namesrvAddr = "127.0.0.1:9876";
|
||||
|
||||
private String logLevel = "ERROR";
|
||||
private String namesrvAddr = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
|
||||
|
||||
public String getNamesrvAddr() {
|
||||
return namesrvAddr;
|
||||
@ -37,12 +36,4 @@ public class RocketMQBinderConfigurationProperties {
|
||||
this.namesrvAddr = namesrvAddr;
|
||||
}
|
||||
|
||||
public String getLogLevel() {
|
||||
return logLevel;
|
||||
}
|
||||
|
||||
public void setLogLevel(String logLevel) {
|
||||
this.logLevel = logLevel;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -50,42 +50,18 @@ public class RocketMQConsumerProperties {
|
||||
*/
|
||||
private Boolean orderly = false;
|
||||
|
||||
/**
|
||||
* for concurrently listener. message consume retry strategy
|
||||
*/
|
||||
private int delayLevelWhenNextConsume = 0;
|
||||
|
||||
/**
|
||||
* for orderly listener. next retry delay time
|
||||
*/
|
||||
private long suspendCurrentQueueTimeMillis = 1000;
|
||||
|
||||
private Boolean enabled = true;
|
||||
|
||||
private Error error;
|
||||
|
||||
public static class Error {
|
||||
|
||||
/**
|
||||
* Reconsume later timeMillis in ConsumeOrderlyContext.
|
||||
*/
|
||||
private Long suspendCurrentQueueTimeMillis = 1000L;
|
||||
|
||||
/**
|
||||
* Message consume retry strategy in ConsumeConcurrentlyContext.
|
||||
*
|
||||
* -1,no retry,put into DLQ directly 0,broker control retry frequency >0,client
|
||||
* control retry frequency
|
||||
*/
|
||||
private Integer delayLevelWhenNextConsume = 0;
|
||||
|
||||
public Long getSuspendCurrentQueueTimeMillis() {
|
||||
return suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public void setSuspendCurrentQueueTimeMillis(Long suspendCurrentQueueTimeMillis) {
|
||||
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public Integer getDelayLevelWhenNextConsume() {
|
||||
return delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public void setDelayLevelWhenNextConsume(Integer delayLevelWhenNextConsume) {
|
||||
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
|
||||
}
|
||||
}
|
||||
|
||||
public String getTags() {
|
||||
return tags;
|
||||
}
|
||||
@ -126,11 +102,19 @@ public class RocketMQConsumerProperties {
|
||||
this.broadcasting = broadcasting;
|
||||
}
|
||||
|
||||
public Error getError() {
|
||||
return error;
|
||||
public int getDelayLevelWhenNextConsume() {
|
||||
return delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public void setError(Error error) {
|
||||
this.error = error;
|
||||
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
|
||||
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public long getSuspendCurrentQueueTimeMillis() {
|
||||
return suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
|
||||
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
}
|
||||
|
@ -17,8 +17,6 @@
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.TransactionCheckListener;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
@ -31,22 +29,47 @@ public class RocketMQProducerProperties {
|
||||
/**
|
||||
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}
|
||||
*/
|
||||
private Integer maxMessageSize = 0;
|
||||
private Integer maxMessageSize = 1024 * 1024 * 4;
|
||||
|
||||
private Boolean transactional = false;
|
||||
|
||||
/**
|
||||
* full class name of {@link LocalTransactionExecuter}
|
||||
*/
|
||||
private String executer;
|
||||
|
||||
/**
|
||||
* full class name of {@link TransactionCheckListener}
|
||||
*/
|
||||
private String transactionCheckListener;
|
||||
private Boolean sync = false;
|
||||
|
||||
private Boolean vipChannelEnabled = true;
|
||||
|
||||
/**
|
||||
* Millis of send message timeout.
|
||||
*/
|
||||
private int sendMessageTimeout = 3000;
|
||||
|
||||
/**
|
||||
* Compress message body threshold, namely, message body larger than 4k will be
|
||||
* compressed on default.
|
||||
*/
|
||||
private int compressMessageBodyThreshold = 1024 * 4;
|
||||
|
||||
/**
|
||||
* Maximum number of retry to perform internally before claiming sending failure in
|
||||
* synchronous mode. This may potentially cause message duplication which is up to
|
||||
* application developers to resolve.
|
||||
*/
|
||||
private int retryTimesWhenSendFailed = 2;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Maximum number of retry to perform internally before claiming sending failure in
|
||||
* asynchronous mode.
|
||||
* </p>
|
||||
* This may potentially cause message duplication which is up to application
|
||||
* developers to resolve.
|
||||
*/
|
||||
private int retryTimesWhenSendAsyncFailed = 2;
|
||||
|
||||
/**
|
||||
* Indicate whether to retry another broker on sending failure internally.
|
||||
*/
|
||||
private boolean retryNextServer = false;
|
||||
|
||||
public Boolean getEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
@ -71,20 +94,12 @@ public class RocketMQProducerProperties {
|
||||
this.transactional = transactional;
|
||||
}
|
||||
|
||||
public String getExecuter() {
|
||||
return executer;
|
||||
public Boolean getSync() {
|
||||
return sync;
|
||||
}
|
||||
|
||||
public void setExecuter(String executer) {
|
||||
this.executer = executer;
|
||||
}
|
||||
|
||||
public String getTransactionCheckListener() {
|
||||
return transactionCheckListener;
|
||||
}
|
||||
|
||||
public void setTransactionCheckListener(String transactionCheckListener) {
|
||||
this.transactionCheckListener = transactionCheckListener;
|
||||
public void setSync(Boolean sync) {
|
||||
this.sync = sync;
|
||||
}
|
||||
|
||||
public Boolean getVipChannelEnabled() {
|
||||
@ -94,4 +109,45 @@ public class RocketMQProducerProperties {
|
||||
public void setVipChannelEnabled(Boolean vipChannelEnabled) {
|
||||
this.vipChannelEnabled = vipChannelEnabled;
|
||||
}
|
||||
}
|
||||
|
||||
public int getSendMessageTimeout() {
|
||||
return sendMessageTimeout;
|
||||
}
|
||||
|
||||
public void setSendMessageTimeout(int sendMessageTimeout) {
|
||||
this.sendMessageTimeout = sendMessageTimeout;
|
||||
}
|
||||
|
||||
public int getCompressMessageBodyThreshold() {
|
||||
return compressMessageBodyThreshold;
|
||||
}
|
||||
|
||||
public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
|
||||
this.compressMessageBodyThreshold = compressMessageBodyThreshold;
|
||||
}
|
||||
|
||||
public int getRetryTimesWhenSendFailed() {
|
||||
return retryTimesWhenSendFailed;
|
||||
}
|
||||
|
||||
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
|
||||
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
|
||||
}
|
||||
|
||||
public int getRetryTimesWhenSendAsyncFailed() {
|
||||
return retryTimesWhenSendAsyncFailed;
|
||||
}
|
||||
|
||||
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
|
||||
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
|
||||
}
|
||||
|
||||
public boolean isRetryNextServer() {
|
||||
return retryNextServer;
|
||||
}
|
||||
|
||||
public void setRetryNextServer(boolean retryNextServer) {
|
||||
this.retryNextServer = retryNextServer;
|
||||
}
|
||||
|
||||
}
|
@ -1,2 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration
|
||||
org.springframework.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration
|
||||
|
@ -22,7 +22,6 @@ import org.junit.Test;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
|
||||
@ -33,8 +32,7 @@ public class RocketMQAutoConfigurationTests {
|
||||
|
||||
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
|
||||
.withConfiguration(
|
||||
AutoConfigurations.of(RocketMQBinderEndpointAutoConfiguration.class,
|
||||
RocketMQBinderAutoConfiguration.class))
|
||||
AutoConfigurations.of(RocketMQBinderAutoConfiguration.class))
|
||||
.withPropertyValues(
|
||||
"spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
|
||||
"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
|
||||
|
Loading…
x
Reference in New Issue
Block a user