1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

add rocketmq starter and fix some small problems

This commit is contained in:
fangjian0423
2018-11-20 14:21:31 +08:00
parent 4539ed7183
commit 2596f014f7
6 changed files with 118 additions and 13 deletions

View File

@@ -5,22 +5,22 @@ package org.springframework.cloud.stream.binder.rocketmq;
*/
public interface RocketMQBinderConstants {
/**
* Header key
*/
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE";
/**
* Header key
*/
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE";
String ROCKET_FLAG = "ROCKETMQ_FLAG";
String ROCKET_FLAG = "ROCKETMQ_FLAG";
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/**
* Instrumentation key
*/
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
/**
* Instrumentation key
*/
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
String ENDPOINT_ID = "rocketmq-binder";
String ENDPOINT_ID = "rocketmq-binder";
}

View File

@@ -43,6 +43,8 @@ import org.springframework.util.StringUtils;
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
private ConsumerInstrumentation consumerInstrumentation;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
@@ -132,7 +134,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final InstrumentationManager instrumentationManager;

View File

@@ -0,0 +1,72 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(
AutoConfigurations.of(RocketMQBinderEndpointAutoConfiguration.class,
RocketMQBinderAutoConfiguration.class))
.withPropertyValues(
"spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
"spring.cloud.stream.bindings.output.content-type=application/json",
"spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input1.content-type=application/json",
"spring.cloud.stream.bindings.input1.group=test-group1",
"spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true",
"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
"spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input2.content-type=application/json",
"spring.cloud.stream.bindings.input2.group=test-group2",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1");
@Test
public void testProperties() {
this.contextRunner.run(context -> {
RocketMQBinderConfigurationProperties binderConfigurationProperties = context
.getBean(RocketMQBinderConfigurationProperties.class);
assertThat(binderConfigurationProperties.getNamesrvAddr())
.isEqualTo("127.0.0.1:9876");
RocketMQExtendedBindingProperties bindingProperties = context
.getBean(RocketMQExtendedBindingProperties.class);
assertThat(
bindingProperties.getExtendedConsumerProperties("input2").getTags())
.isEqualTo("tag1");
assertThat(bindingProperties.getExtendedConsumerProperties("input2")
.getOrderly()).isFalse();
assertThat(bindingProperties.getExtendedConsumerProperties("input1")
.getOrderly()).isTrue();
});
}
}