diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java index 35663890..03eacbe5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java @@ -27,7 +27,7 @@ import org.apache.rocketmq.common.message.MessageQueue; */ public class RocketMQMessageQueueChooser { - private int queueIndex = 0; + private volatile int queueIndex = 0; private volatile List messageQueues; @@ -35,7 +35,7 @@ public class RocketMQMessageQueueChooser { return messageQueues.get(queueIndex); } - public synchronized int requeue() { + public int requeue() { if (queueIndex - 1 < 0) { this.queueIndex = messageQueues.size() - 1; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java index 316a40bf..d7f13be6 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java @@ -18,8 +18,6 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; import java.util.List; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageQueueListener; @@ -67,7 +65,7 @@ public class RocketMQMessageSource extends AbstractMessageSource private final String group; - private final Lock lock = new ReentrantLock(); + private final Object consumerMonitor = new Object(); private DefaultMQPullConsumer consumer; @@ -97,7 +95,7 @@ public class RocketMQMessageSource extends AbstractMessageSource } @Override - public void start() { + public synchronized void start() { if (this.isRunning()) { throw new IllegalStateException( "pull consumer already running. " + this.toString()); @@ -151,7 +149,7 @@ public class RocketMQMessageSource extends AbstractMessageSource } @Override - public void stop() { + public synchronized void stop() { if (this.isRunning()) { this.setRunning(false); consumer.shutdown(); @@ -159,73 +157,64 @@ public class RocketMQMessageSource extends AbstractMessageSource } @Override - public boolean isRunning() { + public synchronized boolean isRunning() { return running; } @Override - protected Object doReceive() { + protected synchronized Object doReceive() { if (messageQueueChooser.getMessageQueues() == null || messageQueueChooser.getMessageQueues().size() == 0) { return null; } - if (lock.tryLock()) { - try { - int count = 0; - while (count < messageQueueChooser.getMessageQueues().size()) { - - MessageQueue messageQueue = messageQueueChooser.choose(); - - long offset = consumer.fetchConsumeOffset(messageQueue, - rocketMQConsumerProperties.getExtension().isFromStore()); - - log.debug( - "topic='{}', group='{}', messageQueue='{}', offset now='{}'", - this.topic, this.group, messageQueue, offset); - - PullResult pullResult; - if (messageSelector != null) { - pullResult = consumer.pull(messageQueue, messageSelector, offset, - 1); - } - else { - pullResult = consumer.pull(messageQueue, (String) null, offset, - 1); - } - - if (pullResult.getPullStatus() == PullStatus.FOUND) { - List messageExtList = pullResult.getMsgFoundList(); - Message message = RocketMQUtil - .convertToSpringMessage(messageExtList.get(0)); - - AcknowledgmentCallback ackCallback = this.ackCallbackFactory - .createCallback( - new RocketMQAckInfo(messageQueue, pullResult, - consumer, messageQueueChooser, offset)); - - Message messageResult = MessageBuilder.fromMessage(message) - .setHeader( - IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, - ackCallback) - .build(); - messageQueueChooser.increment(); - return messageResult; - } - else { - log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", - messageQueueChooser.getMessageQueues(), - pullResult.getPullStatus(), topic); - } + try { + int count = 0; + while (count < messageQueueChooser.getMessageQueues().size()) { + MessageQueue messageQueue; + synchronized (this.consumerMonitor) { + messageQueue = messageQueueChooser.choose(); messageQueueChooser.increment(); - count++; } + + long offset = consumer.fetchConsumeOffset(messageQueue, + rocketMQConsumerProperties.getExtension().isFromStore()); + + log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'", + this.topic, this.group, messageQueue, offset); + + PullResult pullResult; + if (messageSelector != null) { + pullResult = consumer.pull(messageQueue, messageSelector, offset, 1); + } + else { + pullResult = consumer.pull(messageQueue, (String) null, offset, 1); + } + + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List messageExtList = pullResult.getMsgFoundList(); + + Message message = RocketMQUtil + .convertToSpringMessage(messageExtList.get(0)); + + AcknowledgmentCallback ackCallback = this.ackCallbackFactory + .createCallback(new RocketMQAckInfo(messageQueue, pullResult, + consumer, offset)); + + Message messageResult = MessageBuilder.fromMessage(message).setHeader( + IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, + ackCallback).build(); + return messageResult; + } + else { + log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", + messageQueueChooser.getMessageQueues(), + pullResult.getPullStatus(), topic); + } + count++; } - catch (Exception e) { - log.error("Consumer pull error: " + e.getMessage(), e); - } - finally { - lock.unlock(); - } + } + catch (Exception e) { + log.error("Consumer pull error: " + e.getMessage(), e); } return null; } @@ -235,20 +224,15 @@ public class RocketMQMessageSource extends AbstractMessageSource return "rocketmq:message-source"; } - public void setRunning(boolean running) { + public synchronized void setRunning(boolean running) { this.running = running; } - public void resetMessageQueues(Set queueSet) { - lock.lock(); - try { - log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, - queueSet); + public synchronized void resetMessageQueues(Set queueSet) { + log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, queueSet); + synchronized (this.consumerMonitor) { this.messageQueueChooser.reset(queueSet); } - finally { - lock.unlock(); - } } public static class RocketMQCallbackFactory @@ -295,35 +279,42 @@ public class RocketMQMessageSource extends AbstractMessageSource @Override public void acknowledge(Status status) { Assert.notNull(status, "'status' cannot be null"); + if (this.acknowledged) { + throw new IllegalStateException("Already acknowledged"); + } log.debug("acknowledge(" + status.name() + ") for " + this); - try { - switch (status) { - case ACCEPT: - case REJECT: - ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), - ackInfo.getPullResult().getNextBeginOffset()); - log.debug("messageQueue='{}' offset update to `{}`", - ackInfo.getMessageQueue(), - String.valueOf(ackInfo.getPullResult().getNextBeginOffset())); - break; - case REQUEUE: - // decrease index and update offset of messageQueue of ackInfo - int oldIndex = ackInfo.getMessageQueueChooser().requeue(); - ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), - ackInfo.getOldOffset()); - log.debug( - "messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", - ackInfo.getMessageQueue(), oldIndex, ackInfo.getOldOffset()); - break; - default: - break; + synchronized (this.ackInfo.getConsumerMonitor()) { + try { + switch (status) { + case ACCEPT: + case REJECT: + ackInfo.getConsumer().updateConsumeOffset( + ackInfo.getMessageQueue(), + ackInfo.getPullResult().getNextBeginOffset()); + log.debug("messageQueue='{}' offset update to `{}`", + ackInfo.getMessageQueue(), String.valueOf( + ackInfo.getPullResult().getNextBeginOffset())); + break; + case REQUEUE: + // decrease index and update offset of messageQueue of ackInfo + int oldIndex = ackInfo.getMessageQueueChooser().requeue(); + ackInfo.getConsumer().updateConsumeOffset( + ackInfo.getMessageQueue(), ackInfo.getOldOffset()); + log.debug( + "messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", + ackInfo.getMessageQueue(), oldIndex, + ackInfo.getOldOffset()); + break; + default: + break; + } + } + catch (MQClientException e) { + log.error("acknowledge error: " + e.getErrorMessage(), e); + } + finally { + this.acknowledged = true; } - } - catch (MQClientException e) { - log.error("acknowledge error: " + e.getErrorMessage(), e); - } - finally { - this.acknowledged = true; } } @@ -334,7 +325,7 @@ public class RocketMQMessageSource extends AbstractMessageSource } } - public static class RocketMQAckInfo { + public class RocketMQAckInfo { private final MessageQueue messageQueue; @@ -342,17 +333,13 @@ public class RocketMQMessageSource extends AbstractMessageSource private final DefaultMQPullConsumer consumer; - private final RocketMQMessageQueueChooser messageQueueChooser; - private final long oldOffset; public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult, - DefaultMQPullConsumer consumer, - RocketMQMessageQueueChooser messageQueueChooser, long oldOffset) { + DefaultMQPullConsumer consumer, long oldOffset) { this.messageQueue = messageQueue; this.pullResult = pullResult; this.consumer = consumer; - this.messageQueueChooser = messageQueueChooser; this.oldOffset = oldOffset; } @@ -369,18 +356,22 @@ public class RocketMQMessageSource extends AbstractMessageSource } public RocketMQMessageQueueChooser getMessageQueueChooser() { - return messageQueueChooser; + return RocketMQMessageSource.this.messageQueueChooser; } public long getOldOffset() { return oldOffset; } + public Object getConsumerMonitor() { + return RocketMQMessageSource.this.consumerMonitor; + } + @Override public String toString() { return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult=" - + pullResult + ", consumer=" + consumer + ", messageQueueChooser=" - + messageQueueChooser + ", oldOffset=" + oldOffset + '}'; + + pullResult + ", consumer=" + consumer + ", oldOffset=" + oldOffset + + '}'; } }