diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
index cb35b5a6..9c8b5221 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -21,6 +21,8 @@ package org.springframework.cloud.stream.binder.rocketmq;
*/
public interface RocketMQBinderConstants {
+ String ENDPOINT_ID = "rocketmq_binder";
+
/**
* Header key
*/
@@ -33,10 +35,27 @@ public interface RocketMQBinderConstants {
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/**
- * Instrumentation key
+ * Instrumentation
*/
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
- String ENDPOINT_ID = "rocketmq_binder";
+ interface Metrics {
+ interface Producer {
+ String PREFIX = "scs-rocketmq.producer.";
+ String TOTAL_SENT = "totalSent";
+ String TOTAL_SENT_FAILURES = "totalSentFailures";
+ String SENT_PER_SECOND = "sentPerSecond";
+ String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond";
+ }
+
+ interface Consumer {
+ String GROUP_PREFIX = "scs-rocketmq.consumerGroup.";
+ String PREFIX = "scs-rocketmq.consumer.";
+ String TOTAL_CONSUMED = "totalConsumed";
+ String CONSUMED_PER_SECOND = "consumedPerSecond";
+ String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures";
+ String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond";
+ }
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
index e773729f..f624b237 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
@@ -16,38 +16,44 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
+import static com.codahale.metrics.MetricRegistry.name;
+
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
+
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import static com.codahale.metrics.MetricRegistry.name;
-
/**
* @author juven.xuxb
* @author Jim
*/
public class ConsumerInstrumentation extends Instrumentation {
- private final Counter totalConsumed;
- private final Counter totalConsumedFailures;
- private final Meter consumedPerSecond;
- private final Meter consumedFailuresPerSecond;
+ private final Counter totalConsumed;
+ private final Counter totalConsumedFailures;
+ private final Meter consumedPerSecond;
+ private final Meter consumedFailuresPerSecond;
- public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
- super(baseMetricName);
- this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
- this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond"));
- this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures"));
- this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond"));
- }
+ public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
+ super(baseMetricName);
+ this.totalConsumed = registry
+ .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED));
+ this.consumedPerSecond = registry
+ .meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND));
+ this.totalConsumedFailures = registry
+ .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES));
+ this.consumedFailuresPerSecond = registry
+ .meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND));
+ }
- public void markConsumed() {
- totalConsumed.inc();
- consumedPerSecond.mark();
- }
+ public void markConsumed() {
+ totalConsumed.inc();
+ consumedPerSecond.mark();
+ }
- public void markConsumedFailure() {
- totalConsumedFailures.inc();
- consumedFailuresPerSecond.mark();
- }
+ public void markConsumedFailure() {
+ totalConsumedFailures.inc();
+ consumedFailuresPerSecond.mark();
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
index 8d047e85..a593af91 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
@@ -22,6 +22,8 @@ import java.util.Map;
import java.util.Set;
import com.codahale.metrics.MetricRegistry;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
/**
* @author Timur Valiev
@@ -37,7 +39,7 @@ public class InstrumentationManager {
private final Map healthInstrumentations = new HashMap<>();
public ProducerInstrumentation getProducerInstrumentation(String destination) {
- String key = "scs-rocketmq.producer." + destination;
+ String key = Producer.PREFIX + destination;
ProducerInstrumentation producerInstrumentation = producerInstrumentations
.get(key);
if (producerInstrumentation == null) {
@@ -48,7 +50,7 @@ public class InstrumentationManager {
}
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
- String key = "scs-rocketmq.consumer." + destination;
+ String key = Consumer.PREFIX + destination;
ConsumerInstrumentation consumerInstrumentation = consumeInstrumentations
.get(key);
if (consumerInstrumentation == null) {
@@ -59,7 +61,7 @@ public class InstrumentationManager {
}
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
- String key = "scs-rocketmq.consumerGroup." + group;
+ String key = Consumer.GROUP_PREFIX + group;
ConsumerGroupInstrumentation consumerGroupInstrumentation = consumerGroupsInstrumentations
.get(key);
if (consumerGroupInstrumentation == null) {
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
index 6fc6daf9..1ede7802 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
@@ -16,38 +16,44 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
+import static com.codahale.metrics.MetricRegistry.name;
+
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
+
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import static com.codahale.metrics.MetricRegistry.name;
-
/**
* @author juven.xuxb
* @author Jim
*/
public class ProducerInstrumentation extends Instrumentation {
- private final Counter totalSent;
- private final Counter totalSentFailures;
- private final Meter sentPerSecond;
- private final Meter sentFailuresPerSecond;
+ private final Counter totalSent;
+ private final Counter totalSentFailures;
+ private final Meter sentPerSecond;
+ private final Meter sentFailuresPerSecond;
- public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
- super(baseMetricName);
- this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
- this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
- this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
- this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
- }
+ public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
+ super(baseMetricName);
- public void markSent() {
- totalSent.inc();
- sentPerSecond.mark();
- }
+ this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT));
+ this.totalSentFailures = registry
+ .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES));
+ this.sentPerSecond = registry
+ .meter(name(baseMetricName, Producer.SENT_PER_SECOND));
+ this.sentFailuresPerSecond = registry
+ .meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND));
+ }
- public void markSentFailure() {
- totalSentFailures.inc();
- sentFailuresPerSecond.mark();
- }
+ public void markSent() {
+ totalSent.inc();
+ sentPerSecond.mark();
+ }
+
+ public void markSentFailure() {
+ totalSentFailures.inc();
+ sentFailuresPerSecond.mark();
+ }
}