mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
sync & commit in greenwich
This commit is contained in:
@@ -22,6 +22,20 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
@@ -30,6 +44,7 @@ import org.apache.rocketmq.remoting.RPCHook;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
@@ -48,22 +63,6 @@ import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
|
@@ -59,7 +59,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
|
||||
/**
|
||||
* A class that Listen on rocketmq message
|
||||
* A class that Listen on rocketmq message.
|
||||
* <p>
|
||||
* this class will delegate {@link RocketMQListener} to handle message
|
||||
*
|
||||
@@ -104,12 +104,16 @@ public class RocketMQListenerBindingContainer
|
||||
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
|
||||
|
||||
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
// The following properties came from RocketMQConsumerProperties.
|
||||
private ConsumeMode consumeMode;
|
||||
|
||||
private SelectorType selectorType;
|
||||
|
||||
private String selectorExpression;
|
||||
|
||||
private MessageModel messageModel;
|
||||
|
||||
public RocketMQListenerBindingContainer(
|
||||
@@ -120,8 +124,7 @@ public class RocketMQListenerBindingContainer
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
|
||||
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
|
||||
? ConsumeMode.ORDERLY
|
||||
: ConsumeMode.CONCURRENTLY;
|
||||
? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
|
||||
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
|
||||
this.selectorType = SelectorType.TAG;
|
||||
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
|
||||
@@ -131,8 +134,7 @@ public class RocketMQListenerBindingContainer
|
||||
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
|
||||
}
|
||||
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
|
||||
? MessageModel.BROADCASTING
|
||||
: MessageModel.CLUSTERING;
|
||||
? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -386,6 +388,23 @@ public class RocketMQListenerBindingContainer
|
||||
this.headerMapper = headerMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert rocketmq {@link MessageExt} to Spring {@link Message}.
|
||||
* @param messageExt the rocketmq message
|
||||
* @return the converted Spring {@link Message}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Message convertToSpringMessage(MessageExt messageExt) {
|
||||
|
||||
// add reconsume-times header to messageExt
|
||||
int reconsumeTimes = messageExt.getReconsumeTimes();
|
||||
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
||||
String.valueOf(reconsumeTimes));
|
||||
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
|
||||
return MessageBuilder.fromMessage(message)
|
||||
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerConcurrently
|
||||
implements MessageListenerConcurrently {
|
||||
|
||||
@@ -411,6 +430,7 @@ public class RocketMQListenerBindingContainer
|
||||
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
|
||||
@@ -438,24 +458,7 @@ public class RocketMQListenerBindingContainer
|
||||
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert rocketmq {@link MessageExt} to Spring {@link Message}
|
||||
*
|
||||
* @param messageExt the rocketmq message
|
||||
* @return the converted Spring {@link Message}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Message convertToSpringMessage(MessageExt messageExt) {
|
||||
|
||||
// add reconsume-times header to messageExt
|
||||
int reconsumeTimes = messageExt.getReconsumeTimes();
|
||||
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
||||
String.valueOf(reconsumeTimes));
|
||||
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
|
||||
return MessageBuilder.fromMessage(message)
|
||||
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -16,21 +16,26 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import javax.validation.constraints.Pattern;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
|
||||
@Validated
|
||||
public class RocketMQBinderConfigurationProperties {
|
||||
|
||||
/**
|
||||
* The name server for rocketMQ, formats: `host:port;host:port`.
|
||||
*/
|
||||
@Pattern(regexp = "^[\\d.:;]+$", message = "nameServer needs to match expression \"host:port;host:port\"")
|
||||
private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
|
||||
|
||||
/**
|
||||
@@ -93,4 +98,5 @@ public class RocketMQBinderConfigurationProperties {
|
||||
public void setCustomizedTraceTopic(String customizedTraceTopic) {
|
||||
this.customizedTraceTopic = customizedTraceTopic;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -1,3 +1,19 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector;
|
||||
|
||||
import java.util.List;
|
||||
@@ -11,7 +27,6 @@ import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
|
||||
/**
|
||||
* @author wangxing
|
||||
* @create 2019/7/3
|
||||
*/
|
||||
public class PartitionMessageQueueSelector implements MessageQueueSelector {
|
||||
|
||||
|
@@ -20,22 +20,22 @@ import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* jackson header mapper for RocketMQ. Header types are added to a special header
|
||||
@@ -45,6 +45,7 @@ import com.google.common.collect.Maps;
|
||||
* @since 2.1.1.RELEASE
|
||||
*/
|
||||
public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
|
||||
|
||||
private final static Logger log = LoggerFactory
|
||||
.getLogger(JacksonRocketMQHeaderMapper.class);
|
||||
|
||||
@@ -71,8 +72,8 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
|
||||
|
||||
@Override
|
||||
public Map<String, String> fromHeaders(MessageHeaders headers) {
|
||||
final Map<String, String> target = Maps.newHashMap();
|
||||
final Map<String, String> jsonHeaders = Maps.newHashMap();
|
||||
final Map<String, String> target = new HashMap<>();
|
||||
final Map<String, String> jsonHeaders = new HashMap<>();
|
||||
headers.forEach((key, value) -> {
|
||||
if (matches(key)) {
|
||||
if (value instanceof String) {
|
||||
@@ -104,7 +105,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
|
||||
|
||||
@Override
|
||||
public MessageHeaders toHeaders(Map<String, String> source) {
|
||||
final Map<String, Object> target = Maps.newHashMap();
|
||||
final Map<String, Object> target = new HashMap<>();
|
||||
final Map<String, String> jsonTypes = decodeJsonTypes(source);
|
||||
source.forEach((key, value) -> {
|
||||
if (matches(key) && !(key.equals(JSON_TYPES))) {
|
||||
@@ -281,4 +282,5 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user