1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

rocketmq binder update example for 1.x branch

This commit is contained in:
fangjian0423 2018-12-12 16:10:19 +08:00
parent 8f24bd979b
commit b0238ac88d
6 changed files with 111 additions and 10 deletions

View File

@ -0,0 +1,18 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class MyTransactionCheckListener implements TransactionCheckListener {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("TransactionCheckListener: " + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
}

View File

@ -0,0 +1,20 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class MyTransactionExecuter implements LocalTransactionExecuter {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
if ("1".equals(msg.getUserProperty("test"))) {
System.out.println(new String(msg.getBody()) + " rollback");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
System.out.println(new String(msg.getBody()) + " commit");
return LocalTransactionState.COMMIT_MESSAGE;
}
}

View File

@ -30,4 +30,9 @@ public class ReceiveService {
System.out.println("input1 receive again: " + receiveMsg); System.out.println("input1 receive again: " + receiveMsg);
} }
@StreamListener("input4")
public void receiveTransactionalMsg(String transactionMsg) {
System.out.println("input4 receive transaction msg: " + transactionMsg);
}
} }

View File

@ -5,17 +5,19 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.messaging.Source; import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
/** /**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@SpringBootApplication @SpringBootApplication
@EnableBinding({ Source.class, MySink.class }) @EnableBinding({ MySource.class, MySink.class })
public class RocketMQApplication { public class RocketMQApplication {
public interface MySink { public interface MySink {
@ -29,17 +31,33 @@ public class RocketMQApplication {
@Input("input3") @Input("input3")
SubscribableChannel input3(); SubscribableChannel input3();
@Input("input4")
SubscribableChannel input4();
}
public interface MySource {
@Output("output1")
MessageChannel output1();
@Output("output2")
MessageChannel output2();
} }
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(RocketMQApplication.class, args); SpringApplication.run(RocketMQApplication.class, args);
} }
@Bean @Bean
public CustomRunner customRunner() { public CustomRunner customRunner() {
return new CustomRunner(); return new CustomRunner();
} }
@Bean
public CustomRunnerWithTransactional customRunnerWithTransactional() {
return new CustomRunnerWithTransactional();
}
public static class CustomRunner implements CommandLineRunner { public static class CustomRunner implements CommandLineRunner {
@Autowired @Autowired
private SenderService senderService; private SenderService senderService;
@ -62,4 +80,21 @@ public class RocketMQApplication {
} }
} }
public static class CustomRunnerWithTransactional implements CommandLineRunner {
@Autowired
private SenderService senderService;
@Override
public void run(String... args) throws Exception {
// COMMIT_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg1", false);
// ROLLBACK_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg2", true);
// ROLLBACK_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg3", true);
// COMMIT_MESSAGE message
senderService.sendTransactionalMsg("transactional-msg4", false);
}
}
} }

View File

@ -5,7 +5,7 @@ import java.util.Map;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source; import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
@ -19,17 +19,17 @@ import org.springframework.util.MimeTypeUtils;
public class SenderService { public class SenderService {
@Autowired @Autowired
private Source source; private MySource source;
public void send(String msg) throws Exception { public void send(String msg) throws Exception {
source.output().send(MessageBuilder.withPayload(msg).build()); source.output1().send(MessageBuilder.withPayload(msg).build());
} }
public <T> void sendWithTags(T msg, String tag) throws Exception { public <T> void sendWithTags(T msg, String tag) throws Exception {
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>();
map.put(MessageConst.PROPERTY_TAGS, tag); map.put(MessageConst.PROPERTY_TAGS, tag);
Message message = MessageBuilder.createMessage(msg, new MessageHeaders(map)); Message message = MessageBuilder.createMessage(msg, new MessageHeaders(map));
source.output().send(message); source.output1().send(message);
} }
public <T> void sendObject(T msg, String tag) throws Exception { public <T> void sendObject(T msg, String tag) throws Exception {
@ -37,7 +37,17 @@ public class SenderService {
.setHeader(MessageConst.PROPERTY_TAGS, tag) .setHeader(MessageConst.PROPERTY_TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build(); .build();
source.output().send(message); source.output1().send(message);
}
public <T> void sendTransactionalMsg(T msg, boolean error) throws Exception {
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
if (error) {
builder.setHeader("test", "1");
}
Message message = builder.build();
source.output2().send(message);
} }
} }

View File

@ -2,8 +2,14 @@ spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=test-topic spring.cloud.stream.bindings.output1.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json spring.cloud.stream.bindings.output1.content-type=application/json
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter
spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener
spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain spring.cloud.stream.bindings.input1.content-type=text/plain
@ -26,5 +32,12 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20 spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=210
spring.application.name=rocketmq-example
server.port=28081 server.port=28081
management.security.enabled=false management.security.enabled=false