diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java index 25204919..e7a2385f 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java @@ -1,10 +1,15 @@ package org.springframework.cloud.alibaba.cloud.examples; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.binder.PollableMessageSource; +import org.springframework.context.annotation.Bean; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.messaging.SubscribableChannel; /** @@ -27,10 +32,36 @@ public class RocketMQConsumerApplication { @Input("input4") SubscribableChannel input4(); + + @Input("input5") + PollableMessageSource input5(); } public static void main(String[] args) { SpringApplication.run(RocketMQConsumerApplication.class, args); } + @Bean + public ConsumerCustomRunner customRunner() { + return new ConsumerCustomRunner(); + } + + public static class ConsumerCustomRunner implements CommandLineRunner { + + @Autowired + private MySink mySink; + + @Override + public void run(String... args) throws InterruptedException { + while (true) { + mySink.input5().poll(m -> { + String payload = (String) m.getPayload(); + System.out.println("pull msg: " + payload); + }, new ParameterizedTypeReference() { + }); + Thread.sleep(2_000); + } + } + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties index ec27539c..dd8bb6ef 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties @@ -24,6 +24,10 @@ spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group spring.cloud.stream.bindings.input4.consumer.concurrency=5 +spring.cloud.stream.bindings.input5.destination=pull-topic +spring.cloud.stream.bindings.input5.content-type=text/plain +spring.cloud.stream.bindings.input5.group=pull-topic-group + spring.application.name=rocketmq-consume-example server.port=28082 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java index bd157be0..9a55a4a4 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java @@ -9,6 +9,7 @@ import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; /** * @author Jim @@ -23,6 +24,9 @@ public class RocketMQProduceApplication { @Output("output2") MessageChannel output2(); + + @Output("output3") + MessageChannel output3(); } public static void main(String[] args) { @@ -31,7 +35,12 @@ public class RocketMQProduceApplication { @Bean public CustomRunner customRunner() { - return new CustomRunner(); + return new CustomRunner("output1"); + } + + @Bean + public CustomRunner customRunner2() { + return new CustomRunner("output3"); } @Bean @@ -40,24 +49,45 @@ public class RocketMQProduceApplication { } public static class CustomRunner implements CommandLineRunner { + + private final String bindingName; + + public CustomRunner(String bindingName) { + this.bindingName = bindingName; + } + @Autowired private SenderService senderService; + @Autowired + private MySource mySource; + @Override public void run(String... args) throws Exception { - int count = 5; - for (int index = 1; index <= count; index++) { - String msgContent = "msg-" + index; - if (index % 3 == 0) { - senderService.send(msgContent); - } - else if (index % 3 == 1) { - senderService.sendWithTags(msgContent, "tagStr"); - } - else { - senderService.sendObject(new Foo(index, "foo"), "tagObj"); + if (this.bindingName.equals("output1")) { + int count = 5; + for (int index = 1; index <= count; index++) { + String msgContent = "msg-" + index; + if (index % 3 == 0) { + senderService.send(msgContent); + } + else if (index % 3 == 1) { + senderService.sendWithTags(msgContent, "tagStr"); + } + else { + senderService.sendObject(new Foo(index, "foo"), "tagObj"); + } } } + else if (this.bindingName.equals("output3")) { + int count = 50; + for (int index = 1; index <= count; index++) { + String msgContent = "pullMsg-" + index; + mySource.output3() + .send(MessageBuilder.withPayload(msgContent).build()); + } + } + } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties index beca964a..a77b3084 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties @@ -12,6 +12,10 @@ spring.cloud.stream.bindings.output2.content-type=application/json spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup +spring.cloud.stream.bindings.output3.destination=pull-topic +spring.cloud.stream.bindings.output3.content-type=text/plain +spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group + spring.application.name=rocketmq-produce-example server.port=28081 diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 38776db5..30eb3760 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -34,6 +34,7 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; @@ -42,9 +43,13 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProdu import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.AcknowledgmentCallback.Status; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; @@ -83,7 +88,7 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - // if producerGroup is empty, using destination + // if producerGroup is empty, using destination String extendedProducerGroup = producerProperties.getExtension().getGroup(); String producerGroup = StringUtils.isEmpty(extendedProducerGroup) ? destination.getName() @@ -206,6 +211,39 @@ public class RocketMQMessageChannelBinder extends return rocketInboundChannelAdapter; } + @Override + protected PolledConsumerResources createPolledConsumerResources(String name, + String group, ConsumerDestination destination, + ExtendedConsumerProperties consumerProperties) { + RocketMQMessageSource rocketMQMessageSource = new RocketMQMessageSource( + rocketBinderConfigurationProperties, consumerProperties, name, group); + return new PolledConsumerResources(rocketMQMessageSource, + registerErrorInfrastructure(destination, group, consumerProperties, + true)); + } + + @Override + protected MessageHandler getPolledConsumerErrorMessageHandler( + ConsumerDestination destination, String group, + ExtendedConsumerProperties properties) { + return message -> { + if (message.getPayload() instanceof MessagingException) { + AcknowledgmentCallback ack = StaticMessageHeaderAccessor + .getAcknowledgmentCallback( + ((MessagingException) message.getPayload()) + .getFailedMessage()); + if (ack != null) { + if (properties.getExtension().shouldRequeue()) { + ack.acknowledge(Status.REQUEUE); + } + else { + ack.acknowledge(Status.REJECT); + } + } + } + }; + } + @Override public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { return extendedBindingProperties.getExtendedConsumerProperties(channelName); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java new file mode 100644 index 00000000..03eacbe5 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * @author Jim + */ +public class RocketMQMessageQueueChooser { + + private volatile int queueIndex = 0; + + private volatile List messageQueues; + + public MessageQueue choose() { + return messageQueues.get(queueIndex); + } + + public int requeue() { + if (queueIndex - 1 < 0) { + this.queueIndex = messageQueues.size() - 1; + } + else { + this.queueIndex = this.queueIndex - 1; + } + return this.queueIndex; + } + + public void increment() { + this.queueIndex = (this.queueIndex + 1) % messageQueues.size(); + } + + public void reset(Set queueSet) { + this.messageQueues = null; + this.messageQueues = new ArrayList<>(queueSet); + this.queueIndex = 0; + } + + public List getMessageQueues() { + return messageQueues; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java new file mode 100644 index 00000000..d7f13be6 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java @@ -0,0 +1,378 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.integration; + +import java.util.List; +import java.util.Set; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.context.Lifecycle; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.AcknowledgmentCallbackFactory; +import org.springframework.integration.endpoint.AbstractMessageSource; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQMessageSource extends AbstractMessageSource + implements DisposableBean, Lifecycle { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQMessageSource.class); + + private final RocketMQCallbackFactory ackCallbackFactory; + + private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties; + + private final ExtendedConsumerProperties rocketMQConsumerProperties; + + private final String topic; + + private final String group; + + private final Object consumerMonitor = new Object(); + + private DefaultMQPullConsumer consumer; + + private boolean running; + + private MessageSelector messageSelector; + + private RocketMQMessageQueueChooser messageQueueChooser = new RocketMQMessageQueueChooser(); + + public RocketMQMessageSource( + RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, + ExtendedConsumerProperties rocketMQConsumerProperties, + String topic, String group) { + this(new RocketMQCallbackFactory(), rocketMQBinderConfigurationProperties, + rocketMQConsumerProperties, topic, group); + } + + public RocketMQMessageSource(RocketMQCallbackFactory ackCallbackFactory, + RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, + ExtendedConsumerProperties rocketMQConsumerProperties, + String topic, String group) { + this.ackCallbackFactory = ackCallbackFactory; + this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties; + this.rocketMQConsumerProperties = rocketMQConsumerProperties; + this.topic = topic; + this.group = group; + } + + @Override + public synchronized void start() { + if (this.isRunning()) { + throw new IllegalStateException( + "pull consumer already running. " + this.toString()); + } + try { + consumer = new DefaultMQPullConsumer(group); + consumer.setNamesrvAddr( + rocketMQBinderConfigurationProperties.getNameServer()); + consumer.setConsumerPullTimeoutMillis( + rocketMQConsumerProperties.getExtension().getPullTimeout()); + consumer.setMessageModel(MessageModel.CLUSTERING); + + String tags = rocketMQConsumerProperties.getExtension().getTags(); + String sql = rocketMQConsumerProperties.getExtension().getSql(); + + if (!StringUtils.isEmpty(tags) && !StringUtils.isEmpty(sql)) { + messageSelector = MessageSelector.byTag(tags); + } + else if (!StringUtils.isEmpty(tags)) { + messageSelector = MessageSelector.byTag(tags); + } + else if (!StringUtils.isEmpty(sql)) { + messageSelector = MessageSelector.bySql(sql); + } + + consumer.registerMessageQueueListener(topic, new MessageQueueListener() { + @Override + public void messageQueueChanged(String topic, Set mqAll, + Set mqDivided) { + log.info( + "messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`", + topic, mqAll, mqDivided); + switch (consumer.getMessageModel()) { + case BROADCASTING: + RocketMQMessageSource.this.resetMessageQueues(mqAll); + break; + case CLUSTERING: + RocketMQMessageSource.this.resetMessageQueues(mqDivided); + break; + default: + break; + } + } + }); + consumer.start(); + } + catch (MQClientException e) { + log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e); + } + this.setRunning(true); + } + + @Override + public synchronized void stop() { + if (this.isRunning()) { + this.setRunning(false); + consumer.shutdown(); + } + } + + @Override + public synchronized boolean isRunning() { + return running; + } + + @Override + protected synchronized Object doReceive() { + if (messageQueueChooser.getMessageQueues() == null + || messageQueueChooser.getMessageQueues().size() == 0) { + return null; + } + try { + int count = 0; + while (count < messageQueueChooser.getMessageQueues().size()) { + MessageQueue messageQueue; + synchronized (this.consumerMonitor) { + messageQueue = messageQueueChooser.choose(); + messageQueueChooser.increment(); + } + + long offset = consumer.fetchConsumeOffset(messageQueue, + rocketMQConsumerProperties.getExtension().isFromStore()); + + log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'", + this.topic, this.group, messageQueue, offset); + + PullResult pullResult; + if (messageSelector != null) { + pullResult = consumer.pull(messageQueue, messageSelector, offset, 1); + } + else { + pullResult = consumer.pull(messageQueue, (String) null, offset, 1); + } + + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List messageExtList = pullResult.getMsgFoundList(); + + Message message = RocketMQUtil + .convertToSpringMessage(messageExtList.get(0)); + + AcknowledgmentCallback ackCallback = this.ackCallbackFactory + .createCallback(new RocketMQAckInfo(messageQueue, pullResult, + consumer, offset)); + + Message messageResult = MessageBuilder.fromMessage(message).setHeader( + IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, + ackCallback).build(); + return messageResult; + } + else { + log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", + messageQueueChooser.getMessageQueues(), + pullResult.getPullStatus(), topic); + } + count++; + } + } + catch (Exception e) { + log.error("Consumer pull error: " + e.getMessage(), e); + } + return null; + } + + @Override + public String getComponentType() { + return "rocketmq:message-source"; + } + + public synchronized void setRunning(boolean running) { + this.running = running; + } + + public synchronized void resetMessageQueues(Set queueSet) { + log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, queueSet); + synchronized (this.consumerMonitor) { + this.messageQueueChooser.reset(queueSet); + } + } + + public static class RocketMQCallbackFactory + implements AcknowledgmentCallbackFactory { + + @Override + public AcknowledgmentCallback createCallback(RocketMQAckInfo info) { + return new RocketMQAckCallback(info); + } + + } + + public static class RocketMQAckCallback implements AcknowledgmentCallback { + + private final RocketMQAckInfo ackInfo; + + private boolean acknowledged; + + private boolean autoAckEnabled = true; + + public RocketMQAckCallback(RocketMQAckInfo ackInfo) { + this.ackInfo = ackInfo; + } + + protected void setAcknowledged(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + @Override + public boolean isAcknowledged() { + return this.acknowledged; + } + + @Override + public void noAutoAck() { + this.autoAckEnabled = false; + } + + @Override + public boolean isAutoAck() { + return this.autoAckEnabled; + } + + @Override + public void acknowledge(Status status) { + Assert.notNull(status, "'status' cannot be null"); + if (this.acknowledged) { + throw new IllegalStateException("Already acknowledged"); + } + log.debug("acknowledge(" + status.name() + ") for " + this); + synchronized (this.ackInfo.getConsumerMonitor()) { + try { + switch (status) { + case ACCEPT: + case REJECT: + ackInfo.getConsumer().updateConsumeOffset( + ackInfo.getMessageQueue(), + ackInfo.getPullResult().getNextBeginOffset()); + log.debug("messageQueue='{}' offset update to `{}`", + ackInfo.getMessageQueue(), String.valueOf( + ackInfo.getPullResult().getNextBeginOffset())); + break; + case REQUEUE: + // decrease index and update offset of messageQueue of ackInfo + int oldIndex = ackInfo.getMessageQueueChooser().requeue(); + ackInfo.getConsumer().updateConsumeOffset( + ackInfo.getMessageQueue(), ackInfo.getOldOffset()); + log.debug( + "messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", + ackInfo.getMessageQueue(), oldIndex, + ackInfo.getOldOffset()); + break; + default: + break; + } + } + catch (MQClientException e) { + log.error("acknowledge error: " + e.getErrorMessage(), e); + } + finally { + this.acknowledged = true; + } + } + } + + @Override + public String toString() { + return "RocketMQAckCallback{" + "ackInfo=" + ackInfo + ", acknowledged=" + + acknowledged + ", autoAckEnabled=" + autoAckEnabled + '}'; + } + } + + public class RocketMQAckInfo { + + private final MessageQueue messageQueue; + + private final PullResult pullResult; + + private final DefaultMQPullConsumer consumer; + + private final long oldOffset; + + public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult, + DefaultMQPullConsumer consumer, long oldOffset) { + this.messageQueue = messageQueue; + this.pullResult = pullResult; + this.consumer = consumer; + this.oldOffset = oldOffset; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public PullResult getPullResult() { + return pullResult; + } + + public DefaultMQPullConsumer getConsumer() { + return consumer; + } + + public RocketMQMessageQueueChooser getMessageQueueChooser() { + return RocketMQMessageSource.this.messageQueueChooser; + } + + public long getOldOffset() { + return oldOffset; + } + + public Object getConsumerMonitor() { + return RocketMQMessageSource.this.consumerMonitor; + } + + @Override + public String toString() { + return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult=" + + pullResult + ", consumer=" + consumer + ", oldOffset=" + oldOffset + + '}'; + } + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 6cfe9b84..343c9056 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -51,7 +52,9 @@ public class RocketMQConsumerProperties { private Boolean orderly = false; /** - * for concurrently listener. message consume retry strategy + * for concurrently listener. message consume retry strategy. see + * {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or + * discard, see {@link this#shouldRequeue}), others means requeue */ private int delayLevelWhenNextConsume = 0; @@ -62,6 +65,14 @@ public class RocketMQConsumerProperties { private Boolean enabled = true; + // ------------ For Pull Consumer ------------ + + private long pullTimeout = 10 * 1000; + + private boolean fromStore; + + // ------------ For Pull Consumer ------------ + public String getTags() { return tags; } @@ -117,4 +128,24 @@ public class RocketMQConsumerProperties { public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } + + public long getPullTimeout() { + return pullTimeout; + } + + public void setPullTimeout(long pullTimeout) { + this.pullTimeout = pullTimeout; + } + + public boolean isFromStore() { + return fromStore; + } + + public void setFromStore(boolean fromStore) { + this.fromStore = fromStore; + } + + public boolean shouldRequeue() { + return delayLevelWhenNextConsume != -1; + } }