mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
update examples
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package com.alibaba.cloud.examples;
|
||||
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -11,8 +12,8 @@ import org.springframework.stereotype.Service;
|
||||
public class ReceiveService {
|
||||
|
||||
@StreamListener("input1")
|
||||
public void receiveInput1(String receiveMsg) {
|
||||
System.out.println("input1 receive: " + receiveMsg);
|
||||
public void receiveInput1(Message message) {
|
||||
System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
|
||||
}
|
||||
|
||||
@StreamListener("input2")
|
||||
|
@@ -4,6 +4,7 @@ spring.cloud.stream.bindings.input1.destination=test-topic
|
||||
spring.cloud.stream.bindings.input1.content-type=text/plain
|
||||
spring.cloud.stream.bindings.input1.group=test-group1
|
||||
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
|
||||
spring.cloud.stream.rocketmq.bindings.input1.consumer.trustedPackages=com.alibaba.cloud
|
||||
|
||||
spring.cloud.stream.bindings.input2.destination=test-topic
|
||||
spring.cloud.stream.bindings.input2.content-type=text/plain
|
||||
|
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.examples;
|
||||
|
||||
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class MyPartitionSelectorStrategy implements PartitionSelectorStrategy {
|
||||
@Override
|
||||
public int selectPartition(Object key, int partitionCount) {
|
||||
System.out.println("partition key: " + key + ", partitionCount: " + partitionCount);
|
||||
return 0;
|
||||
}
|
||||
}
|
@@ -9,6 +9,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
@@ -28,6 +29,9 @@ public class RocketMQProduceApplication {
|
||||
|
||||
@Output("output3")
|
||||
MessageChannel output3();
|
||||
|
||||
@Output("output4")
|
||||
MessageChannel output4();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
@@ -44,6 +48,11 @@ public class RocketMQProduceApplication {
|
||||
return new CustomRunner("output3");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CustomRunner customRunner3() {
|
||||
return new CustomRunner("output4");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CustomRunnerWithTransactional customRunnerWithTransactional() {
|
||||
return new CustomRunnerWithTransactional();
|
||||
@@ -88,6 +97,15 @@ public class RocketMQProduceApplication {
|
||||
.send(MessageBuilder.withPayload(msgContent).build());
|
||||
}
|
||||
}
|
||||
else if (this.bindingName.equals("output4")) {
|
||||
int count = 5;
|
||||
for (int index = 1; index <= count; index++) {
|
||||
String msgContent = "partitionMsg-" + index;
|
||||
Message message = MessageBuilder.withPayload(msgContent)
|
||||
.setHeader("myPartitionKey", "myPartitionKey").build();
|
||||
mySource.output4().send(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@@ -5,6 +5,7 @@ import java.util.stream.Stream;
|
||||
|
||||
import com.alibaba.cloud.examples.RocketMQProduceApplication.MySource;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
||||
|
||||
@@ -24,6 +25,8 @@ public class SenderService {
|
||||
@Autowired
|
||||
private MySource source;
|
||||
|
||||
private ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
public void send(String msg) throws Exception {
|
||||
source.output1().send(MessageBuilder.withPayload(msg).build());
|
||||
}
|
||||
@@ -38,6 +41,7 @@ public class SenderService {
|
||||
public <T> void sendObject(T msg, String tag) throws Exception {
|
||||
Message message = MessageBuilder.withPayload(msg)
|
||||
.setHeader(MessageConst.PROPERTY_TAGS, tag)
|
||||
.setHeader("foo", new Foo(1, "bar"))
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
|
||||
.build();
|
||||
source.output1().send(message);
|
||||
|
@@ -6,6 +6,7 @@ spring.cloud.stream.bindings.output1.destination=test-topic
|
||||
spring.cloud.stream.bindings.output1.content-type=application/json
|
||||
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
|
||||
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true
|
||||
spring.cloud.stream.rocketmq.bindings.output1.producer.sendMessageTimeout=30000
|
||||
|
||||
spring.cloud.stream.bindings.output2.destination=TransactionTopic
|
||||
spring.cloud.stream.bindings.output2.content-type=application/json
|
||||
@@ -16,6 +17,12 @@ spring.cloud.stream.bindings.output3.destination=pull-topic
|
||||
spring.cloud.stream.bindings.output3.content-type=text/plain
|
||||
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group
|
||||
|
||||
spring.cloud.stream.bindings.output4.destination=partition-topic
|
||||
spring.cloud.stream.bindings.output4.content-type=text/plain
|
||||
spring.cloud.stream.bindings.output4.producer.partition-key-expression=headers['myPartitionKey']
|
||||
spring.cloud.stream.bindings.output4.producer.partition-selector-class=com.alibaba.cloud.examples.MyPartitionSelectorStrategy
|
||||
spring.cloud.stream.rocketmq.bindings.output4.producer.group=partition-binder-group
|
||||
|
||||
spring.application.name=rocketmq-produce-example
|
||||
|
||||
server.port=28081
|
||||
|
Reference in New Issue
Block a user