1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

polish #761 update pkg name & maven coordinate for Edgware

This commit is contained in:
fangjian0423
2019-07-24 22:06:05 +08:00
parent 8e73a34e2a
commit f19c83a7a9
405 changed files with 2709 additions and 2568 deletions

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
package com.alibaba.cloud.stream.binder.rocketmq;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>

View File

@@ -14,12 +14,13 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
package com.alibaba.cloud.stream.binder.rocketmq;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.HashMap;
import java.util.Map;
@@ -31,15 +31,6 @@ import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
@@ -47,6 +38,16 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.fasterxml.jackson.databind.ObjectMapper;
/**

View File

@@ -14,13 +14,14 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.actuator;
package com.alibaba.cloud.stream.binder.rocketmq.actuator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author Timur Valiev

View File

@@ -14,21 +14,22 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.config;
package com.alibaba.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>

View File

@@ -14,13 +14,14 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.config;
package com.alibaba.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.config;
package com.alibaba.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -27,12 +27,13 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.fasterxml.jackson.databind.ObjectMapper;
/**

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.consuming;
package com.alibaba.cloud.stream.binder.rocketmq.consuming;
import java.util.List;
@@ -44,13 +44,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/

View File

@@ -14,16 +14,12 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.integration;
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
@@ -35,6 +31,11 @@ import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.integration;
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
@@ -25,9 +25,6 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
@@ -39,6 +36,10 @@ import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.metrics;
package com.alibaba.cloud.stream.binder.rocketmq.metrics;
import java.util.concurrent.atomic.AtomicBoolean;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.metrics;
package com.alibaba.cloud.stream.binder.rocketmq.metrics;
import java.util.HashMap;
import java.util.HashSet;

View File

@@ -14,11 +14,12 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.properties;
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
/**
* @author Timur Valiev

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.properties;
package com.alibaba.cloud.stream.binder.rocketmq.properties;
/**
* @author Timur Valiev
@@ -22,23 +22,23 @@ package org.springframework.cloud.stream.binder.rocketmq.properties;
*/
public class RocketMQBindingProperties {
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
public RocketMQConsumerProperties getConsumer() {
return consumer;
}
public RocketMQConsumerProperties getConsumer() {
return consumer;
}
public void setConsumer(RocketMQConsumerProperties consumer) {
this.consumer = consumer;
}
public void setConsumer(RocketMQConsumerProperties consumer) {
this.consumer = consumer;
}
public RocketMQProducerProperties getProducer() {
return producer;
}
public RocketMQProducerProperties getProducer() {
return producer;
}
public void setProducer(RocketMQProducerProperties producer) {
this.producer = producer;
}
public void setProducer(RocketMQProducerProperties producer) {
this.producer = producer;
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.properties;
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;

View File

@@ -0,0 +1,86 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties implements
ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
public Map<String, RocketMQBindingProperties> getBindings() {
return this.bindings;
}
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
this.bindings = bindings;
}
@Override
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(
String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getConsumer() != null) {
return bindings.get(channelName).getConsumer();
}
else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
this.bindings.get(channelName).setConsumer(properties);
return properties;
}
}
else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setConsumer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
@Override
public synchronized RocketMQProducerProperties getExtendedProducerProperties(
String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getProducer() != null) {
return bindings.get(channelName).getProducer();
}
else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
this.bindings.get(channelName).setProducer(properties);
return properties;
}
}
else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setProducer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.properties;
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

View File

@@ -0,0 +1,104 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.provisioning;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQTopicProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>> {
private static final Logger logger = LoggerFactory
.getLogger(RocketMQTopicProvisioner.class);
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties> properties)
throws ProvisioningException {
checkTopic(name);
return new RocketProducerDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> properties)
throws ProvisioningException {
checkTopic(name);
return new RocketConsumerDestination(name);
}
private void checkTopic(String topic) {
try {
Validators.checkTopic(topic);
}
catch (MQClientException e) {
logger.error("topic check error: " + topic, e);
throw new AssertionError(e); // Can't happen
}
}
private static final class RocketProducerDestination implements ProducerDestination {
private final String producerDestinationName;
RocketProducerDestination(String destinationName) {
this.producerDestinationName = destinationName;
}
@Override
public String getName() {
return producerDestinationName;
}
@Override
public String getNameForPartition(int partition) {
return producerDestinationName;
}
}
private static final class RocketConsumerDestination implements ConsumerDestination {
private final String consumerDestinationName;
RocketConsumerDestination(String consumerDestinationName) {
this.consumerDestinationName = consumerDestinationName;
}
@Override
public String getName() {
return this.consumerDestinationName;
}
}
}

View File

@@ -1,80 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties implements
ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
public Map<String, RocketMQBindingProperties> getBindings() {
return this.bindings;
}
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
this.bindings = bindings;
}
@Override
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getConsumer() != null) {
return bindings.get(channelName).getConsumer();
} else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
this.bindings.get(channelName).setConsumer(properties);
return properties;
}
} else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setConsumer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
@Override
public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getProducer() != null) {
return bindings.get(channelName).getProducer();
} else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
this.bindings.get(channelName).setProducer(properties);
return properties;
}
} else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setProducer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
}

View File

@@ -1,105 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.provisioning;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQTopicProvisioner
implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class);
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties>
properties)
throws ProvisioningException {
checkTopic(name);
return new RocketProducerDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
properties)
throws ProvisioningException {
checkTopic(name);
return new RocketConsumerDestination(name);
}
private void checkTopic(String topic) {
try {
Validators.checkTopic(topic);
} catch (MQClientException e) {
logger.error("topic check error: " + topic, e);
throw new AssertionError(e); // Can't happen
}
}
private static final class RocketProducerDestination implements ProducerDestination {
private final String producerDestinationName;
RocketProducerDestination(String destinationName) {
this.producerDestinationName = destinationName;
}
@Override
public String getName() {
return producerDestinationName;
}
@Override
public String getNameForPartition(int partition) {
return producerDestinationName;
}
}
private static final class RocketConsumerDestination implements ConsumerDestination {
private final String consumerDestinationName;
RocketConsumerDestination(String consumerDestinationName) {
this.consumerDestinationName = consumerDestinationName;
}
@Override
public String getName() {
return this.consumerDestinationName;
}
}
}

View File

@@ -1 +1 @@
rocketmq:org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration
rocketmq:com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration

View File

@@ -1,2 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration
com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration

View File

@@ -14,18 +14,19 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
package com.alibaba.cloud.stream.binder.rocketmq;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.context.ConfigurableApplicationContext;
import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/