From eabbc53b4bf0e4067a77fe0082fc7b69cd018093 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Thu, 11 Apr 2019 21:31:06 +0800 Subject: [PATCH 1/5] Polish #541 --- .../RocketMQMessageQueueChooser.java | 61 +++ .../integration/RocketMQMessageSource.java | 387 ++++++++++++++++++ .../RocketMQConsumerProperties.java | 24 ++ 3 files changed, 472 insertions(+) create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java new file mode 100644 index 00000000..35663890 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * @author Jim + */ +public class RocketMQMessageQueueChooser { + + private int queueIndex = 0; + + private volatile List messageQueues; + + public MessageQueue choose() { + return messageQueues.get(queueIndex); + } + + public synchronized int requeue() { + if (queueIndex - 1 < 0) { + this.queueIndex = messageQueues.size() - 1; + } + else { + this.queueIndex = this.queueIndex - 1; + } + return this.queueIndex; + } + + public void increment() { + this.queueIndex = (this.queueIndex + 1) % messageQueues.size(); + } + + public void reset(Set queueSet) { + this.messageQueues = null; + this.messageQueues = new ArrayList<>(queueSet); + this.queueIndex = 0; + } + + public List getMessageQueues() { + return messageQueues; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java new file mode 100644 index 00000000..316a40bf --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java @@ -0,0 +1,387 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.integration; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.context.Lifecycle; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.AcknowledgmentCallbackFactory; +import org.springframework.integration.endpoint.AbstractMessageSource; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQMessageSource extends AbstractMessageSource + implements DisposableBean, Lifecycle { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQMessageSource.class); + + private final RocketMQCallbackFactory ackCallbackFactory; + + private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties; + + private final ExtendedConsumerProperties rocketMQConsumerProperties; + + private final String topic; + + private final String group; + + private final Lock lock = new ReentrantLock(); + + private DefaultMQPullConsumer consumer; + + private boolean running; + + private MessageSelector messageSelector; + + private RocketMQMessageQueueChooser messageQueueChooser = new RocketMQMessageQueueChooser(); + + public RocketMQMessageSource( + RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, + ExtendedConsumerProperties rocketMQConsumerProperties, + String topic, String group) { + this(new RocketMQCallbackFactory(), rocketMQBinderConfigurationProperties, + rocketMQConsumerProperties, topic, group); + } + + public RocketMQMessageSource(RocketMQCallbackFactory ackCallbackFactory, + RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, + ExtendedConsumerProperties rocketMQConsumerProperties, + String topic, String group) { + this.ackCallbackFactory = ackCallbackFactory; + this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties; + this.rocketMQConsumerProperties = rocketMQConsumerProperties; + this.topic = topic; + this.group = group; + } + + @Override + public void start() { + if (this.isRunning()) { + throw new IllegalStateException( + "pull consumer already running. " + this.toString()); + } + try { + consumer = new DefaultMQPullConsumer(group); + consumer.setNamesrvAddr( + rocketMQBinderConfigurationProperties.getNameServer()); + consumer.setConsumerPullTimeoutMillis( + rocketMQConsumerProperties.getExtension().getPullTimeout()); + consumer.setMessageModel(MessageModel.CLUSTERING); + + String tags = rocketMQConsumerProperties.getExtension().getTags(); + String sql = rocketMQConsumerProperties.getExtension().getSql(); + + if (!StringUtils.isEmpty(tags) && !StringUtils.isEmpty(sql)) { + messageSelector = MessageSelector.byTag(tags); + } + else if (!StringUtils.isEmpty(tags)) { + messageSelector = MessageSelector.byTag(tags); + } + else if (!StringUtils.isEmpty(sql)) { + messageSelector = MessageSelector.bySql(sql); + } + + consumer.registerMessageQueueListener(topic, new MessageQueueListener() { + @Override + public void messageQueueChanged(String topic, Set mqAll, + Set mqDivided) { + log.info( + "messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`", + topic, mqAll, mqDivided); + switch (consumer.getMessageModel()) { + case BROADCASTING: + RocketMQMessageSource.this.resetMessageQueues(mqAll); + break; + case CLUSTERING: + RocketMQMessageSource.this.resetMessageQueues(mqDivided); + break; + default: + break; + } + } + }); + consumer.start(); + } + catch (MQClientException e) { + log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e); + } + this.setRunning(true); + } + + @Override + public void stop() { + if (this.isRunning()) { + this.setRunning(false); + consumer.shutdown(); + } + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + protected Object doReceive() { + if (messageQueueChooser.getMessageQueues() == null + || messageQueueChooser.getMessageQueues().size() == 0) { + return null; + } + if (lock.tryLock()) { + try { + int count = 0; + while (count < messageQueueChooser.getMessageQueues().size()) { + + MessageQueue messageQueue = messageQueueChooser.choose(); + + long offset = consumer.fetchConsumeOffset(messageQueue, + rocketMQConsumerProperties.getExtension().isFromStore()); + + log.debug( + "topic='{}', group='{}', messageQueue='{}', offset now='{}'", + this.topic, this.group, messageQueue, offset); + + PullResult pullResult; + if (messageSelector != null) { + pullResult = consumer.pull(messageQueue, messageSelector, offset, + 1); + } + else { + pullResult = consumer.pull(messageQueue, (String) null, offset, + 1); + } + + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List messageExtList = pullResult.getMsgFoundList(); + Message message = RocketMQUtil + .convertToSpringMessage(messageExtList.get(0)); + + AcknowledgmentCallback ackCallback = this.ackCallbackFactory + .createCallback( + new RocketMQAckInfo(messageQueue, pullResult, + consumer, messageQueueChooser, offset)); + + Message messageResult = MessageBuilder.fromMessage(message) + .setHeader( + IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, + ackCallback) + .build(); + messageQueueChooser.increment(); + return messageResult; + } + else { + log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", + messageQueueChooser.getMessageQueues(), + pullResult.getPullStatus(), topic); + } + messageQueueChooser.increment(); + count++; + } + } + catch (Exception e) { + log.error("Consumer pull error: " + e.getMessage(), e); + } + finally { + lock.unlock(); + } + } + return null; + } + + @Override + public String getComponentType() { + return "rocketmq:message-source"; + } + + public void setRunning(boolean running) { + this.running = running; + } + + public void resetMessageQueues(Set queueSet) { + lock.lock(); + try { + log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, + queueSet); + this.messageQueueChooser.reset(queueSet); + } + finally { + lock.unlock(); + } + } + + public static class RocketMQCallbackFactory + implements AcknowledgmentCallbackFactory { + + @Override + public AcknowledgmentCallback createCallback(RocketMQAckInfo info) { + return new RocketMQAckCallback(info); + } + + } + + public static class RocketMQAckCallback implements AcknowledgmentCallback { + + private final RocketMQAckInfo ackInfo; + + private boolean acknowledged; + + private boolean autoAckEnabled = true; + + public RocketMQAckCallback(RocketMQAckInfo ackInfo) { + this.ackInfo = ackInfo; + } + + protected void setAcknowledged(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + @Override + public boolean isAcknowledged() { + return this.acknowledged; + } + + @Override + public void noAutoAck() { + this.autoAckEnabled = false; + } + + @Override + public boolean isAutoAck() { + return this.autoAckEnabled; + } + + @Override + public void acknowledge(Status status) { + Assert.notNull(status, "'status' cannot be null"); + log.debug("acknowledge(" + status.name() + ") for " + this); + try { + switch (status) { + case ACCEPT: + case REJECT: + ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), + ackInfo.getPullResult().getNextBeginOffset()); + log.debug("messageQueue='{}' offset update to `{}`", + ackInfo.getMessageQueue(), + String.valueOf(ackInfo.getPullResult().getNextBeginOffset())); + break; + case REQUEUE: + // decrease index and update offset of messageQueue of ackInfo + int oldIndex = ackInfo.getMessageQueueChooser().requeue(); + ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), + ackInfo.getOldOffset()); + log.debug( + "messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", + ackInfo.getMessageQueue(), oldIndex, ackInfo.getOldOffset()); + break; + default: + break; + } + } + catch (MQClientException e) { + log.error("acknowledge error: " + e.getErrorMessage(), e); + } + finally { + this.acknowledged = true; + } + } + + @Override + public String toString() { + return "RocketMQAckCallback{" + "ackInfo=" + ackInfo + ", acknowledged=" + + acknowledged + ", autoAckEnabled=" + autoAckEnabled + '}'; + } + } + + public static class RocketMQAckInfo { + + private final MessageQueue messageQueue; + + private final PullResult pullResult; + + private final DefaultMQPullConsumer consumer; + + private final RocketMQMessageQueueChooser messageQueueChooser; + + private final long oldOffset; + + public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult, + DefaultMQPullConsumer consumer, + RocketMQMessageQueueChooser messageQueueChooser, long oldOffset) { + this.messageQueue = messageQueue; + this.pullResult = pullResult; + this.consumer = consumer; + this.messageQueueChooser = messageQueueChooser; + this.oldOffset = oldOffset; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public PullResult getPullResult() { + return pullResult; + } + + public DefaultMQPullConsumer getConsumer() { + return consumer; + } + + public RocketMQMessageQueueChooser getMessageQueueChooser() { + return messageQueueChooser; + } + + public long getOldOffset() { + return oldOffset; + } + + @Override + public String toString() { + return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult=" + + pullResult + ", consumer=" + consumer + ", messageQueueChooser=" + + messageQueueChooser + ", oldOffset=" + oldOffset + '}'; + } + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 6cfe9b84..00b41a03 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -62,6 +62,14 @@ public class RocketMQConsumerProperties { private Boolean enabled = true; + // ------------ For Pull Consumer ------------ + + private long pullTimeout = 10 * 1000; + + private boolean fromStore; + + // ------------ For Pull Consumer ------------ + public String getTags() { return tags; } @@ -117,4 +125,20 @@ public class RocketMQConsumerProperties { public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } + + public long getPullTimeout() { + return pullTimeout; + } + + public void setPullTimeout(long pullTimeout) { + this.pullTimeout = pullTimeout; + } + + public boolean isFromStore() { + return fromStore; + } + + public void setFromStore(boolean fromStore) { + this.fromStore = fromStore; + } } From 13ec109e9d771702b442de9f3d3fe26cf7c72dae Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Fri, 12 Apr 2019 10:41:25 +0800 Subject: [PATCH 2/5] Polish #541, implement createPolledConsumerResources method of binder --- .../rocketmq/RocketMQMessageChannelBinder.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 38776db5..088ee58c 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -34,6 +34,7 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; @@ -83,7 +84,7 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - // if producerGroup is empty, using destination + // if producerGroup is empty, using destination String extendedProducerGroup = producerProperties.getExtension().getGroup(); String producerGroup = StringUtils.isEmpty(extendedProducerGroup) ? destination.getName() @@ -206,6 +207,17 @@ public class RocketMQMessageChannelBinder extends return rocketInboundChannelAdapter; } + @Override + protected PolledConsumerResources createPolledConsumerResources(String name, + String group, ConsumerDestination destination, + ExtendedConsumerProperties consumerProperties) { + RocketMQMessageSource rocketMQMessageSource = new RocketMQMessageSource( + rocketBinderConfigurationProperties, consumerProperties, name, group); + return new PolledConsumerResources(rocketMQMessageSource, + registerErrorInfrastructure(destination, group, consumerProperties, + true)); + } + @Override public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { return extendedBindingProperties.getExtendedConsumerProperties(channelName); From 5671851d08e1df4216a1f4b2b379ee75df919218 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Fri, 12 Apr 2019 10:54:05 +0800 Subject: [PATCH 3/5] Polish #541, update rocketmq-examples with poll consumer --- .../examples/RocketMQConsumerApplication.java | 31 +++++++++++ .../src/main/resources/application.properties | 4 ++ .../examples/RocketMQProduceApplication.java | 54 ++++++++++++++----- .../src/main/resources/application.properties | 4 ++ 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java index 25204919..e7a2385f 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java @@ -1,10 +1,15 @@ package org.springframework.cloud.alibaba.cloud.examples; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.binder.PollableMessageSource; +import org.springframework.context.annotation.Bean; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.messaging.SubscribableChannel; /** @@ -27,10 +32,36 @@ public class RocketMQConsumerApplication { @Input("input4") SubscribableChannel input4(); + + @Input("input5") + PollableMessageSource input5(); } public static void main(String[] args) { SpringApplication.run(RocketMQConsumerApplication.class, args); } + @Bean + public ConsumerCustomRunner customRunner() { + return new ConsumerCustomRunner(); + } + + public static class ConsumerCustomRunner implements CommandLineRunner { + + @Autowired + private MySink mySink; + + @Override + public void run(String... args) throws InterruptedException { + while (true) { + mySink.input5().poll(m -> { + String payload = (String) m.getPayload(); + System.out.println("pull msg: " + payload); + }, new ParameterizedTypeReference() { + }); + Thread.sleep(2_000); + } + } + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties index ec27539c..dd8bb6ef 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties @@ -24,6 +24,10 @@ spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group spring.cloud.stream.bindings.input4.consumer.concurrency=5 +spring.cloud.stream.bindings.input5.destination=pull-topic +spring.cloud.stream.bindings.input5.content-type=text/plain +spring.cloud.stream.bindings.input5.group=pull-topic-group + spring.application.name=rocketmq-consume-example server.port=28082 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java index bd157be0..9a55a4a4 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java @@ -9,6 +9,7 @@ import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; /** * @author Jim @@ -23,6 +24,9 @@ public class RocketMQProduceApplication { @Output("output2") MessageChannel output2(); + + @Output("output3") + MessageChannel output3(); } public static void main(String[] args) { @@ -31,7 +35,12 @@ public class RocketMQProduceApplication { @Bean public CustomRunner customRunner() { - return new CustomRunner(); + return new CustomRunner("output1"); + } + + @Bean + public CustomRunner customRunner2() { + return new CustomRunner("output3"); } @Bean @@ -40,24 +49,45 @@ public class RocketMQProduceApplication { } public static class CustomRunner implements CommandLineRunner { + + private final String bindingName; + + public CustomRunner(String bindingName) { + this.bindingName = bindingName; + } + @Autowired private SenderService senderService; + @Autowired + private MySource mySource; + @Override public void run(String... args) throws Exception { - int count = 5; - for (int index = 1; index <= count; index++) { - String msgContent = "msg-" + index; - if (index % 3 == 0) { - senderService.send(msgContent); - } - else if (index % 3 == 1) { - senderService.sendWithTags(msgContent, "tagStr"); - } - else { - senderService.sendObject(new Foo(index, "foo"), "tagObj"); + if (this.bindingName.equals("output1")) { + int count = 5; + for (int index = 1; index <= count; index++) { + String msgContent = "msg-" + index; + if (index % 3 == 0) { + senderService.send(msgContent); + } + else if (index % 3 == 1) { + senderService.sendWithTags(msgContent, "tagStr"); + } + else { + senderService.sendObject(new Foo(index, "foo"), "tagObj"); + } } } + else if (this.bindingName.equals("output3")) { + int count = 50; + for (int index = 1; index <= count; index++) { + String msgContent = "pullMsg-" + index; + mySource.output3() + .send(MessageBuilder.withPayload(msgContent).build()); + } + } + } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties index beca964a..a77b3084 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties @@ -12,6 +12,10 @@ spring.cloud.stream.bindings.output2.content-type=application/json spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup +spring.cloud.stream.bindings.output3.destination=pull-topic +spring.cloud.stream.bindings.output3.content-type=text/plain +spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group + spring.application.name=rocketmq-produce-example server.port=28081 From ad944513b78979104addc7de8f601eaf9e7aa2ff Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Mon, 15 Apr 2019 17:26:20 +0800 Subject: [PATCH 4/5] Polish #541, enhance error handler for Polled Consumer --- .../RocketMQMessageChannelBinder.java | 26 +++++++++++++++++++ .../RocketMQConsumerProperties.java | 9 ++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 088ee58c..30eb3760 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -43,9 +43,13 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProdu import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.AcknowledgmentCallback.Status; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; @@ -218,6 +222,28 @@ public class RocketMQMessageChannelBinder extends true)); } + @Override + protected MessageHandler getPolledConsumerErrorMessageHandler( + ConsumerDestination destination, String group, + ExtendedConsumerProperties properties) { + return message -> { + if (message.getPayload() instanceof MessagingException) { + AcknowledgmentCallback ack = StaticMessageHeaderAccessor + .getAcknowledgmentCallback( + ((MessagingException) message.getPayload()) + .getFailedMessage()); + if (ack != null) { + if (properties.getExtension().shouldRequeue()) { + ack.acknowledge(Status.REQUEUE); + } + else { + ack.acknowledge(Status.REJECT); + } + } + } + }; + } + @Override public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { return extendedBindingProperties.getExtendedConsumerProperties(channelName); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 00b41a03..343c9056 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -51,7 +52,9 @@ public class RocketMQConsumerProperties { private Boolean orderly = false; /** - * for concurrently listener. message consume retry strategy + * for concurrently listener. message consume retry strategy. see + * {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or + * discard, see {@link this#shouldRequeue}), others means requeue */ private int delayLevelWhenNextConsume = 0; @@ -141,4 +144,8 @@ public class RocketMQConsumerProperties { public void setFromStore(boolean fromStore) { this.fromStore = fromStore; } + + public boolean shouldRequeue() { + return delayLevelWhenNextConsume != -1; + } } From d37f21885bd519a593aa557d218e26c6da53d9df Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Tue, 16 Apr 2019 10:35:32 +0800 Subject: [PATCH 5/5] Polish #541, enhance concurrent handle --- .../RocketMQMessageQueueChooser.java | 4 +- .../integration/RocketMQMessageSource.java | 203 +++++++++--------- 2 files changed, 99 insertions(+), 108 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java index 35663890..03eacbe5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java @@ -27,7 +27,7 @@ import org.apache.rocketmq.common.message.MessageQueue; */ public class RocketMQMessageQueueChooser { - private int queueIndex = 0; + private volatile int queueIndex = 0; private volatile List messageQueues; @@ -35,7 +35,7 @@ public class RocketMQMessageQueueChooser { return messageQueues.get(queueIndex); } - public synchronized int requeue() { + public int requeue() { if (queueIndex - 1 < 0) { this.queueIndex = messageQueues.size() - 1; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java index 316a40bf..d7f13be6 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java @@ -18,8 +18,6 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; import java.util.List; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageQueueListener; @@ -67,7 +65,7 @@ public class RocketMQMessageSource extends AbstractMessageSource private final String group; - private final Lock lock = new ReentrantLock(); + private final Object consumerMonitor = new Object(); private DefaultMQPullConsumer consumer; @@ -97,7 +95,7 @@ public class RocketMQMessageSource extends AbstractMessageSource } @Override - public void start() { + public synchronized void start() { if (this.isRunning()) { throw new IllegalStateException( "pull consumer already running. " + this.toString()); @@ -151,7 +149,7 @@ public class RocketMQMessageSource extends AbstractMessageSource } @Override - public void stop() { + public synchronized void stop() { if (this.isRunning()) { this.setRunning(false); consumer.shutdown(); @@ -159,73 +157,64 @@ public class RocketMQMessageSource extends AbstractMessageSource } @Override - public boolean isRunning() { + public synchronized boolean isRunning() { return running; } @Override - protected Object doReceive() { + protected synchronized Object doReceive() { if (messageQueueChooser.getMessageQueues() == null || messageQueueChooser.getMessageQueues().size() == 0) { return null; } - if (lock.tryLock()) { - try { - int count = 0; - while (count < messageQueueChooser.getMessageQueues().size()) { - - MessageQueue messageQueue = messageQueueChooser.choose(); - - long offset = consumer.fetchConsumeOffset(messageQueue, - rocketMQConsumerProperties.getExtension().isFromStore()); - - log.debug( - "topic='{}', group='{}', messageQueue='{}', offset now='{}'", - this.topic, this.group, messageQueue, offset); - - PullResult pullResult; - if (messageSelector != null) { - pullResult = consumer.pull(messageQueue, messageSelector, offset, - 1); - } - else { - pullResult = consumer.pull(messageQueue, (String) null, offset, - 1); - } - - if (pullResult.getPullStatus() == PullStatus.FOUND) { - List messageExtList = pullResult.getMsgFoundList(); - Message message = RocketMQUtil - .convertToSpringMessage(messageExtList.get(0)); - - AcknowledgmentCallback ackCallback = this.ackCallbackFactory - .createCallback( - new RocketMQAckInfo(messageQueue, pullResult, - consumer, messageQueueChooser, offset)); - - Message messageResult = MessageBuilder.fromMessage(message) - .setHeader( - IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, - ackCallback) - .build(); - messageQueueChooser.increment(); - return messageResult; - } - else { - log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", - messageQueueChooser.getMessageQueues(), - pullResult.getPullStatus(), topic); - } + try { + int count = 0; + while (count < messageQueueChooser.getMessageQueues().size()) { + MessageQueue messageQueue; + synchronized (this.consumerMonitor) { + messageQueue = messageQueueChooser.choose(); messageQueueChooser.increment(); - count++; } + + long offset = consumer.fetchConsumeOffset(messageQueue, + rocketMQConsumerProperties.getExtension().isFromStore()); + + log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'", + this.topic, this.group, messageQueue, offset); + + PullResult pullResult; + if (messageSelector != null) { + pullResult = consumer.pull(messageQueue, messageSelector, offset, 1); + } + else { + pullResult = consumer.pull(messageQueue, (String) null, offset, 1); + } + + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List messageExtList = pullResult.getMsgFoundList(); + + Message message = RocketMQUtil + .convertToSpringMessage(messageExtList.get(0)); + + AcknowledgmentCallback ackCallback = this.ackCallbackFactory + .createCallback(new RocketMQAckInfo(messageQueue, pullResult, + consumer, offset)); + + Message messageResult = MessageBuilder.fromMessage(message).setHeader( + IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, + ackCallback).build(); + return messageResult; + } + else { + log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", + messageQueueChooser.getMessageQueues(), + pullResult.getPullStatus(), topic); + } + count++; } - catch (Exception e) { - log.error("Consumer pull error: " + e.getMessage(), e); - } - finally { - lock.unlock(); - } + } + catch (Exception e) { + log.error("Consumer pull error: " + e.getMessage(), e); } return null; } @@ -235,20 +224,15 @@ public class RocketMQMessageSource extends AbstractMessageSource return "rocketmq:message-source"; } - public void setRunning(boolean running) { + public synchronized void setRunning(boolean running) { this.running = running; } - public void resetMessageQueues(Set queueSet) { - lock.lock(); - try { - log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, - queueSet); + public synchronized void resetMessageQueues(Set queueSet) { + log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, queueSet); + synchronized (this.consumerMonitor) { this.messageQueueChooser.reset(queueSet); } - finally { - lock.unlock(); - } } public static class RocketMQCallbackFactory @@ -295,35 +279,42 @@ public class RocketMQMessageSource extends AbstractMessageSource @Override public void acknowledge(Status status) { Assert.notNull(status, "'status' cannot be null"); + if (this.acknowledged) { + throw new IllegalStateException("Already acknowledged"); + } log.debug("acknowledge(" + status.name() + ") for " + this); - try { - switch (status) { - case ACCEPT: - case REJECT: - ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), - ackInfo.getPullResult().getNextBeginOffset()); - log.debug("messageQueue='{}' offset update to `{}`", - ackInfo.getMessageQueue(), - String.valueOf(ackInfo.getPullResult().getNextBeginOffset())); - break; - case REQUEUE: - // decrease index and update offset of messageQueue of ackInfo - int oldIndex = ackInfo.getMessageQueueChooser().requeue(); - ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), - ackInfo.getOldOffset()); - log.debug( - "messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", - ackInfo.getMessageQueue(), oldIndex, ackInfo.getOldOffset()); - break; - default: - break; + synchronized (this.ackInfo.getConsumerMonitor()) { + try { + switch (status) { + case ACCEPT: + case REJECT: + ackInfo.getConsumer().updateConsumeOffset( + ackInfo.getMessageQueue(), + ackInfo.getPullResult().getNextBeginOffset()); + log.debug("messageQueue='{}' offset update to `{}`", + ackInfo.getMessageQueue(), String.valueOf( + ackInfo.getPullResult().getNextBeginOffset())); + break; + case REQUEUE: + // decrease index and update offset of messageQueue of ackInfo + int oldIndex = ackInfo.getMessageQueueChooser().requeue(); + ackInfo.getConsumer().updateConsumeOffset( + ackInfo.getMessageQueue(), ackInfo.getOldOffset()); + log.debug( + "messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", + ackInfo.getMessageQueue(), oldIndex, + ackInfo.getOldOffset()); + break; + default: + break; + } + } + catch (MQClientException e) { + log.error("acknowledge error: " + e.getErrorMessage(), e); + } + finally { + this.acknowledged = true; } - } - catch (MQClientException e) { - log.error("acknowledge error: " + e.getErrorMessage(), e); - } - finally { - this.acknowledged = true; } } @@ -334,7 +325,7 @@ public class RocketMQMessageSource extends AbstractMessageSource } } - public static class RocketMQAckInfo { + public class RocketMQAckInfo { private final MessageQueue messageQueue; @@ -342,17 +333,13 @@ public class RocketMQMessageSource extends AbstractMessageSource private final DefaultMQPullConsumer consumer; - private final RocketMQMessageQueueChooser messageQueueChooser; - private final long oldOffset; public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult, - DefaultMQPullConsumer consumer, - RocketMQMessageQueueChooser messageQueueChooser, long oldOffset) { + DefaultMQPullConsumer consumer, long oldOffset) { this.messageQueue = messageQueue; this.pullResult = pullResult; this.consumer = consumer; - this.messageQueueChooser = messageQueueChooser; this.oldOffset = oldOffset; } @@ -369,18 +356,22 @@ public class RocketMQMessageSource extends AbstractMessageSource } public RocketMQMessageQueueChooser getMessageQueueChooser() { - return messageQueueChooser; + return RocketMQMessageSource.this.messageQueueChooser; } public long getOldOffset() { return oldOffset; } + public Object getConsumerMonitor() { + return RocketMQMessageSource.this.consumerMonitor; + } + @Override public String toString() { return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult=" - + pullResult + ", consumer=" + consumer + ", messageQueueChooser=" - + messageQueueChooser + ", oldOffset=" + oldOffset + '}'; + + pullResult + ", consumer=" + consumer + ", oldOffset=" + oldOffset + + '}'; } }