mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
format code
Signed-off-by: caotc <250622148@qq.com>
This commit is contained in:
parent
8ed8cd2dba
commit
d02a35b173
@ -82,11 +82,10 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
this.instrumentationManager = instrumentationManager;
|
this.instrumentationManager = instrumentationManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
|
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
|
||||||
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
|
||||||
MessageChannel channel,
|
MessageChannel channel, MessageChannel errorChannel) throws Exception {
|
||||||
MessageChannel errorChannel) throws Exception {
|
|
||||||
if (producerProperties.getExtension().getEnabled()) {
|
if (producerProperties.getExtension().getEnabled()) {
|
||||||
|
|
||||||
// if producerGroup is empty, using destination
|
// if producerGroup is empty, using destination
|
||||||
@ -155,8 +154,6 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||||
rocketMQTemplate, destination.getName(), producerGroup,
|
rocketMQTemplate, destination.getName(), producerGroup,
|
||||||
producerProperties.getExtension().getTransactional(),
|
producerProperties.getExtension().getTransactional(),
|
||||||
@ -167,7 +164,7 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
.findFirst().orElse(null));
|
.findFirst().orElse(null));
|
||||||
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
|
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
|
||||||
messageHandler.setSync(producerProperties.getExtension().getSync());
|
messageHandler.setSync(producerProperties.getExtension().getSync());
|
||||||
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
|
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
|
||||||
if (errorChannel != null) {
|
if (errorChannel != null) {
|
||||||
messageHandler.setSendFailureChannel(errorChannel);
|
messageHandler.setSendFailureChannel(errorChannel);
|
||||||
}
|
}
|
||||||
@ -208,7 +205,7 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
|
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
|
||||||
listenerContainer
|
listenerContainer
|
||||||
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
|
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
|
||||||
listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
|
listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
|
||||||
|
|
||||||
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
|
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
|
||||||
listenerContainer, consumerProperties, instrumentationManager);
|
listenerContainer, consumerProperties, instrumentationManager);
|
||||||
@ -293,26 +290,27 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
this.extendedBindingProperties = extendedBindingProperties;
|
this.extendedBindingProperties = extendedBindingProperties;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RocketMQHeaderMapper createHeaderMapper(
|
private RocketMQHeaderMapper createHeaderMapper(
|
||||||
final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
|
final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
|
||||||
Set<String> trustedPackages = extendedConsumerProperties.getExtension()
|
Set<String> trustedPackages = extendedConsumerProperties.getExtension()
|
||||||
.getTrustedPackages();
|
.getTrustedPackages();
|
||||||
return createHeaderMapper(trustedPackages);
|
return createHeaderMapper(trustedPackages);
|
||||||
}
|
}
|
||||||
|
|
||||||
private RocketMQHeaderMapper createHeaderMapper(
|
private RocketMQHeaderMapper createHeaderMapper(
|
||||||
final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
|
final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
|
||||||
return createHeaderMapper(Collections.emptyList());
|
return createHeaderMapper(Collections.emptyList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private RocketMQHeaderMapper createHeaderMapper(Collection<String> trustedPackages){
|
private RocketMQHeaderMapper createHeaderMapper(Collection<String> trustedPackages) {
|
||||||
ObjectMapper objectMapper=this.getApplicationContext()
|
ObjectMapper objectMapper = this.getApplicationContext()
|
||||||
.getBeansOfType(ObjectMapper.class).values().iterator().next();
|
.getBeansOfType(ObjectMapper.class).values().iterator().next();
|
||||||
JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(objectMapper);
|
JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(
|
||||||
if (!StringUtils.isEmpty(trustedPackages)) {
|
objectMapper);
|
||||||
headerMapper.addTrustedPackages(trustedPackages);
|
if (!StringUtils.isEmpty(trustedPackages)) {
|
||||||
}
|
headerMapper.addTrustedPackages(trustedPackages);
|
||||||
return headerMapper;
|
}
|
||||||
}
|
return headerMapper;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -445,14 +445,15 @@ public class RocketMQListenerBindingContainer
|
|||||||
* @return the converted Spring {@link Message}
|
* @return the converted Spring {@link Message}
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private Message convertToSpringMessage(MessageExt messageExt) {
|
private Message convertToSpringMessage(MessageExt messageExt) {
|
||||||
|
|
||||||
// add reconsume-times header to messageExt
|
// add reconsume-times header to messageExt
|
||||||
int reconsumeTimes = messageExt.getReconsumeTimes();
|
int reconsumeTimes = messageExt.getReconsumeTimes();
|
||||||
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
||||||
String.valueOf(reconsumeTimes));
|
String.valueOf(reconsumeTimes));
|
||||||
Message message=RocketMQUtil.convertToSpringMessage(messageExt);
|
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
|
||||||
return MessageBuilder.fromMessage(message).copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
|
return MessageBuilder.fromMessage(message)
|
||||||
|
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
|
|
||||||
private final RocketMQTemplate rocketMQTemplate;
|
private final RocketMQTemplate rocketMQTemplate;
|
||||||
|
|
||||||
private RocketMQHeaderMapper headerMapper;
|
private RocketMQHeaderMapper headerMapper;
|
||||||
|
|
||||||
private final Boolean transactional;
|
private final Boolean transactional;
|
||||||
|
|
||||||
@ -156,12 +156,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
|
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
try {
|
try {
|
||||||
//issue 737 fix
|
// issue 737 fix
|
||||||
Map<String,String> jsonHeaders=headerMapper.fromHeaders(message.getHeaders());
|
Map<String, String> jsonHeaders = headerMapper
|
||||||
message = org.springframework.messaging.support.MessageBuilder
|
.fromHeaders(message.getHeaders());
|
||||||
.fromMessage(message).copyHeaders(jsonHeaders)
|
message = org.springframework.messaging.support.MessageBuilder
|
||||||
.build();
|
.fromMessage(message).copyHeaders(jsonHeaders).build();
|
||||||
|
|
||||||
|
|
||||||
final StringBuilder topicWithTags = new StringBuilder(destination);
|
final StringBuilder topicWithTags = new StringBuilder(destination);
|
||||||
String tags = Optional
|
String tags = Optional
|
||||||
@ -210,8 +209,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
|
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Message<?> finalMessage = message;
|
Message<?> finalMessage = message;
|
||||||
SendCallback sendCallback = new SendCallback() {
|
SendCallback sendCallback = new SendCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(SendResult sendResult) {
|
public void onSuccess(SendResult sendResult) {
|
||||||
log.debug("async send to topic " + topicWithTags + " "
|
log.debug("async send to topic " + topicWithTags + " "
|
||||||
@ -226,7 +225,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
|
|||||||
getSendFailureChannel().send(
|
getSendFailureChannel().send(
|
||||||
RocketMQMessageHandler.this.errorMessageStrategy
|
RocketMQMessageHandler.this.errorMessageStrategy
|
||||||
.buildErrorMessage(new MessagingException(
|
.buildErrorMessage(new MessagingException(
|
||||||
finalMessage, e), null));
|
finalMessage, e), null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user