diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 6b22c400..84c2898e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -189,14 +189,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } catch (Exception e) { logger.error( - "Rocket Message hasn't been processed successfully. Caused by ", + "RocketMQ Message hasn't been processed successfully. Caused by ", e); instrumentationManager .getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.destination) .markConsumedFailure(); throw new RuntimeException( - "Rocket Message hasn't been processed successfully. Caused by ", + "RocketMQ Message hasn't been processed successfully. Caused by ", e); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java index 83b54c3e..87f0c978 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java @@ -1,7 +1,5 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics; -import java.util.concurrent.atomic.AtomicBoolean; - import com.codahale.metrics.MetricRegistry; /** @@ -11,24 +9,9 @@ import com.codahale.metrics.MetricRegistry; public class ConsumerGroupInstrumentation extends Instrumentation { private MetricRegistry metricRegistry; - private AtomicBoolean delayedStart = new AtomicBoolean(false); - public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) { super(name); this.metricRegistry = metricRegistry; } - public void markDelayedStart() { - delayedStart.set(true); - } - - @Override - public boolean isUp() { - return started.get() || delayedStart.get(); - } - - @Override - public boolean isOutOfService() { - return !started.get() && startException == null && !delayedStart.get(); - } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index dd00e7d1..087d95c2 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -9,7 +9,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") public class RocketMQBinderConfigurationProperties { - private String namesrvAddr; + private String namesrvAddr = "127.0.0.1:9876"; private String logLevel = "ERROR";