From 5671851d08e1df4216a1f4b2b379ee75df919218 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Fri, 12 Apr 2019 10:54:05 +0800 Subject: [PATCH] Polish #541, update rocketmq-examples with poll consumer --- .../examples/RocketMQConsumerApplication.java | 31 +++++++++++ .../src/main/resources/application.properties | 4 ++ .../examples/RocketMQProduceApplication.java | 54 ++++++++++++++----- .../src/main/resources/application.properties | 4 ++ 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java index 25204919..e7a2385f 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java @@ -1,10 +1,15 @@ package org.springframework.cloud.alibaba.cloud.examples; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.binder.PollableMessageSource; +import org.springframework.context.annotation.Bean; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.messaging.SubscribableChannel; /** @@ -27,10 +32,36 @@ public class RocketMQConsumerApplication { @Input("input4") SubscribableChannel input4(); + + @Input("input5") + PollableMessageSource input5(); } public static void main(String[] args) { SpringApplication.run(RocketMQConsumerApplication.class, args); } + @Bean + public ConsumerCustomRunner customRunner() { + return new ConsumerCustomRunner(); + } + + public static class ConsumerCustomRunner implements CommandLineRunner { + + @Autowired + private MySink mySink; + + @Override + public void run(String... args) throws InterruptedException { + while (true) { + mySink.input5().poll(m -> { + String payload = (String) m.getPayload(); + System.out.println("pull msg: " + payload); + }, new ParameterizedTypeReference() { + }); + Thread.sleep(2_000); + } + } + } + } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties index ec27539c..dd8bb6ef 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties @@ -24,6 +24,10 @@ spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group spring.cloud.stream.bindings.input4.consumer.concurrency=5 +spring.cloud.stream.bindings.input5.destination=pull-topic +spring.cloud.stream.bindings.input5.content-type=text/plain +spring.cloud.stream.bindings.input5.group=pull-topic-group + spring.application.name=rocketmq-consume-example server.port=28082 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java index bd157be0..9a55a4a4 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java @@ -9,6 +9,7 @@ import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; /** * @author Jim @@ -23,6 +24,9 @@ public class RocketMQProduceApplication { @Output("output2") MessageChannel output2(); + + @Output("output3") + MessageChannel output3(); } public static void main(String[] args) { @@ -31,7 +35,12 @@ public class RocketMQProduceApplication { @Bean public CustomRunner customRunner() { - return new CustomRunner(); + return new CustomRunner("output1"); + } + + @Bean + public CustomRunner customRunner2() { + return new CustomRunner("output3"); } @Bean @@ -40,24 +49,45 @@ public class RocketMQProduceApplication { } public static class CustomRunner implements CommandLineRunner { + + private final String bindingName; + + public CustomRunner(String bindingName) { + this.bindingName = bindingName; + } + @Autowired private SenderService senderService; + @Autowired + private MySource mySource; + @Override public void run(String... args) throws Exception { - int count = 5; - for (int index = 1; index <= count; index++) { - String msgContent = "msg-" + index; - if (index % 3 == 0) { - senderService.send(msgContent); - } - else if (index % 3 == 1) { - senderService.sendWithTags(msgContent, "tagStr"); - } - else { - senderService.sendObject(new Foo(index, "foo"), "tagObj"); + if (this.bindingName.equals("output1")) { + int count = 5; + for (int index = 1; index <= count; index++) { + String msgContent = "msg-" + index; + if (index % 3 == 0) { + senderService.send(msgContent); + } + else if (index % 3 == 1) { + senderService.sendWithTags(msgContent, "tagStr"); + } + else { + senderService.sendObject(new Foo(index, "foo"), "tagObj"); + } } } + else if (this.bindingName.equals("output3")) { + int count = 50; + for (int index = 1; index <= count; index++) { + String msgContent = "pullMsg-" + index; + mySource.output3() + .send(MessageBuilder.withPayload(msgContent).build()); + } + } + } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties index beca964a..a77b3084 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties @@ -12,6 +12,10 @@ spring.cloud.stream.bindings.output2.content-type=application/json spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup +spring.cloud.stream.bindings.output3.destination=pull-topic +spring.cloud.stream.bindings.output3.content-type=text/plain +spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group + spring.application.name=rocketmq-produce-example server.port=28081