mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
基于F版的一些适配性调整
This commit is contained in:
@@ -46,17 +46,14 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.integration.StaticMessageHeaderAccessor;
|
||||
import org.springframework.integration.acks.AcknowledgmentCallback;
|
||||
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
|
||||
import org.springframework.integration.channel.AbstractMessageChannel;
|
||||
import org.springframework.integration.support.StaticMessageHeaderAccessor;
|
||||
import org.springframework.integration.support.AcknowledgmentCallback;
|
||||
import org.springframework.integration.support.AcknowledgmentCallback.Status;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
@@ -71,12 +68,9 @@ public class RocketMQMessageChannelBinder extends
|
||||
implements
|
||||
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
|
||||
|
||||
private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties();
|
||||
|
||||
private RocketMQExtendedBindingProperties extendedBindingProperties;
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
private final RocketMQProperties rocketMQProperties;
|
||||
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
private Map<String, String> topicInUse = new HashMap<>();
|
||||
@@ -96,7 +90,7 @@ public class RocketMQMessageChannelBinder extends
|
||||
@Override
|
||||
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||
MessageChannel channel, MessageChannel errorChannel) throws Exception {
|
||||
MessageChannel errorChannel) throws Exception {
|
||||
if (producerProperties.getExtension().getEnabled()) {
|
||||
|
||||
// if producerGroup is empty, using destination
|
||||
@@ -168,11 +162,7 @@ public class RocketMQMessageChannelBinder extends
|
||||
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||
rocketMQTemplate, destination.getName(), producerGroup,
|
||||
producerProperties.getExtension().getTransactional(),
|
||||
instrumentationManager, producerProperties,
|
||||
((AbstractMessageChannel) channel).getChannelInterceptors().stream()
|
||||
.filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
|
||||
.map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
|
||||
.findFirst().orElse(null));
|
||||
instrumentationManager, producerProperties);
|
||||
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
|
||||
messageHandler.setSync(producerProperties.getExtension().getSync());
|
||||
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
|
||||
@@ -187,14 +177,6 @@ public class RocketMQMessageChannelBinder extends
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||
MessageChannel errorChannel) throws Exception {
|
||||
throw new UnsupportedOperationException(
|
||||
"The abstract binder should not call this method");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
|
||||
String group,
|
||||
@@ -286,16 +268,6 @@ public class RocketMQMessageChannelBinder extends
|
||||
return topicInUse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDefaultsPrefix() {
|
||||
return extendedBindingProperties.getDefaultsPrefix();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return extendedBindingProperties.getExtendedPropertiesEntryClass();
|
||||
}
|
||||
|
||||
public void setExtendedBindingProperties(
|
||||
RocketMQExtendedBindingProperties extendedBindingProperties) {
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
|
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.integration.handler.AbstractMessageHandler;
|
||||
import org.springframework.integration.support.DefaultErrorMessageStrategy;
|
||||
@@ -81,20 +80,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
|
||||
private ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
|
||||
|
||||
private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
|
||||
|
||||
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
|
||||
String groupName, Boolean transactional,
|
||||
InstrumentationManager instrumentationManager,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||
MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
|
||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
|
||||
this.rocketMQTemplate = rocketMQTemplate;
|
||||
this.destination = destination;
|
||||
this.groupName = groupName;
|
||||
this.transactional = transactional;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
this.producerProperties = producerProperties;
|
||||
this.partitioningInterceptor = partitioningInterceptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -126,8 +121,6 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
||||
destination, producerProperties.getPartitionCount(),
|
||||
messageQueues.size()));
|
||||
producerProperties.setPartitionCount(messageQueues.size());
|
||||
partitioningInterceptor
|
||||
.setPartitionCount(producerProperties.getPartitionCount());
|
||||
}
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
|
@@ -40,8 +40,8 @@ import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.acks.AcknowledgmentCallback;
|
||||
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
|
||||
import org.springframework.integration.support.AcknowledgmentCallback;
|
||||
import org.springframework.integration.support.AcknowledgmentCallbackFactory;
|
||||
import org.springframework.integration.endpoint.AbstractMessageSource;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -237,6 +237,11 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
public static class RocketMQCallbackFactory
|
||||
implements AcknowledgmentCallbackFactory<RocketMQAckInfo> {
|
||||
|
||||
|
@@ -16,19 +16,16 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQBindingProperties implements BinderSpecificPropertiesProvider {
|
||||
public class RocketMQBindingProperties {
|
||||
|
||||
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
|
||||
|
||||
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
|
||||
|
||||
@Override
|
||||
public RocketMQConsumerProperties getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
@@ -37,7 +34,6 @@ public class RocketMQBindingProperties implements BinderSpecificPropertiesProvid
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocketMQProducerProperties getProducer() {
|
||||
return producer;
|
||||
}
|
||||
|
@@ -1,11 +1,11 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -16,28 +16,71 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@ConfigurationProperties("spring.cloud.stream.rocketmq")
|
||||
public class RocketMQExtendedBindingProperties extends
|
||||
AbstractExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties, RocketMQBindingProperties> {
|
||||
public class RocketMQExtendedBindingProperties implements
|
||||
ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
|
||||
|
||||
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default";
|
||||
private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public String getDefaultsPrefix() {
|
||||
return DEFAULTS_PREFIX;
|
||||
public Map<String, RocketMQBindingProperties> getBindings() {
|
||||
return this.bindings;
|
||||
}
|
||||
|
||||
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
|
||||
this.bindings = bindings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return RocketMQBindingProperties.class;
|
||||
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(
|
||||
String channelName) {
|
||||
if (bindings.containsKey(channelName)) {
|
||||
if (bindings.get(channelName).getConsumer() != null) {
|
||||
return bindings.get(channelName).getConsumer();
|
||||
}
|
||||
else {
|
||||
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
|
||||
this.bindings.get(channelName).setConsumer(properties);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
else {
|
||||
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
|
||||
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
|
||||
rbp.setConsumer(properties);
|
||||
bindings.put(channelName, rbp);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized RocketMQProducerProperties getExtendedProducerProperties(
|
||||
String channelName) {
|
||||
if (bindings.containsKey(channelName)) {
|
||||
if (bindings.get(channelName).getProducer() != null) {
|
||||
return bindings.get(channelName).getProducer();
|
||||
}
|
||||
else {
|
||||
RocketMQProducerProperties properties = new RocketMQProducerProperties();
|
||||
this.bindings.get(channelName).setProducer(properties);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
else {
|
||||
RocketMQProducerProperties properties = new RocketMQProducerProperties();
|
||||
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
|
||||
rbp.setProducer(properties);
|
||||
bindings.put(channelName, rbp);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -16,6 +16,8 @@
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
@@ -56,7 +58,7 @@ public class RocketMQAutoConfigurationTests {
|
||||
RocketMQBinderConfigurationProperties binderConfigurationProperties = context
|
||||
.getBean(RocketMQBinderConfigurationProperties.class);
|
||||
assertThat(binderConfigurationProperties.getNameServer())
|
||||
.isEqualTo("127.0.0.1:9876");
|
||||
.isEqualTo(Arrays.asList("127.0.0.1:9876"));
|
||||
RocketMQExtendedBindingProperties bindingProperties = context
|
||||
.getBean(RocketMQExtendedBindingProperties.class);
|
||||
assertThat(
|
||||
|
Reference in New Issue
Block a user