getRuntime() {
return runtime;
}
- public MetricRegistry getMetricRegistry() {
- return metricRegistry;
- }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
deleted file mode 100644
index 1ede7802..00000000
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (C) 2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.stream.binder.rocketmq.metrics;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-
-/**
- * @author juven.xuxb
- * @author Jim
- */
-public class ProducerInstrumentation extends Instrumentation {
-
- private final Counter totalSent;
- private final Counter totalSentFailures;
- private final Meter sentPerSecond;
- private final Meter sentFailuresPerSecond;
-
- public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
- super(baseMetricName);
-
- this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT));
- this.totalSentFailures = registry
- .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES));
- this.sentPerSecond = registry
- .meter(name(baseMetricName, Producer.SENT_PER_SECOND));
- this.sentFailuresPerSecond = registry
- .meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND));
- }
-
- public void markSent() {
- totalSent.inc();
- sentPerSecond.mark();
- }
-
- public void markSentFailure() {
- totalSentFailures.inc();
- sentFailuresPerSecond.mark();
- }
-}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
index f2c028ec..b6d0e9f8 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
/**
* @author Timur Valiev
@@ -25,24 +26,24 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties {
- private String namesrvAddr = "127.0.0.1:9876";
+ private String namesrvAddr = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
- private String logLevel = "ERROR";
+ private String logLevel = "ERROR";
- public String getNamesrvAddr() {
- return namesrvAddr;
- }
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
- public void setNamesrvAddr(String namesrvAddr) {
- this.namesrvAddr = namesrvAddr;
- }
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
- public String getLogLevel() {
- return logLevel;
- }
+ public String getLogLevel() {
+ return logLevel;
+ }
- public void setLogLevel(String logLevel) {
- this.logLevel = logLevel;
- }
+ public void setLogLevel(String logLevel) {
+ this.logLevel = logLevel;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
index 7f757586..6cfe9b84 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
@@ -28,68 +28,93 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
*/
public class RocketMQConsumerProperties {
- /**
- * using '||' to split tag
- * {@link MQPushConsumer#subscribe(String, String)}
- */
- private String tags;
+ /**
+ * using '||' to split tag {@link MQPushConsumer#subscribe(String, String)}
+ */
+ private String tags;
- /**
- * {@link MQPushConsumer#subscribe(String, MessageSelector)}
- * {@link MessageSelector#bySql(String)}
- */
- private String sql;
+ /**
+ * {@link MQPushConsumer#subscribe(String, MessageSelector)}
+ * {@link MessageSelector#bySql(String)}
+ */
+ private String sql;
- /**
- * {@link MessageModel#BROADCASTING}
- */
- private Boolean broadcasting = false;
+ /**
+ * {@link MessageModel#BROADCASTING}
+ */
+ private Boolean broadcasting = false;
- /**
- * if orderly is true, using {@link MessageListenerOrderly}
- * else if orderly if false, using {@link MessageListenerConcurrently}
- */
- private Boolean orderly = false;
+ /**
+ * if orderly is true, using {@link MessageListenerOrderly} else if orderly if false,
+ * using {@link MessageListenerConcurrently}
+ */
+ private Boolean orderly = false;
- private Boolean enabled = true;
+ /**
+ * for concurrently listener. message consume retry strategy
+ */
+ private int delayLevelWhenNextConsume = 0;
- public String getTags() {
- return tags;
- }
+ /**
+ * for orderly listener. next retry delay time
+ */
+ private long suspendCurrentQueueTimeMillis = 1000;
- public void setTags(String tags) {
- this.tags = tags;
- }
+ private Boolean enabled = true;
- public String getSql() {
- return sql;
- }
+ public String getTags() {
+ return tags;
+ }
- public void setSql(String sql) {
- this.sql = sql;
- }
+ public void setTags(String tags) {
+ this.tags = tags;
+ }
- public Boolean getOrderly() {
- return orderly;
- }
+ public String getSql() {
+ return sql;
+ }
- public void setOrderly(Boolean orderly) {
- this.orderly = orderly;
- }
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
- public Boolean getEnabled() {
- return enabled;
- }
+ public Boolean getOrderly() {
+ return orderly;
+ }
- public void setEnabled(Boolean enabled) {
- this.enabled = enabled;
- }
+ public void setOrderly(Boolean orderly) {
+ this.orderly = orderly;
+ }
- public Boolean getBroadcasting() {
- return broadcasting;
- }
+ public Boolean getEnabled() {
+ return enabled;
+ }
- public void setBroadcasting(Boolean broadcasting) {
- this.broadcasting = broadcasting;
- }
+ public void setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public Boolean getBroadcasting() {
+ return broadcasting;
+ }
+
+ public void setBroadcasting(Boolean broadcasting) {
+ this.broadcasting = broadcasting;
+ }
+
+ public int getDelayLevelWhenNextConsume() {
+ return delayLevelWhenNextConsume;
+ }
+
+ public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
+ this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+ }
+
+ public long getSuspendCurrentQueueTimeMillis() {
+ return suspendCurrentQueueTimeMillis;
+ }
+
+ public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
+ this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
index 1a05ad50..60af4129 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
@@ -17,8 +17,6 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
-import org.apache.rocketmq.client.producer.TransactionCheckListener;
/**
* @author Timur Valiev
@@ -31,19 +29,46 @@ public class RocketMQProducerProperties {
/**
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}
*/
- private Integer maxMessageSize = 0;
+ private Integer maxMessageSize = 1024 * 1024 * 4;
private Boolean transactional = false;
- /**
- * full class name of {@link LocalTransactionExecuter}
- */
- private String executer;
+ private Boolean sync = false;
+
+ private Boolean vipChannelEnabled = true;
/**
- * full class name of {@link TransactionCheckListener}
+ * Millis of send message timeout.
*/
- private String transactionCheckListener;
+ private int sendMessageTimeout = 3000;
+
+ /**
+ * Compress message body threshold, namely, message body larger than 4k will be
+ * compressed on default.
+ */
+ private int compressMessageBodyThreshold = 1024 * 4;
+
+ /**
+ * Maximum number of retry to perform internally before claiming sending failure in
+ * synchronous mode. This may potentially cause message duplication which is up to
+ * application developers to resolve.
+ */
+ private int retryTimesWhenSendFailed = 2;
+
+ /**
+ *
+ * Maximum number of retry to perform internally before claiming sending failure in
+ * asynchronous mode.
+ *
+ * This may potentially cause message duplication which is up to application
+ * developers to resolve.
+ */
+ private int retryTimesWhenSendAsyncFailed = 2;
+
+ /**
+ * Indicate whether to retry another broker on sending failure internally.
+ */
+ private boolean retryNextServer = false;
public Boolean getEnabled() {
return enabled;
@@ -69,19 +94,60 @@ public class RocketMQProducerProperties {
this.transactional = transactional;
}
- public String getExecuter() {
- return executer;
+ public Boolean getSync() {
+ return sync;
}
- public void setExecuter(String executer) {
- this.executer = executer;
+ public void setSync(Boolean sync) {
+ this.sync = sync;
}
- public String getTransactionCheckListener() {
- return transactionCheckListener;
+ public Boolean getVipChannelEnabled() {
+ return vipChannelEnabled;
}
- public void setTransactionCheckListener(String transactionCheckListener) {
- this.transactionCheckListener = transactionCheckListener;
+ public void setVipChannelEnabled(Boolean vipChannelEnabled) {
+ this.vipChannelEnabled = vipChannelEnabled;
}
+
+ public int getSendMessageTimeout() {
+ return sendMessageTimeout;
+ }
+
+ public void setSendMessageTimeout(int sendMessageTimeout) {
+ this.sendMessageTimeout = sendMessageTimeout;
+ }
+
+ public int getCompressMessageBodyThreshold() {
+ return compressMessageBodyThreshold;
+ }
+
+ public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
+ this.compressMessageBodyThreshold = compressMessageBodyThreshold;
+ }
+
+ public int getRetryTimesWhenSendFailed() {
+ return retryTimesWhenSendFailed;
+ }
+
+ public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
+ this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
+ }
+
+ public int getRetryTimesWhenSendAsyncFailed() {
+ return retryTimesWhenSendAsyncFailed;
+ }
+
+ public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
+ this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
+ }
+
+ public boolean isRetryNextServer() {
+ return retryNextServer;
+ }
+
+ public void setRetryNextServer(boolean retryNextServer) {
+ this.retryNextServer = retryNextServer;
+ }
+
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories
index 43b85129..89a1a8f5 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories
+++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories
@@ -1,2 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration
+org.springframework.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
index 3328bb64..155902c0 100644
--- a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
+++ b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
@@ -22,7 +22,6 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
-import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.context.ConfigurableApplicationContext;
@@ -36,10 +35,9 @@ public class RocketMQAutoConfigurationTests {
@Before
public void setUp() throws Exception {
- this.context = new SpringApplicationBuilder(
- RocketMQBinderEndpointAutoConfiguration.class,
- RocketMQBinderAutoConfiguration.class).web(false).run(
- "--spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
+ this.context = new SpringApplicationBuilder(RocketMQBinderAutoConfiguration.class)
+ .web(false)
+ .run("--spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
"--spring.cloud.stream.bindings.output.destination=TopicOrderTest",
"--spring.cloud.stream.bindings.output.content-type=application/json",
"--spring.cloud.stream.bindings.input1.destination=TopicOrderTest",