1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

using eclipse code formatter to format code

This commit is contained in:
fangjian0423 2018-11-20 17:17:44 +08:00
parent c34ecfffd8
commit bfae54b51e
22 changed files with 946 additions and 852 deletions

View File

@ -6,7 +6,6 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-examples</artifactId>
<version>0.2.1.BUILD-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -26,11 +26,12 @@ import org.springframework.messaging.MessageHandler;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
implements
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
private static final Logger logger = LoggerFactory
.getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQTopicProvisioner rocketTopicProvisioner;
@ -53,37 +54,44 @@ public class RocketMQMessageChannelBinder extends
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties>
producerProperties,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
return new RocketMQMessageHandler(destination.getName(),
producerProperties.getExtension(),
rocketBinderConfigurationProperties, instrumentationManager);
} else {
throw new RuntimeException(
"Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
}
else {
throw new RuntimeException("Binding for channel " + destination.getName()
+ "has been disabled, message can't be delivered");
}
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
consumerProperties)
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
throw new RuntimeException(
"'group' must be configured for channel + " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
consumerProperties, destination.getName(), group, instrumentationManager);
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
consumersManager, consumerProperties, destination.getName(), group,
instrumentationManager);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
consumerProperties);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
group, consumerProperties);
if (consumerProperties.getMaxAttempts() > 1) {
rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
} else {
rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
rocketInboundChannelAdapter
.setRetryTemplate(buildRetryTemplate(consumerProperties));
rocketInboundChannelAdapter
.setRecoveryCallback(errorInfrastructure.getRecoverer());
}
else {
rocketInboundChannelAdapter
.setErrorChannel(errorInfrastructure.getErrorChannel());
}
return rocketInboundChannelAdapter;

View File

@ -1,5 +1,10 @@
package org.springframework.cloud.stream.binder.rocketmq;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
import java.util.HashMap;
import java.util.Map;
@ -12,11 +17,6 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -35,13 +35,14 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
}
public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) {
public RocketMQMessageHeaderAccessor withAcknowledgment(
Acknowledgement acknowledgment) {
setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
return this;
}
public String getTags() {
return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
}
public RocketMQMessageHeaderAccessor withTags(String tag) {
@ -50,7 +51,7 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
}
public String getKeys() {
return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
}
public RocketMQMessageHeaderAccessor withKeys(String keys) {
@ -68,7 +69,8 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
}
public Integer getDelayTimeLevel() {
return (Integer)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
return (Integer) getMessageHeaders()
.getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
}
public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
@ -77,7 +79,7 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
}
public Integer getFlag() {
return (Integer)getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
return (Integer) getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
}
public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
@ -96,9 +98,10 @@ public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
public Map<String, String> getUserProperties() {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, Object> entry : this.toMap().entrySet()) {
if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry
.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
result.put(entry.getKey(), (String)entry.getValue());
if (entry.getValue() instanceof String
&& !MessageConst.STRING_HASH_SET.contains(entry.getKey())
&& !entry.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
result.put(entry.getKey(), (String) entry.getValue());
}
}
return result;

View File

