1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Merge pull request #1 from spring-cloud-incubator/master

同步一次代码
This commit is contained in:
pbting 2019-01-14 10:09:08 +08:00 committed by GitHub
commit b6d1911008
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 233 additions and 35 deletions

View File

@ -92,6 +92,9 @@ public class RocketMQMessageChannelBinder extends
TransactionCheckListener.class));
}
if (errorChannel != null) {
messageHandler.setSendFailureChannel(errorChannel);
}
return messageHandler;
}
else {
@ -107,7 +110,7 @@ public class RocketMQMessageChannelBinder extends
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException(
"'group must be configured for channel + " + destination.getName());
"'group must be configured for channel " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(

View File

@ -0,0 +1,47 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.exception;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* An exception that is the payload of an {@code ErrorMessage} when occurs send failure.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @since 0.2.2
*/
public class RocketMQSendFailureException extends MessagingException {
private final org.apache.rocketmq.common.message.Message rocketmqMsg;
public RocketMQSendFailureException(Message<?> message,
org.apache.rocketmq.common.message.Message rocketmqMsg, Throwable cause) {
super(message, cause);
this.rocketmqMsg = rocketmqMsg;
}
public org.apache.rocketmq.common.message.Message getRocketmqMsg() {
return rocketmqMsg;
}
@Override
public String toString() {
return super.toString() + " [rocketmqMsg=" + this.rocketmqMsg + "]";
}
}

View File

@ -53,6 +53,7 @@ import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
@ -71,6 +72,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private RecoveryCallback<? extends Object> recoveryCallback;
private DefaultMQPushConsumer consumer;
private CloudStreamMessageListener listener;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination;
@ -90,6 +95,31 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
this.instrumentationManager = instrumentationManager;
}
@Override
protected void onInit() {
if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return;
}
super.onInit();
if (this.retryTemplate != null) {
Assert.state(getErrorChannel() == null,
"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ "send an error message when retries are exhausted");
}
this.consumer = consumersManager.getOrCreateConsumer(group, destination,
consumerProperties);
Boolean isOrderly = consumerProperties.getExtension().getOrderly();
this.listener = isOrderly ? new CloudStreamMessageListenerOrderly()
: new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) {
this.retryTemplate.registerListener(this.listener);
}
}
@Override
protected void doStart() {
if (consumerProperties == null
@ -98,18 +128,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
String tags = consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly
? new CloudStreamMessageListenerOrderly()
: new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) {
retryTemplate.registerListener(listener);
}
Set<String> tagsSet = tags == null ? new HashSet<>()
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
@ -122,11 +140,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
consumer.subscribe(destination, MessageSelector
this.consumer.subscribe(destination, MessageSelector
.bySql(consumerProperties.getExtension().getSql()));
}
else {
consumer.subscribe(destination, String.join(" || ", tagsSet));
this.consumer.subscribe(destination, String.join(" || ", tagsSet));
}
Optional.ofNullable(consumerInstrumentation)
.ifPresent(c -> c.markStartedSuccessfully());
@ -139,7 +157,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
}
consumer.registerMessageListener(listener);
this.consumer.registerMessageListener(this.listener);
try {
consumersManager.startConsumer(group);
@ -214,10 +232,8 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
});
throw new RuntimeException(
"RocketMQ Message hasn't been processed successfully. Caused by ",
e);
}
return null;
}
private Acknowledgement doSendMsgs(final List<MessageExt> msgs,
@ -229,18 +245,24 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
logger.debug(retryInfo + "consuming msg:\n" + msg);
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
Acknowledgement acknowledgement = new Acknowledgement();
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody())
.setHeaders(new RocketMQMessageHeaderAccessor()
.withAcknowledgment(acknowledgement)
.withTags(msg.getTags()).withKeys(msg.getKeys())
.withFlag(msg.getFlag()).withRocketMessage(msg))
.build();
Message<byte[]> toChannel = convertMessagingFromRocketMQMsg(msg,
acknowledgement);
acknowledgements.add(acknowledgement);
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
});
return acknowledgements.get(0);
}
private Message convertMessagingFromRocketMQMsg(MessageExt msg,
Acknowledgement acknowledgement) {
return MessageBuilder.withPayload(msg.getBody())
.setHeaders(new RocketMQMessageHeaderAccessor()
.withAcknowledgment(acknowledgement).withTags(msg.getTags())
.withKeys(msg.getKeys()).withFlag(msg.getFlag())
.withRocketMessage(msg))
.build();
}
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
@ -283,9 +305,16 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setDelayLevelWhenNextConsume(
acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
if (acknowledgement != null) {
context.setDelayLevelWhenNextConsume(
acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
}
else {
context.setDelayLevelWhenNextConsume(consumerProperties.getExtension()
.getError().getDelayLevelWhenNextConsume());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
@ -296,11 +325,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setSuspendCurrentQueueTimeMillis(
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
if (acknowledgement != null) {
context.setSuspendCurrentQueueTimeMillis(
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
}
else {
context.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension()
.getError().getSuspendCurrentQueueTimeMillis());
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
}

View File

@ -33,20 +33,28 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.exception.RocketMQSendFailureException;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
private DefaultMQProducer producer;
private ProducerInstrumentation producerInstrumentation;
@ -57,6 +65,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private TransactionCheckListener transactionCheckListener;
private MessageChannel sendFailureChannel;
private final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
private final String destination;
@ -88,6 +98,9 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
producer = new DefaultMQProducer(destination);
}
producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled());
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
producerInstrumentation = manager.getProducerInstrumentation(destination);
manager.addHealthInstrumentation(producerInstrumentation);
@ -131,8 +144,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
throws Exception {
Message toSend = null;
try {
Message toSend;
if (message.getPayload() instanceof byte[]) {
toSend = new Message(destination, (byte[]) message.getPayload());
}
@ -166,7 +179,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
}
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new MQClientException("message hasn't been sent", null);
if (getSendFailureChannel() != null) {
this.getSendFailureChannel().send(message);
}
else {
throw new RocketMQSendFailureException(message, toSend,
new MQClientException("message hasn't been sent", null));
}
}
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
@ -184,18 +203,60 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
.ifPresent(p -> p.markSentFailure());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage(
new RocketMQSendFailureException(message, toSend, e), null));
}
else {
throw new RocketMQSendFailureException(message, toSend, e);
}
}
}
/**
* Using in RocketMQ Transactional Mode. Set RocketMQ localTransactionExecuter in
* {@link DefaultMQProducer#sendMessageInTransaction}.
* @param localTransactionExecuter the executer running when produce msg.
*/
public void setLocalTransactionExecuter(
LocalTransactionExecuter localTransactionExecuter) {
this.localTransactionExecuter = localTransactionExecuter;
}
/**
* Using in RocketMQ Transactional Mode. Set RocketMQ transactionCheckListener in
* {@link TransactionMQProducer#setTransactionCheckListener}.
* @param transactionCheckListener the listener set in {@link TransactionMQProducer}.
*/
public void setTransactionCheckListener(
TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
/**
* Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
* to this channel with a payload of a {@link RocketMQSendFailureException} with the
* failed message and cause.
* @param sendFailureChannel the failure channel.
* @since 0.2.2
*/
public void setSendFailureChannel(MessageChannel sendFailureChannel) {
this.sendFailureChannel = sendFailureChannel;
}
/**
* Set the error message strategy implementation to use when sending error messages
* after send failures. Cannot be null.
* @param errorMessageStrategy the implementation.
* @since 0.2.2
*/
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
this.errorMessageStrategy = errorMessageStrategy;
}
public MessageChannel getSendFailureChannel() {
return sendFailureChannel;
}
}

