diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java index 032b1fae..048635d9 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -49,6 +50,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.cloud.commons.util.InetUtils; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; @@ -165,6 +167,9 @@ public class DubboServiceMetadataRepository @Autowired private DiscoveryClient discoveryClient; + @Autowired + private LoadBalancerClient loadBalancerClient; + @Autowired private JSONUtils jsonUtils; @@ -618,7 +623,7 @@ public class DubboServiceMetadataRepository } protected void initSubscribedDubboMetadataService(String serviceName) { - discoveryClient.getInstances(serviceName).stream().findAny() + Optional.ofNullable(loadBalancerClient.choose(serviceName)) .map(this::getDubboMetadataServiceURLs) .ifPresent(dubboMetadataServiceURLs -> { dubboMetadataServiceURLs.forEach(dubboMetadataServiceURL -> { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index bc35de93..e580594e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,8 +16,11 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; @@ -56,6 +59,9 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindi import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; +import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; + import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -89,8 +95,7 @@ public class RocketMQMessageChannelBinder extends @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, - MessageChannel channel, - MessageChannel errorChannel) throws Exception { + MessageChannel channel, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { // if producerGroup is empty, using destination @@ -169,7 +174,7 @@ public class RocketMQMessageChannelBinder extends .findFirst().orElse(null)); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().getSync()); - + messageHandler.setHeaderMapper(createHeaderMapper(producerProperties)); if (errorChannel != null) { messageHandler.setSendFailureChannel(errorChannel); } @@ -210,6 +215,7 @@ public class RocketMQMessageChannelBinder extends consumerProperties.getExtension().getDelayLevelWhenNextConsume()); listenerContainer .setNameServer(rocketBinderConfigurationProperties.getNameServer()); + listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties)); RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager); @@ -294,4 +300,27 @@ public class RocketMQMessageChannelBinder extends this.extendedBindingProperties = extendedBindingProperties; } + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedConsumerProperties extendedConsumerProperties) { + Set trustedPackages = extendedConsumerProperties.getExtension() + .getTrustedPackages(); + return createHeaderMapper(trustedPackages); + } + + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedProducerProperties producerProperties) { + return createHeaderMapper(Collections.emptyList()); + } + + private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages) { + ObjectMapper objectMapper = this.getApplicationContext() + .getBeansOfType(ObjectMapper.class).values().iterator().next(); + JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper( + objectMapper); + if (!StringUtils.isEmpty(trustedPackages)) { + headerMapper.addTrustedPackages(trustedPackages); + } + return headerMapper; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 5bf540e0..8e00a249 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -25,7 +25,12 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; -import org.apache.rocketmq.client.consumer.listener.*; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; @@ -43,6 +48,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.context.SmartLifecycle; +import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -50,6 +56,7 @@ import org.springframework.util.StringUtils; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; /** * A class that Listen on rocketmq message @@ -88,6 +95,8 @@ public class RocketMQListenerBindingContainer private RocketMQListener rocketMQListener; + private RocketMQHeaderMapper headerMapper; + private DefaultMQPushConsumer consumer; private boolean running; @@ -369,6 +378,14 @@ public class RocketMQListenerBindingContainer return messageModel; } + public RocketMQHeaderMapper getHeaderMapper() { + return headerMapper; + } + + public void setHeaderMapper(RocketMQHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } + public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @@ -429,14 +446,16 @@ public class RocketMQListenerBindingContainer * @param messageExt the rocketmq message * @return the converted Spring {@link Message} */ + @SuppressWarnings("unchecked") private Message convertToSpringMessage(MessageExt messageExt) { // add reconsume-times header to messageExt int reconsumeTimes = messageExt.getReconsumeTimes(); messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes)); - - return RocketMQUtil.convertToSpringMessage(messageExt); + Message message = RocketMQUtil.convertToSpringMessage(messageExt); + return MessageBuilder.fromMessage(message) + .copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 9186b064..ec5c4c6e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -17,6 +17,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.rocketmq.client.exception.MQClientException; @@ -37,6 +38,7 @@ import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; @@ -47,6 +49,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; /** * @author Jim @@ -62,6 +65,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private final RocketMQTemplate rocketMQTemplate; + private RocketMQHeaderMapper headerMapper; + private final Boolean transactional; private final String destination; @@ -149,6 +154,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li protected void handleMessageInternal(org.springframework.messaging.Message message) throws Exception { try { + // issue 737 fix + Map jsonHeaders = headerMapper + .fromHeaders(message.getHeaders()); + message = org.springframework.messaging.support.MessageBuilder + .fromMessage(message).copyHeaders(jsonHeaders).build(); + final StringBuilder topicWithTags = new StringBuilder(destination); String tags = Optional .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("") @@ -156,6 +167,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li if (!StringUtils.isEmpty(tags)) { topicWithTags.append(":").append(tags); } + SendResult sendRes = null; if (transactional) { sendRes = rocketMQTemplate.sendMessageInTransaction(groupName, @@ -195,6 +207,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { + Message finalMessage = message; SendCallback sendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -210,7 +223,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li getSendFailureChannel().send( RocketMQMessageHandler.this.errorMessageStrategy .buildErrorMessage(new MessagingException( - message, e), null)); + finalMessage, e), null)); } } }; @@ -277,4 +290,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li public void setSync(boolean sync) { this.sync = sync; } + + public RocketMQHeaderMapper getHeaderMapper() { + return headerMapper; + } + + public void setHeaderMapper(RocketMQHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 9554e580..53489355 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -16,6 +16,8 @@ package com.alibaba.cloud.stream.binder.rocketmq.properties; +import java.util.Set; + import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -23,6 +25,8 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; + /** * @author Timur Valiev * @author Jim @@ -65,6 +69,11 @@ public class RocketMQConsumerProperties { private Boolean enabled = true; + /** + * {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)} + */ + private Set trustedPackages; + // ------------ For Pull Consumer ------------ private long pullTimeout = 10 * 1000; @@ -148,4 +157,12 @@ public class RocketMQConsumerProperties { public boolean shouldRequeue() { return delayLevelWhenNextConsume != -1; } + + public Set getTrustedPackages() { + return trustedPackages; + } + + public void setTrustedPackages(Set trustedPackages) { + this.trustedPackages = trustedPackages; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java new file mode 100644 index 00000000..bd9e997b --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.rocketmq.common.message.MessageConst; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; + +/** + * Base for RocketMQ header mappers. + * + * @author caotc + * @since 2.1.1.RELEASE + */ +public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper { + private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; + + private Charset charset; + + public AbstractRocketMQHeaderMapper() { + this(DEFAULT_CHARSET); + } + + public AbstractRocketMQHeaderMapper(Charset charset) { + Assert.notNull(charset, "'charset' cannot be null"); + this.charset = charset; + } + + protected boolean matches(String headerName) { + return !MessageConst.STRING_HASH_SET.contains(headerName) + && !MessageHeaders.ID.equals(headerName) + && !MessageHeaders.TIMESTAMP.equals(headerName) + && !MessageHeaders.CONTENT_TYPE.equals(headerName) + && !MessageHeaders.REPLY_CHANNEL.equals(headerName) + && !MessageHeaders.ERROR_CHANNEL.equals(headerName); + } + + public Charset getCharset() { + return charset; + } + + public void setCharset(Charset charset) { + Assert.notNull(charset, "'charset' cannot be null"); + this.charset = charset; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java new file mode 100644 index 00000000..2c582b97 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -0,0 +1,284 @@ +/* + * Copyright (C) 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.lang.Nullable; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.ClassUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; + +/** + * jackson header mapper for RocketMQ. Header types are added to a special header + * {@link #JSON_TYPES}. + * + * @author caotc + * @since 2.1.1.RELEASE + */ +public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper { + private final static Logger log = LoggerFactory + .getLogger(JacksonRocketMQHeaderMapper.class); + + private static final List DEFAULT_TRUSTED_PACKAGES = Arrays + .asList("java.lang", "java.net", "java.util", "org.springframework.util"); + + /** + * Header name for java types of other headers. + */ + public static final String JSON_TYPES = "spring_json_header_types"; + + private final ObjectMapper objectMapper; + private final Set trustedPackages = new LinkedHashSet<>( + DEFAULT_TRUSTED_PACKAGES); + + public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) { + super(charset); + this.objectMapper = objectMapper; + } + + @Override + public Map fromHeaders(MessageHeaders headers) { + final Map target = Maps.newHashMap(); + final Map jsonHeaders = Maps.newHashMap(); + headers.forEach((key, value) -> { + if (matches(key)) { + if (value instanceof String) { + target.put(key, (String) value); + } + else { + try { + String className = value.getClass().getName(); + target.put(key, objectMapper.writeValueAsString(value)); + jsonHeaders.put(key, className); + } + catch (Exception e) { + log.debug("Could not map " + key + " with type " + + value.getClass().getName(), e); + } + } + } + }); + if (jsonHeaders.size() > 0) { + try { + target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders)); + } + catch (IllegalStateException | JsonProcessingException e) { + log.error("Could not add json types header", e); + } + } + return target; + } + + @Override + public MessageHeaders toHeaders(Map source) { + final Map target = Maps.newHashMap(); + final Map jsonTypes = decodeJsonTypes(source); + source.forEach((key, value) -> { + if (matches(key) && !(key.equals(JSON_TYPES))) { + if (jsonTypes != null && jsonTypes.containsKey(key)) { + Class type = Object.class; + String requestedType = jsonTypes.get(key); + boolean trusted = trusted(requestedType); + if (trusted) { + try { + type = ClassUtils.forName(requestedType, null); + } + catch (Exception e) { + log.error("Could not load class for header: " + key, e); + } + } + + if (trusted) { + try { + Object val = decodeValue(value, type); + target.put(key, val); + } + catch (IOException e) { + log.error("Could not decode json type: " + value + + " for key: " + key, e); + target.put(key, value); + } + } + else { + target.put(key, new NonTrustedHeaderType(value, requestedType)); + } + } + else { + target.put(key, value); + } + } + }); + return new MessageHeaders(target); + } + + /** + * @param packagesToTrust the packages to trust. + * @see #addTrustedPackages(Collection) + */ + public void addTrustedPackages(String... packagesToTrust) { + if (Objects.nonNull(packagesToTrust)) { + addTrustedPackages(Arrays.asList(packagesToTrust)); + } + } + + /** + * Add packages to the trusted packages list (default {@code java.util, java.lang}) + * used when constructing objects from JSON. If any of the supplied packages is + * {@code "*"}, all packages are trusted. If a class for a non-trusted package is + * encountered, the header is returned to the application with value of type + * {@link NonTrustedHeaderType}. + * @param packagesToTrust the packages to trust. + */ + public void addTrustedPackages(Collection packagesToTrust) { + if (packagesToTrust != null) { + for (String whiteList : packagesToTrust) { + if ("*".equals(whiteList)) { + this.trustedPackages.clear(); + break; + } + else { + this.trustedPackages.add(whiteList); + } + } + } + } + + public Set getTrustedPackages() { + return this.trustedPackages; + } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } + + private Object decodeValue(String jsonString, Class type) + throws IOException, LinkageError { + Object value = objectMapper.readValue(jsonString, type); + if (type.equals(NonTrustedHeaderType.class)) { + // Upstream NTHT propagated; may be trusted here... + NonTrustedHeaderType nth = (NonTrustedHeaderType) value; + if (trusted(nth.getUntrustedType())) { + try { + value = objectMapper.readValue(nth.getHeaderValue(), + ClassUtils.forName(nth.getUntrustedType(), null)); + } + catch (Exception e) { + log.error("Could not decode header: " + nth, e); + } + } + } + return value; + } + + @Nullable + private Map decodeJsonTypes(Map source) { + if (source.containsKey(JSON_TYPES)) { + String value = source.get(JSON_TYPES); + try { + return objectMapper.readValue(value, + new TypeReference>() { + }); + } + catch (IOException e) { + log.error("Could not decode json types: " + value, e); + } + } + return null; + } + + protected boolean trusted(String requestedType) { + if (requestedType.equals(NonTrustedHeaderType.class.getName())) { + return true; + } + if (!this.trustedPackages.isEmpty()) { + int lastDot = requestedType.lastIndexOf('.'); + if (lastDot < 0) { + return false; + } + String packageName = requestedType.substring(0, lastDot); + for (String trustedPackage : this.trustedPackages) { + if (packageName.equals(trustedPackage) + || packageName.startsWith(trustedPackage + ".")) { + return true; + } + } + return false; + } + return true; + } + + /** + * Represents a header that could not be decoded due to an untrusted type. + */ + public static class NonTrustedHeaderType { + + private String headerValue; + + private String untrustedType; + + public NonTrustedHeaderType() { + super(); + } + + NonTrustedHeaderType(String headerValue, String untrustedType) { + this.headerValue = headerValue; + this.untrustedType = untrustedType; + } + + public void setHeaderValue(String headerValue) { + this.headerValue = headerValue; + } + + public String getHeaderValue() { + return this.headerValue; + } + + public void setUntrustedType(String untrustedType) { + this.untrustedType = untrustedType; + } + + public String getUntrustedType() { + return this.untrustedType; + } + + @Override + public String toString() { + return "NonTrustedHeaderType [headerValue=" + headerValue + ", untrustedType=" + + this.untrustedType + "]"; + } + + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java new file mode 100644 index 00000000..e291e8c7 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import java.util.Map; + +import org.springframework.messaging.MessageHeaders; + +/** + * header value mapper for RocketMQ + * + * @author caotc + * @since 2.1.1.RELEASE + */ +public interface RocketMQHeaderMapper { + /** + * Map from the given {@link MessageHeaders} to the specified target message. + * @param headers the abstracted MessageHeaders. + * @return the native target message. + */ + Map fromHeaders(MessageHeaders headers); + + /** + * Map from the given target message to abstracted {@link MessageHeaders}. + * @param source the native target message. + * @return the target headers. + */ + MessageHeaders toHeaders(Map source); +}