mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
binder remove metrics
This commit is contained in:
parent
53c4a6adf6
commit
03309b8481
@ -56,12 +56,6 @@
|
|||||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>io.micrometer</groupId>
|
|
||||||
<artifactId>micrometer-core</artifactId>
|
|
||||||
<optional>true</optional>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-test</artifactId>
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
@ -17,9 +17,7 @@
|
|||||||
package org.springframework.cloud.stream.binder.rocketmq;
|
package org.springframework.cloud.stream.binder.rocketmq;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||||
@ -65,7 +63,6 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
private final RocketMQProperties rocketMQProperties;
|
private final RocketMQProperties rocketMQProperties;
|
||||||
private final InstrumentationManager instrumentationManager;
|
private final InstrumentationManager instrumentationManager;
|
||||||
|
|
||||||
private Set<String> clientConfigId = new HashSet<>();
|
|
||||||
private Map<String, String> topicInUse = new HashMap<>();
|
private Map<String, String> topicInUse = new HashMap<>();
|
||||||
|
|
||||||
public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
|
public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
|
||||||
@ -103,7 +100,6 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
"there is more than 1 RocketMQTemplates in Spring BeanFactory");
|
"there is more than 1 RocketMQTemplates in Spring BeanFactory");
|
||||||
}
|
}
|
||||||
rocketMQTemplate = rocketMQTemplates.values().iterator().next();
|
rocketMQTemplate = rocketMQTemplates.values().iterator().next();
|
||||||
clientConfigId.add(rocketMQTemplate.getProducer().buildMQClientId());
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
rocketMQTemplate = new RocketMQTemplate();
|
rocketMQTemplate = new RocketMQTemplate();
|
||||||
@ -143,7 +139,6 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
producer.setMaxMessageSize(
|
producer.setMaxMessageSize(
|
||||||
producerProperties.getExtension().getMaxMessageSize());
|
producerProperties.getExtension().getMaxMessageSize());
|
||||||
rocketMQTemplate.setProducer(producer);
|
rocketMQTemplate.setProducer(producer);
|
||||||
clientConfigId.add(producer.buildMQClientId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
|
||||||
@ -218,10 +213,6 @@ public class RocketMQMessageChannelBinder extends
|
|||||||
return extendedBindingProperties.getExtendedProducerProperties(channelName);
|
return extendedBindingProperties.getExtendedProducerProperties(channelName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<String> getClientConfigId() {
|
|
||||||
return clientConfigId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, String> getTopicInUse() {
|
public Map<String, String> getTopicInUse() {
|
||||||
return topicInUse;
|
return topicInUse;
|
||||||
}
|
}
|
||||||
|
@ -19,13 +19,9 @@ package org.springframework.cloud.stream.binder.rocketmq.config;
|
|||||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
|
||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.RocketMQBinderMetrics;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||||
@ -33,9 +29,6 @@ import org.springframework.context.annotation.Bean;
|
|||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
|
|
||||||
import io.micrometer.core.instrument.MeterRegistry;
|
|
||||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Timur Valiev
|
* @author Timur Valiev
|
||||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||||
@ -84,21 +77,4 @@ public class RocketMQBinderAutoConfiguration {
|
|||||||
return new InstrumentationManager();
|
return new InstrumentationManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnClass(MeterRegistry.class)
|
|
||||||
@ConditionalOnBean(MeterRegistry.class)
|
|
||||||
protected class RocketMQBinderMetricsConfiguration {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
@ConditionalOnMissingBean(RocketMQBinderMetrics.class)
|
|
||||||
public MeterBinder rocketMqBinderMetrics(
|
|
||||||
RocketMQMessageChannelBinder rocketMQMessageChannelBinder,
|
|
||||||
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
|
|
||||||
MeterRegistry meterRegistry) {
|
|
||||||
return new RocketMQBinderMetrics(rocketMQMessageChannelBinder,
|
|
||||||
rocketMQBinderConfigurationProperties, meterRegistry);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,6 @@ public class RocketMQListenerBindingContainer
|
|||||||
.prepareStart(consumer);
|
.prepareStart(consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
rocketMQMessageChannelBinder.getClientConfigId().add(consumer.buildMQClientId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,102 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2018 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.rocketmq.client.ClientConfig;
|
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
||||||
import org.apache.rocketmq.client.impl.MQClientManager;
|
|
||||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
||||||
import org.apache.rocketmq.common.message.MessageQueue;
|
|
||||||
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
|
||||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
|
||||||
import org.springframework.context.ApplicationListener;
|
|
||||||
|
|
||||||
import io.micrometer.core.instrument.Gauge;
|
|
||||||
import io.micrometer.core.instrument.MeterRegistry;
|
|
||||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
|
||||||
import io.micrometer.core.lang.NonNull;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
|
||||||
*/
|
|
||||||
public class RocketMQBinderMetrics
|
|
||||||
implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
|
|
||||||
|
|
||||||
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
|
|
||||||
private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties;
|
|
||||||
private final MeterRegistry meterRegistry;
|
|
||||||
|
|
||||||
static final String METRIC_NAME = "spring.cloud.stream.binder.rocketmq";
|
|
||||||
|
|
||||||
public RocketMQBinderMetrics(
|
|
||||||
RocketMQMessageChannelBinder rocketMQMessageChannelBinder,
|
|
||||||
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
|
|
||||||
MeterRegistry meterRegistry) {
|
|
||||||
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
|
|
||||||
this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties;
|
|
||||||
this.meterRegistry = meterRegistry;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void bindTo(@NonNull MeterRegistry registry) {
|
|
||||||
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer();
|
|
||||||
pushConsumer
|
|
||||||
.setNamesrvAddr(rocketMQBinderConfigurationProperties.getNameServer());
|
|
||||||
DefaultMQProducer producer = new DefaultMQProducer();
|
|
||||||
producer.setNamesrvAddr(rocketMQBinderConfigurationProperties.getNameServer());
|
|
||||||
|
|
||||||
rocketMQMessageChannelBinder.getTopicInUse().forEach((topic, group) -> {
|
|
||||||
Gauge.builder(METRIC_NAME, this, o -> calculateMsgQueueOffset(topic, group))
|
|
||||||
.tag("group", group).tag("topic", topic)
|
|
||||||
.description("RocketMQ all messageQueue size").register(registry);
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private double calculateMsgQueueOffset(String topic, String group) {
|
|
||||||
for (String clientConfigId : this.rocketMQMessageChannelBinder
|
|
||||||
.getClientConfigId()) {
|
|
||||||
ClientConfig clientConfig = new ClientConfig();
|
|
||||||
String[] clientConfigArr = clientConfigId.split("@", 3);
|
|
||||||
clientConfig.setClientIP(clientConfigArr[0]);
|
|
||||||
clientConfig.setInstanceName(clientConfigArr[1]);
|
|
||||||
if (clientConfigArr.length > 2) {
|
|
||||||
clientConfig.setUnitName(clientConfigArr[2]);
|
|
||||||
}
|
|
||||||
Map<MessageQueue, Long> queueLongMap = MQClientManager.getInstance()
|
|
||||||
.getAndCreateMQClientInstance(clientConfig)
|
|
||||||
.getConsumerStatus(topic, group);
|
|
||||||
if (queueLongMap.size() == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
return queueLongMap.values().stream()
|
|
||||||
.collect(Collectors.summingLong(Long::longValue));
|
|
||||||
}
|
|
||||||
return 0.0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onApplicationEvent(BindingCreatedEvent event) {
|
|
||||||
if (this.meterRegistry != null) {
|
|
||||||
this.bindTo(this.meterRegistry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user