1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

format code with maven plugins

This commit is contained in:
fangjian0423
2019-09-26 17:15:41 +08:00
parent ed6942d9df
commit 2435167e1e
529 changed files with 6304 additions and 5164 deletions

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,23 +22,30 @@ import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
*/
public interface RocketMQBinderConstants {
public final class RocketMQBinderConstants {
/**
* Header key
* Header key for RocketMQ Transactional Args.
*/
String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
/**
* Default value
* Default NameServer value.
*/
String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
public static final String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
/**
* RocketMQ re-consume times
* Default group for SCS RocketMQ Binder.
*/
String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
/**
* RocketMQ re-consume times.
*/
public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
private RocketMQBinderConstants() {
throw new AssertionError("Must not instantiate constant utility class");
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,15 +16,19 @@
package com.alibaba.cloud.stream.binder.rocketmq;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBinderUtils {
public final class RocketMQBinderUtils {
private RocketMQBinderUtils() {
}
public static RocketMQBinderConfigurationProperties mergeProperties(
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,6 +22,20 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -30,6 +44,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -48,22 +63,6 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@@ -75,7 +74,9 @@ public class RocketMQMessageChannelBinder extends
private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties();
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final RocketMQProperties rocketMQProperties;
private final InstrumentationManager instrumentationManager;
private Map<String, String> topicInUse = new HashMap<>();
@@ -101,8 +102,7 @@ public class RocketMQMessageChannelBinder extends
// if producerGroup is empty, using destination
String extendedProducerGroup = producerProperties.getExtension().getGroup();
String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
? destination.getName()
: extendedProducerGroup;
? destination.getName() : extendedProducerGroup;
RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
.mergeProperties(rocketBinderConfigurationProperties,

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,13 +16,13 @@
package com.alibaba.cloud.stream.binder.rocketmq.actuator;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@@ -50,4 +50,5 @@ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
.forEach(instrumentation1 -> builder
.withException(instrumentation1.getStartException()));
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,19 +16,19 @@
package com.alibaba.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* @author Timur Valiev

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,13 +16,13 @@
package com.alibaba.cloud.stream.binder.rocketmq.config;
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,6 +16,8 @@
package com.alibaba.cloud.stream.binder.rocketmq.config;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -24,6 +26,7 @@ import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -32,10 +35,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,11 +16,13 @@
package com.alibaba.cloud.stream.binder.rocketmq.consuming;
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
import java.util.List;
import java.util.Objects;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -45,6 +47,7 @@ import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.SmartLifecycle;
@@ -53,13 +56,10 @@ import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
/**
* A class that Listen on rocketmq message
* A class that Listen on rocketmq message.
* <p>
* this class will delegate {@link RocketMQListener} to handle message
*
@@ -104,12 +104,16 @@ public class RocketMQListenerBindingContainer
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
// The following properties came from RocketMQConsumerProperties.
private ConsumeMode consumeMode;
private SelectorType selectorType;
private String selectorExpression;
private MessageModel messageModel;
public RocketMQListenerBindingContainer(
@@ -120,8 +124,7 @@ public class RocketMQListenerBindingContainer
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
? ConsumeMode.ORDERLY
: ConsumeMode.CONCURRENTLY;
? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
this.selectorType = SelectorType.TAG;
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
@@ -131,8 +134,7 @@ public class RocketMQListenerBindingContainer
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
}
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
? MessageModel.BROADCASTING
: MessageModel.CLUSTERING;
? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
}
@Override
@@ -386,6 +388,23 @@ public class RocketMQListenerBindingContainer
this.headerMapper = headerMapper;
}
/**
* Convert rocketmq {@link MessageExt} to Spring {@link Message}.
* @param messageExt the rocketmq message
* @return the converted Spring {@link Message}
*/
@SuppressWarnings("unchecked")
private Message convertToSpringMessage(MessageExt messageExt) {
// add reconsume-times header to messageExt
int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
String.valueOf(reconsumeTimes));
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
return MessageBuilder.fromMessage(message)
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
}
public class DefaultMessageListenerConcurrently
implements MessageListenerConcurrently {
@@ -411,6 +430,7 @@ public class RocketMQListenerBindingContainer
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@@ -438,24 +458,7 @@ public class RocketMQListenerBindingContainer
return ConsumeOrderlyStatus.SUCCESS;
}
}
/**
* Convert rocketmq {@link MessageExt} to Spring {@link Message}
*
* @param messageExt the rocketmq message
* @return the converted Spring {@link Message}
*/
@SuppressWarnings("unchecked")
private Message convertToSpringMessage(MessageExt messageExt) {
// add reconsume-times header to messageExt
int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
String.valueOf(reconsumeTimes));
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
return MessageBuilder.fromMessage(message)
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -58,4 +58,5 @@ public class RocketMQMessageQueueChooser {
public List<MessageQueue> getMessageQueues() {
return messageQueues;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,9 +16,14 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
@@ -31,11 +36,6 @@ import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@@ -170,6 +170,7 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
RetryCallback<T, E> callback, Throwable throwable) {
}
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
@@ -30,6 +35,7 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
@@ -45,12 +51,6 @@ import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@@ -298,4 +298,5 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,9 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration;
import java.util.List;
import java.util.Set;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
@@ -31,6 +34,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
@@ -43,10 +47,6 @@ import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@@ -324,6 +324,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
return "RocketMQAckCallback{" + "ackInfo=" + ackInfo + ", acknowledged="
+ acknowledged + ", autoAckEnabled=" + autoAckEnabled + '}';
}
}
public class RocketMQAckInfo {
@@ -374,6 +375,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
+ pullResult + ", consumer=" + consumer + ", oldOffset=" + oldOffset
+ '}';
}
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,8 +23,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Instrumentation {
private final String name;
protected final AtomicBoolean started = new AtomicBoolean(false);
protected Exception startException = null;
public Instrumentation(String name) {
@@ -63,4 +66,5 @@ public class Instrumentation {
public Exception getStartException() {
return startException;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,10 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author Timur Valiev
@@ -93,4 +93,5 @@ public class RocketMQBinderConfigurationProperties {
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -45,4 +45,5 @@ public class RocketMQBindingProperties implements BinderSpecificPropertiesProvid
public void setProducer(RocketMQProducerProperties producer) {
this.producer = producer;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,6 +18,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.properties;
import java.util.Set;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -25,8 +26,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@@ -34,43 +33,43 @@ import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMap
public class RocketMQConsumerProperties {
/**
* using '||' to split tag {@link MQPushConsumer#subscribe(String, String)}
* using '||' to split tag {@link MQPushConsumer#subscribe(String, String)}.
*/
private String tags;
/**
* {@link MQPushConsumer#subscribe(String, MessageSelector)}
* {@link MessageSelector#bySql(String)}
* {@link MessageSelector#bySql(String)}.
*/
private String sql;
/**
* {@link MessageModel#BROADCASTING}
* {@link MessageModel#BROADCASTING}.
*/
private Boolean broadcasting = false;
/**
* if orderly is true, using {@link MessageListenerOrderly} else if orderly if false,
* using {@link MessageListenerConcurrently}
* using {@link MessageListenerConcurrently}.
*/
private Boolean orderly = false;
/**
* for concurrently listener. message consume retry strategy. see
* {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or
* discard, see {@link this#shouldRequeue}), others means requeue
* discard, see {@link this#shouldRequeue}), others means requeue.
*/
private int delayLevelWhenNextConsume = 0;
/**
* for orderly listener. next retry delay time
* for orderly listener. next retry delay time.
*/
private long suspendCurrentQueueTimeMillis = 1000;
private Boolean enabled = true;
/**
* {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}
* {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}.
*/
private Set<String> trustedPackages;
@@ -165,4 +164,5 @@ public class RocketMQConsumerProperties {
public void setTrustedPackages(Set<String> trustedPackages) {
this.trustedPackages = trustedPackages;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -39,4 +39,5 @@ public class RocketMQExtendedBindingProperties extends
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return RocketMQBindingProperties.class;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -163,4 +163,4 @@ public class RocketMQProducerProperties {
this.retryNextServer = retryNextServer;
}
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,8 +16,11 @@
package com.alibaba.cloud.stream.binder.rocketmq.provisioning;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
@@ -25,9 +28,6 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>

View File

@@ -1,3 +1,19 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector;
import java.util.List;
@@ -7,11 +23,11 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
/**
* @author wangxing
* @create 2019/7/3
*/
public class PartitionMessageQueueSelector implements MessageQueueSelector {
@@ -36,4 +52,4 @@ public class PartitionMessageQueueSelector implements MessageQueueSelector {
return mqs.get(partition);
}
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2019 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
@@ -30,6 +31,7 @@ import org.springframework.util.Assert;
* @since 2.1.1.RELEASE
*/
public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper {
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private Charset charset;
@@ -60,4 +62,5 @@ public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapp
Assert.notNull(charset, "'charset' cannot be null");
this.charset = charset;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2019 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,22 +20,22 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.ClassUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.ClassUtils;
/**
* jackson header mapper for RocketMQ. Header types are added to a special header
@@ -45,6 +45,7 @@ import com.google.common.collect.Maps;
* @since 2.1.1.RELEASE
*/
public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
private final static Logger log = LoggerFactory
.getLogger(JacksonRocketMQHeaderMapper.class);
@@ -57,6 +58,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
public static final String JSON_TYPES = "spring_json_header_types";
private final ObjectMapper objectMapper;
private final Set<String> trustedPackages = new LinkedHashSet<>(
DEFAULT_TRUSTED_PACKAGES);
@@ -71,8 +73,8 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
@Override
public Map<String, String> fromHeaders(MessageHeaders headers) {
final Map<String, String> target = Maps.newHashMap();
final Map<String, String> jsonHeaders = Maps.newHashMap();
final Map<String, String> target = new HashMap<>();
final Map<String, String> jsonHeaders = new HashMap<>();
headers.forEach((key, value) -> {
if (matches(key)) {
if (value instanceof String) {
@@ -104,7 +106,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
@Override
public MessageHeaders toHeaders(Map<String, String> source) {
final Map<String, Object> target = Maps.newHashMap();
final Map<String, Object> target = new HashMap<>();
final Map<String, String> jsonTypes = decodeJsonTypes(source);
source.forEach((key, value) -> {
if (matches(key) && !(key.equals(JSON_TYPES))) {
@@ -281,4 +283,5 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
}
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2019 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,12 +21,13 @@ import java.util.Map;
import org.springframework.messaging.MessageHeaders;
/**
* header value mapper for RocketMQ
* header value mapper for RocketMQ.
*
* @author caotc
* @since 2.1.1.RELEASE
*/
public interface RocketMQHeaderMapper {
/**
* Map from the given {@link MessageHeaders} to the specified target message.
* @param headers the abstracted MessageHeaders.
@@ -40,4 +41,5 @@ public interface RocketMQHeaderMapper {
* @return the target headers.
*/
MessageHeaders toHeaders(Map<String, String> source);
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,15 +16,15 @@
package com.alibaba.cloud.stream.binder.rocketmq;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@@ -68,4 +68,4 @@ public class RocketMQAutoConfigurationTests {
});
}
}
}