1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

Polish #541, enhance concurrent handle

This commit is contained in:
fangjian0423 2019-04-16 10:35:32 +08:00
parent ad944513b7
commit d37f21885b
2 changed files with 99 additions and 108 deletions

View File

@ -27,7 +27,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
*/
public class RocketMQMessageQueueChooser {
private int queueIndex = 0;
private volatile int queueIndex = 0;
private volatile List<MessageQueue> messageQueues;
@ -35,7 +35,7 @@ public class RocketMQMessageQueueChooser {
return messageQueues.get(queueIndex);
}
public synchronized int requeue() {
public int requeue() {
if (queueIndex - 1 < 0) {
this.queueIndex = messageQueues.size() - 1;
}

View File

@ -18,8 +18,6 @@ package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
@ -67,7 +65,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
private final String group;
private final Lock lock = new ReentrantLock();
private final Object consumerMonitor = new Object();
private DefaultMQPullConsumer consumer;
@ -97,7 +95,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
}
@Override
public void start() {
public synchronized void start() {
if (this.isRunning()) {
throw new IllegalStateException(
"pull consumer already running. " + this.toString());
@ -151,7 +149,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
}
@Override
public void stop() {
public synchronized void stop() {
if (this.isRunning()) {
this.setRunning(false);
consumer.shutdown();
@ -159,56 +157,52 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
}
@Override
public boolean isRunning() {
public synchronized boolean isRunning() {
return running;
}
@Override
protected Object doReceive() {
protected synchronized Object doReceive() {
if (messageQueueChooser.getMessageQueues() == null
|| messageQueueChooser.getMessageQueues().size() == 0) {
return null;
}
if (lock.tryLock()) {
try {
int count = 0;
while (count < messageQueueChooser.getMessageQueues().size()) {
MessageQueue messageQueue = messageQueueChooser.choose();
MessageQueue messageQueue;
synchronized (this.consumerMonitor) {
messageQueue = messageQueueChooser.choose();
messageQueueChooser.increment();
}
long offset = consumer.fetchConsumeOffset(messageQueue,
rocketMQConsumerProperties.getExtension().isFromStore());
log.debug(
"topic='{}', group='{}', messageQueue='{}', offset now='{}'",
log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'",
this.topic, this.group, messageQueue, offset);
PullResult pullResult;
if (messageSelector != null) {
pullResult = consumer.pull(messageQueue, messageSelector, offset,
1);
pullResult = consumer.pull(messageQueue, messageSelector, offset, 1);
}
else {
pullResult = consumer.pull(messageQueue, (String) null, offset,
1);
pullResult = consumer.pull(messageQueue, (String) null, offset, 1);
}
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
Message message = RocketMQUtil
.convertToSpringMessage(messageExtList.get(0));
AcknowledgmentCallback ackCallback = this.ackCallbackFactory
.createCallback(
new RocketMQAckInfo(messageQueue, pullResult,
consumer, messageQueueChooser, offset));
.createCallback(new RocketMQAckInfo(messageQueue, pullResult,
consumer, offset));
Message messageResult = MessageBuilder.fromMessage(message)
.setHeader(
Message messageResult = MessageBuilder.fromMessage(message).setHeader(
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
ackCallback)
.build();
messageQueueChooser.increment();
ackCallback).build();
return messageResult;
}
else {
@ -216,17 +210,12 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
messageQueueChooser.getMessageQueues(),
pullResult.getPullStatus(), topic);
}
messageQueueChooser.increment();
count++;
}
}
catch (Exception e) {
log.error("Consumer pull error: " + e.getMessage(), e);
}
finally {
lock.unlock();
}
}
return null;
}
@ -235,20 +224,15 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
return "rocketmq:message-source";
}
public void setRunning(boolean running) {
public synchronized void setRunning(boolean running) {
this.running = running;
}
public void resetMessageQueues(Set<MessageQueue> queueSet) {
lock.lock();
try {
log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic,
queueSet);
public synchronized void resetMessageQueues(Set<MessageQueue> queueSet) {
log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, queueSet);
synchronized (this.consumerMonitor) {
this.messageQueueChooser.reset(queueSet);
}
finally {
lock.unlock();
}
}
public static class RocketMQCallbackFactory
@ -295,25 +279,31 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
@Override
public void acknowledge(Status status) {
Assert.notNull(status, "'status' cannot be null");
if (this.acknowledged) {
throw new IllegalStateException("Already acknowledged");
}
log.debug("acknowledge(" + status.name() + ") for " + this);
synchronized (this.ackInfo.getConsumerMonitor()) {
try {
switch (status) {
case ACCEPT:
case REJECT:
ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(),
ackInfo.getConsumer().updateConsumeOffset(
ackInfo.getMessageQueue(),
ackInfo.getPullResult().getNextBeginOffset());
log.debug("messageQueue='{}' offset update to `{}`",
ackInfo.getMessageQueue(),
String.valueOf(ackInfo.getPullResult().getNextBeginOffset()));
ackInfo.getMessageQueue(), String.valueOf(
ackInfo.getPullResult().getNextBeginOffset()));
break;
case REQUEUE:
// decrease index and update offset of messageQueue of ackInfo
int oldIndex = ackInfo.getMessageQueueChooser().requeue();
ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(),
ackInfo.getOldOffset());
ackInfo.getConsumer().updateConsumeOffset(
ackInfo.getMessageQueue(), ackInfo.getOldOffset());
log.debug(
"messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'",
ackInfo.getMessageQueue(), oldIndex, ackInfo.getOldOffset());
ackInfo.getMessageQueue(), oldIndex,
ackInfo.getOldOffset());
break;
default:
break;
@ -326,6 +316,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
this.acknowledged = true;
}
}
}
@Override
public String toString() {
@ -334,7 +325,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
}
}
public static class RocketMQAckInfo {
public class RocketMQAckInfo {
private final MessageQueue messageQueue;
@ -342,17 +333,13 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
private final DefaultMQPullConsumer consumer;
private final RocketMQMessageQueueChooser messageQueueChooser;
private final long oldOffset;
public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult,
DefaultMQPullConsumer consumer,
RocketMQMessageQueueChooser messageQueueChooser, long oldOffset) {
DefaultMQPullConsumer consumer, long oldOffset) {
this.messageQueue = messageQueue;
this.pullResult = pullResult;
this.consumer = consumer;
this.messageQueueChooser = messageQueueChooser;
this.oldOffset = oldOffset;
}
@ -369,18 +356,22 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
}
public RocketMQMessageQueueChooser getMessageQueueChooser() {
return messageQueueChooser;
return RocketMQMessageSource.this.messageQueueChooser;
}
public long getOldOffset() {
return oldOffset;
}
public Object getConsumerMonitor() {
return RocketMQMessageSource.this.consumerMonitor;
}
@Override
public String toString() {
return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult="
+ pullResult + ", consumer=" + consumer + ", messageQueueChooser="
+ messageQueueChooser + ", oldOffset=" + oldOffset + '}';
+ pullResult + ", consumer=" + consumer + ", oldOffset=" + oldOffset
+ '}';
}
}