1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Code refactoring and some new feature support - delete some invalid files.

This commit is contained in:
zkzlx
2021-03-22 11:05:31 +08:00
parent 09841f6616
commit 6d7c47a366
9 changed files with 61 additions and 55 deletions

View File

@@ -18,18 +18,11 @@ package com.alibaba.cloud.stream.binder.rocketmq.contants;
import org.apache.rocketmq.common.message.MessageConst;
import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
/**
* @author zkzlx
*/
public class RocketMQConst extends MessageConst {
/**
* Header key for RocketMQ Transactional Args.
*/
public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
/**
* Default NameServer value.
*/
@@ -38,12 +31,8 @@ public class RocketMQConst extends MessageConst {
/**
* Default group for SCS RocketMQ Binder.
*/
public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
public static final String DEFAULT_GROUP = "binder_default_group_name";
/**
* RocketMQ re-consume times.
*/
public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";

View File

@@ -17,6 +17,7 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory;
@@ -62,6 +63,8 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
private final MessageSelector messageSelector;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
private volatile Iterator<MessageExt> messageExtIterator=null;
public RocketMQMessageSource(String name,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
this.topic = name;
@@ -82,7 +85,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
this.consumer = RocketMQConsumerFactory
.initPullConsumer(extendedConsumerProperties);
// This parameter must be 1, otherwise doReceive cannot be handled singly.
this.consumer.setPullBatchSize(1);
// this.consumer.setPullBatchSize(1);
this.consumer.subscribe(topic, messageSelector);
this.consumer.setAutoCommit(false);
this.assignedMessageQueue = acquireAssignedMessageQueue(this.consumer);
@@ -132,11 +135,20 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
@Override
protected synchronized Object doReceive() {
List<MessageExt> messageExtList = consumer.poll();
if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) {
if(messageExtIterator == null){
List<MessageExt> messageExtList = consumer.poll();
if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) {
return null;
}
messageExtIterator = messageExtList.iterator();
}
MessageExt messageExt=messageExtIterator.next();
if(!messageExtIterator.hasNext()){
messageExtIterator = null;
}
if(null == messageExt){
return null;
}
MessageExt messageExt = messageExtList.get(0);
MessageQueue messageQueue = null;
for (MessageQueue queue : assignedMessageQueue.getAssignedMessageQueues()) {
if (queue.getQueueId() == messageExt.getQueueId()) {
@@ -144,8 +156,11 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
break;
}
}
if(messageQueue == null){
throw new IllegalArgumentException("The message queue is not in assigned list");
}
Message message = RocketMQMessageConverterSupport
.convertMessage2Spring(messageExtList.get(0));
.convertMessage2Spring(messageExt);
return MessageBuilder.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
new RocketMQAckCallback(this.consumer, assignedMessageQueue,

View File

@@ -160,7 +160,7 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
TransactionListener.class);
if (transactionListener == null) {
throw new MessagingException(
"TransactionMQProducer must have a TransactionMQProducer !!! ");
"TransactionMQProducer must have a TransactionListener !!! ");
}
((TransactionMQProducer) defaultMQProducer)
.setTransactionListener(transactionListener);

View File

@@ -16,6 +16,7 @@
package com.alibaba.cloud.stream.binder.rocketmq.utils;
import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQCommonProperties;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -81,7 +82,7 @@ public class RocketMQUtils {
public static String getNameServerStr(String nameServer) {
if (StringUtils.isEmpty(nameServer)) {
return null;
return RocketMQConst.DEFAULT_NAME_SERVER;
}
return nameServer.replaceAll(",", ";");
}