@ -1,14 +1,15 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.MetricRegistry;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
import com.codahale.metrics.MetricRegistry;
/**
* @author Timur Valiev

View File

@ -19,19 +19,20 @@ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
if (instrumentationManager.getHealthInstrumentations().stream().
allMatch(Instrumentation::isUp)) {
if (instrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isUp)) {
builder.up();
return;
}
if (instrumentationManager.getHealthInstrumentations().stream().
allMatch(Instrumentation::isOutOfService)) {
if (instrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isOutOfService)) {
builder.outOfService();
return;
}
builder.down();
instrumentationManager.getHealthInstrumentations().stream().
filter(instrumentation -> !instrumentation.isStarted()).
forEach(instrumentation1 -> builder.withException(instrumentation1.getStartException()));
instrumentationManager.getHealthInstrumentations().stream()
.filter(instrumentation -> !instrumentation.isStarted())
.forEach(instrumentation1 -> builder
.withException(instrumentation1.getStartException()));
}
}

View File

@ -17,7 +17,8 @@ import org.springframework.context.annotation.Configuration;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class})
@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
RocketMQExtendedBindingProperties.class })
public class RocketMQBinderAutoConfiguration {
private final RocketMQExtendedBindingProperties extendedBindingProperties;
@ -25,11 +26,13 @@ public class RocketMQBinderAutoConfiguration {
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired
public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties,
public RocketMQBinderAutoConfiguration(
RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL,
this.rocketBinderConfigurationProperties.getLogLevel());
}
@Bean
@ -38,17 +41,21 @@ public class RocketMQBinderAutoConfiguration {
}
@Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
public RocketMQMessageChannelBinder rocketMessageChannelBinder(
RocketMQTopicProvisioner provisioningProvider,
InstrumentationManager instrumentationManager,
ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
consumersManager, extendedBindingProperties, provisioningProvider,
rocketBinderConfigurationProperties, instrumentationManager);
return binder;
}
@Bean
public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) {
return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties);
public ConsumersManager consumersManager(
InstrumentationManager instrumentationManager) {
return new ConsumersManager(instrumentationManager,
rocketBinderConfigurationProperties);
}
}

View File

@ -21,13 +21,16 @@ public class RocketMQBinderEndpointAutoConfiguration {
}
@Bean
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(InstrumentationManager instrumentationManager) {
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(
InstrumentationManager instrumentationManager) {
return new RocketMQBinderHealthIndicator(instrumentationManager);
}
@Bean
public InstrumentationManager instrumentationManager(RocketMQBinderEndpoint rocketBinderEndpoint) {
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), rocketBinderEndpoint.runtime());
public InstrumentationManager instrumentationManager(
RocketMQBinderEndpoint rocketBinderEndpoint) {
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(),
rocketBinderEndpoint.runtime());
}
}

View File

@ -29,7 +29,8 @@ public class Acknowledgement {
private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
public Acknowledgement setConsumeConcurrentlyStatus(
ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
return this;
}
@ -42,7 +43,8 @@ public class Acknowledgement {
return consumeOrderlyStatus;
}
public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) {
public Acknowledgement setConsumeOrderlyStatus(
ConsumeOrderlyStatus consumeOrderlyStatus) {
this.consumeOrderlyStatus = consumeOrderlyStatus;
return this;
}
@ -59,7 +61,8 @@ public class Acknowledgement {
return consumeOrderlySuspendCurrentQueueTimeMill;
}
public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
public void setConsumeOrderlySuspendCurrentQueueTimeMill(
Long consumeOrderlySuspendCurrentQueueTimeMill) {
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
}
@ -71,7 +74,8 @@ public class Acknowledgement {
public static Acknowledgement buildConcurrentlyInstance() {
Acknowledgement acknowledgement = new Acknowledgement();
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
acknowledgement
.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
return acknowledgement;
}

View File

@ -26,8 +26,7 @@ public class ConsumersManager {
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap
= new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap = new HashMap<>();
private final InstrumentationManager instrumentationManager;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@ -37,10 +36,13 @@ public class ConsumersManager {
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
}
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic,
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group,
String topic,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties);
ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group);
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
consumerProperties);
ConsumerGroupInstrumentation instrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation);
if (consumerGroups.containsKey(group)) {
@ -85,16 +87,18 @@ public class ConsumersManager {
if (started.get(group)) {
return;
}
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation(
group);
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
try {
consumerGroups.get(group).start();
started.put(group, true);
groupInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
}
catch (MQClientException e) {
groupInstrumentation.markStartFailed(e);
logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
logger.error("RocketMQ Consumer hasn't been started. Caused by "
+ e.getErrorMessage(), e);
throw e;
}
}
@ -103,4 +107,3 @@ public class ConsumersManager {
return consumerGroups.keySet();
}
}

View File

@ -43,7 +43,8 @@ import org.springframework.util.StringUtils;
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
private static final Logger logger = LoggerFactory
.getLogger(RocketMQInboundChannelAdapter.class);
private ConsumerInstrumentation consumerInstrumentation;
@ -78,35 +79,44 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
return;
}
String tags = consumerProperties == null ? null : consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties == null ? false : consumerProperties.getExtension().getOrderly();
String tags = consumerProperties == null ? null
: consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties == null ? false
: consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties);
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
instrumentationManager)
final CloudStreamMessageListener listener = isOrderly
? new CloudStreamMessageListenerOrderly(instrumentationManager)
: new CloudStreamMessageListenerConcurrently(instrumentationManager);
if (retryTemplate != null) {
retryTemplate.registerListener(listener);
}
Set<String> tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim)
Set<String> tagsSet = tags == null ? new HashSet<>()
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
.collect(Collectors.toSet());
consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
consumerInstrumentation = instrumentationManager
.getConsumerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql()));
} else {
consumer.subscribe(destination, MessageSelector
.bySql(consumerProperties.getExtension().getSql()));
}
else {
consumer.subscribe(destination, String.join(" || ", tagsSet));
}
consumerInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
}
catch (MQClientException e) {
consumerInstrumentation.markStartFailed(e);
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
+ e.getErrorMessage(), e);
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
}
@ -114,8 +124,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
try {
consumersManager.startConsumer(group);
} catch (MQClientException e) {
logger.error("RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), e);
}
catch (MQClientException e) {
logger.error(
"RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(),
e);
throw new RuntimeException("RocketMQ Consumer startup failed.", e);
}
}
@ -146,47 +159,63 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
try {
if (enableRetry) {
return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
(RetryCallback<Acknowledgement, Exception>)context -> doSendMsgs(msgs, context),
(RetryCallback<Acknowledgement, Exception>) context -> doSendMsgs(
msgs, context),
new RecoveryCallback<Acknowledgement>() {
@Override
public Acknowledgement recover(RetryContext context) throws Exception {
RocketMQInboundChannelAdapter.this.recoveryCallback.recover(context);
if (ClassUtils.isAssignable(this.getClass(), MessageListenerConcurrently.class)) {
return Acknowledgement.buildConcurrentlyInstance();
} else {
public Acknowledgement recover(RetryContext context)
throws Exception {
RocketMQInboundChannelAdapter.this.recoveryCallback
.recover(context);
if (ClassUtils.isAssignable(this.getClass(),
MessageListenerConcurrently.class)) {
return Acknowledgement
.buildConcurrentlyInstance();
}
else {
return Acknowledgement.buildOrderlyInstance();
}
}
});
} else {
}
else {
Acknowledgement result = doSendMsgs(msgs, null);
instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
return result;
}
} catch (Exception e) {
logger.error("Rocket Message hasn't been processed successfully. Caused by ", e);
instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
}
catch (Exception e) {
logger.error(
"Rocket Message hasn't been processed successfully. Caused by ",
e);
instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
throw new RuntimeException("Rocket Message hasn't been processed successfully. Caused by ", e);
throw new RuntimeException(
"Rocket Message hasn't been processed successfully. Caused by ",
e);
}
}
private Acknowledgement doSendMsgs(final List<MessageExt> msgs, RetryContext context) {
private Acknowledgement doSendMsgs(final List<MessageExt> msgs,
RetryContext context) {
List<Acknowledgement> acknowledgements = new ArrayList<>();
msgs.forEach(msg -> {
String retryInfo = context == null ? "" : "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
String retryInfo = context == null ? ""
: "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
logger.debug(retryInfo + "consuming msg:\n" + msg);
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
Acknowledgement acknowledgement = new Acknowledgement();
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()).
setHeaders(new RocketMQMessageHeaderAccessor().
withAcknowledgment(acknowledgement).
withTags(msg.getTags()).
withKeys(msg.getKeys()).
withFlag(msg.getFlag()).
withRocketMessage(msg)
).build();
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody())
.setHeaders(new RocketMQMessageHeaderAccessor()
.withAcknowledgment(acknowledgement)
.withTags(msg.getTags()).withKeys(msg.getKeys())
.withFlag(msg.getFlag()).withRocketMessage(msg))
.build();
acknowledgements.add(acknowledgement);
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
});
@ -194,34 +223,39 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
if (throwable != null) {
instrumentationManager.getConsumerInstrumentation(
instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
} else {
instrumentationManager.getConsumerInstrumentation(
}
else {
instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
}
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
}
}
protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements
MessageListenerConcurrently {
protected class CloudStreamMessageListenerConcurrently
extends CloudStreamMessageListener implements MessageListenerConcurrently {
public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager) {
public CloudStreamMessageListenerConcurrently(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@ -229,22 +263,26 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel());
context.setDelayLevelWhenNextConsume(
acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
}
}
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements
MessageListenerOrderly {
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
implements MessageListenerOrderly {
public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager) {
public CloudStreamMessageListenerOrderly(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
context.setSuspendCurrentQueueTimeMillis(
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
}

View File

@ -40,7 +40,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
protected volatile boolean running = false;
public RocketMQMessageHandler(String destination, RocketMQProducerProperties producerProperties,
public RocketMQMessageHandler(String destination,
RocketMQProducerProperties producerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
this.destination = destination;
@ -53,7 +54,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void start() {
producer = new DefaultMQProducer(destination);
producerInstrumentation = instrumentationManager.getProducerInstrumentation(destination);
producerInstrumentation = instrumentationManager
.getProducerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(producerInstrumentation);
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
@ -65,9 +67,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
try {
producer.start();
producerInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
}
catch (MQClientException e) {
producerInstrumentation.markStartFailed(e);
logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
}
running = true;
@ -87,24 +91,30 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
}
@Override
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws Exception {
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
throws Exception {
try {
Message toSend;
if (message.getPayload() instanceof byte[]) {
toSend = new Message(destination, (byte[])message.getPayload());
} else if (message.getPayload() instanceof String) {
toSend = new Message(destination, ((String)message.getPayload()).getBytes());
} else {
throw new UnsupportedOperationException(
"Payload class isn't supported: " + message.getPayload().getClass());
toSend = new Message(destination, (byte[]) message.getPayload());
}
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
else if (message.getPayload() instanceof String) {
toSend = new Message(destination,
((String) message.getPayload()).getBytes());
}
else {
throw new UnsupportedOperationException("Payload class isn't supported: "
+ message.getPayload().getClass());
}
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(
message);
headerAccessor.setLeaveMutable(true);
toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
toSend.setTags(headerAccessor.getTags());
toSend.setKeys(headerAccessor.getKeys());
toSend.setFlag(headerAccessor.getFlag());
for (Map.Entry<String, String> entry : headerAccessor.getUserProperties().entrySet()) {
for (Map.Entry<String, String> entry : headerAccessor.getUserProperties()
.entrySet()) {
toSend.putUserProperty(entry.getKey(), entry.getValue());
}
@ -114,15 +124,19 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
throw new MQClientException("message hasn't been sent", null);
}
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage)message, sendRes);
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
sendRes);
}
instrumentationManager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
instrumentationManager.getRuntime().put(
RocketMQBinderConstants.LASTSEND_TIMESTAMP,
Instant.now().toEpochMilli());
producerInstrumentation.markSent();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException |
UnsupportedOperationException e) {
}
catch (MQClientException | RemotingException | MQBrokerException
| InterruptedException | UnsupportedOperationException e) {
producerInstrumentation.markSentFailure();
logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
}

View File

@ -1,11 +1,11 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -20,9 +20,12 @@ public class ConsumerInstrumentation extends Instrumentation {
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond"));
this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures"));
this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond"));
this.consumedPerSecond = registry
.meter(name(baseMetricName, "consumedPerSecond"));
this.totalConsumedFailures = registry
.counter(name(baseMetricName, "totalConsumedFailures"));
this.consumedFailuresPerSecond = registry
.meter(name(baseMetricName, "consumedFailuresPerSecond"));
}
public void markConsumed() {

View File

@ -20,31 +20,36 @@ public class InstrumentationManager {
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
public InstrumentationManager(MetricRegistry metricRegistry, Map<String, Object> runtime) {
public InstrumentationManager(MetricRegistry metricRegistry,
Map<String, Object> runtime) {
this.metricRegistry = metricRegistry;
this.runtime = runtime;
}
public ProducerInstrumentation getProducerInstrumentation(String destination) {
String key = "scs-rocketmq.producer." + destination;
producerInstrumentations.putIfAbsent(key, new ProducerInstrumentation(metricRegistry, key));
producerInstrumentations.putIfAbsent(key,
new ProducerInstrumentation(metricRegistry, key));
return producerInstrumentations.get(key);
}
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
String key = "scs-rocketmq.consumer." + destination;
consumeInstrumentations.putIfAbsent(key, new ConsumerInstrumentation(metricRegistry, key));
consumeInstrumentations.putIfAbsent(key,
new ConsumerInstrumentation(metricRegistry, key));
return consumeInstrumentations.get(key);
}
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
String key = "scs-rocketmq.consumerGroup." + group;
consumerGroupsInstrumentations.putIfAbsent(key, new ConsumerGroupInstrumentation(metricRegistry, key));
consumerGroupsInstrumentations.putIfAbsent(key,
new ConsumerGroupInstrumentation(metricRegistry, key));
return consumerGroupsInstrumentations.get(key);
}
public Set<Instrumentation> getHealthInstrumentations() {
return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet());
return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue)
.collect(Collectors.toSet());
}
public void addHealthInstrumentation(Instrumentation instrumentation) {

View File

@ -1,11 +1,11 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -20,9 +20,11 @@ public class ProducerInstrumentation extends Instrumentation {
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
this.totalSentFailures = registry
.counter(name(baseMetricName, "totalSentFailures"));
this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
this.sentFailuresPerSecond = registry
.meter(name(baseMetricName, "sentFailuresPerSecond"));
}
public void markSent() {

View File

@ -25,16 +25,19 @@ public class RocketMQExtendedBindingProperties implements
}
@Override
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(
String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getConsumer() != null) {
return bindings.get(channelName).getConsumer();
} else {
}
else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
this.bindings.get(channelName).setConsumer(properties);
return properties;
}
} else {
}
else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setConsumer(properties);
@ -44,16 +47,19 @@ public class RocketMQExtendedBindingProperties implements
}
@Override
public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
public synchronized RocketMQProducerProperties getExtendedProducerProperties(
String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getProducer() != null) {
return bindings.get(channelName).getProducer();
} else {
}
else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
this.bindings.get(channelName).setProducer(properties);
return properties;
}
} else {
}
else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setProducer(properties);

View File

@ -11,8 +11,7 @@ public class RocketMQProducerProperties {
private Boolean enabled = true;
/**
* Maximum allowed message size in bytes
* {@link DefaultMQProducer#maxMessageSize}
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}
*/
private Integer maxMessageSize = 0;

View File

@ -17,17 +17,15 @@ import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQTopicProvisioner
implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>> {
public class RocketMQTopicProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class);
private static final Logger logger = LoggerFactory
.getLogger(RocketMQTopicProvisioner.class);
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties>
properties)
ExtendedProducerProperties<RocketMQProducerProperties> properties)
throws ProvisioningException {
checkTopic(name);
return new RocketProducerDestination(name);
@ -35,8 +33,7 @@ public class RocketMQTopicProvisioner
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
properties)
ExtendedConsumerProperties<RocketMQConsumerProperties> properties)
throws ProvisioningException {
checkTopic(name);
return new RocketConsumerDestination(name);
@ -45,7 +42,8 @@ public class RocketMQTopicProvisioner
private void checkTopic(String topic) {
try {
Validators.checkTopic(topic);
} catch (MQClientException e) {
}
catch (MQClientException e) {
logger.error("topic check error: " + topic, e);
throw new AssertionError(e); // Can't happen
}