mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
fixes #120 and do some refactor
This commit is contained in:
parent
15179aeee2
commit
2ab11490dc
@ -189,14 +189,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
logger.error(
|
logger.error(
|
||||||
"Rocket Message hasn't been processed successfully. Caused by ",
|
"RocketMQ Message hasn't been processed successfully. Caused by ",
|
||||||
e);
|
e);
|
||||||
instrumentationManager
|
instrumentationManager
|
||||||
.getConsumerInstrumentation(
|
.getConsumerInstrumentation(
|
||||||
RocketMQInboundChannelAdapter.this.destination)
|
RocketMQInboundChannelAdapter.this.destination)
|
||||||
.markConsumedFailure();
|
.markConsumedFailure();
|
||||||
throw new RuntimeException(
|
throw new RuntimeException(
|
||||||
"Rocket Message hasn't been processed successfully. Caused by ",
|
"RocketMQ Message hasn't been processed successfully. Caused by ",
|
||||||
e);
|
e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -11,24 +9,9 @@ import com.codahale.metrics.MetricRegistry;
|
|||||||
public class ConsumerGroupInstrumentation extends Instrumentation {
|
public class ConsumerGroupInstrumentation extends Instrumentation {
|
||||||
private MetricRegistry metricRegistry;
|
private MetricRegistry metricRegistry;
|
||||||
|
|
||||||
private AtomicBoolean delayedStart = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
|
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
|
||||||
super(name);
|
super(name);
|
||||||
this.metricRegistry = metricRegistry;
|
this.metricRegistry = metricRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markDelayedStart() {
|
|
||||||
delayedStart.set(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isUp() {
|
|
||||||
return started.get() || delayedStart.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isOutOfService() {
|
|
||||||
return !started.get() && startException == null && !delayedStart.get();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
|||||||
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
|
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
|
||||||
public class RocketMQBinderConfigurationProperties {
|
public class RocketMQBinderConfigurationProperties {
|
||||||
|
|
||||||
private String namesrvAddr;
|
private String namesrvAddr = "127.0.0.1:9876";
|
||||||
|
|
||||||
private String logLevel = "ERROR";
|
private String logLevel = "ERROR";
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user