View File

@ -52,6 +52,40 @@ public class RocketMQConsumerProperties {
private Boolean enabled = true;
private Error error;
public static class Error {
/**
* Reconsume later timeMillis in ConsumeOrderlyContext.
*/
private Long suspendCurrentQueueTimeMillis = 1000L;
/**
* Message consume retry strategy in ConsumeConcurrentlyContext.
*
* -1,no retry,put into DLQ directly 0,broker control retry frequency >0,client
* control retry frequency
*/
private Integer delayLevelWhenNextConsume = 0;
public Long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(Long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public Integer getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume;
}
public void setDelayLevelWhenNextConsume(Integer delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
}
public String getTags() {
return tags;
}
@ -91,4 +125,12 @@ public class RocketMQConsumerProperties {
public void setBroadcasting(Boolean broadcasting) {
this.broadcasting = broadcasting;
}
public Error getError() {
return error;
}
public void setError(Error error) {
this.error = error;
}
}

View File

@ -45,6 +45,8 @@ public class RocketMQProducerProperties {
*/
private String transactionCheckListener;
private Boolean vipChannelEnabled = true;
public Boolean getEnabled() {
return enabled;
}
@ -84,4 +86,12 @@ public class RocketMQProducerProperties {
public void setTransactionCheckListener(String transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
public Boolean getVipChannelEnabled() {
return vipChannelEnabled;
}
public void setVipChannelEnabled(Boolean vipChannelEnabled) {
this.vipChannelEnabled = vipChannelEnabled;
}
}