diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java new file mode 100644 index 00000000..eb6e6ce5 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java @@ -0,0 +1,45 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.context.properties.source.ConfigurationPropertyName; +import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ExtendedBindingHandlerMappingsProviderConfiguration { + + @Bean + public MappingsProvider rocketExtendedPropertiesDefaultMappingsProvider() { + return () -> { + Map mappings = new HashMap<>(); + mappings.put( + ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.bindings"), + ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.default")); + mappings.put( + ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.streams"), + ConfigurationPropertyName + .of("spring.cloud.stream.rocketmq.streams.default")); + return mappings; + }; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java new file mode 100644 index 00000000..abcb9b96 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java @@ -0,0 +1,81 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate; + +import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; +import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; +import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter; +import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQConfigBeanPostProcessor; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.converter.CompositeMessageConverter; + +/** + * issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681 + * @author Timur Valiev + * @author Jim + */ +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties({ RocketMQExtendedBindingProperties.class, + RocketMQBinderConfigurationProperties.class }) +public class RocketMQBinderAutoConfiguration { + + @Autowired + private RocketMQExtendedBindingProperties extendedBindingProperties; + @Autowired + private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + @Bean + public RocketMQConfigBeanPostProcessor rocketMQConfigBeanPostProcessor() { + return new RocketMQConfigBeanPostProcessor(); + } + + @Bean(RocketMQMessageConverter.DEFAULT_NAME) + @ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME }) + public CompositeMessageConverter rocketMQMessageConverter() { + return new RocketMQMessageConverter().getMessageConverter(); + } + + @Bean + @ConditionalOnEnabledHealthIndicator("rocketmq") + @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator") + public RocketMQBinderHealthIndicator rocketMQBinderHealthIndicator() { + return new RocketMQBinderHealthIndicator(); + } + + @Bean + public RocketMQTopicProvisioner rocketMQTopicProvisioner() { + return new RocketMQTopicProvisioner(); + } + + @Bean + public RocketMQMessageChannelBinder rocketMQMessageChannelBinder( + RocketMQTopicProvisioner provisioningProvider) { + return new RocketMQMessageChannelBinder(rocketBinderConfigurationProperties, + extendedBindingProperties, provisioningProvider); + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java new file mode 100644 index 00000000..e83a6a17 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.contants; + +import org.apache.rocketmq.common.message.MessageConst; + +import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX; + +/** + * @author zkzlx + */ +public class RocketMQConst extends MessageConst { + + /** + * Header key for RocketMQ Transactional Args. + */ + public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG"; + + /** + * Default NameServer value. + */ + public static final String DEFAULT_NAME_SERVER = "127.0.0.1:9876"; + + /** + * Default group for SCS RocketMQ Binder. + */ + public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name"; + + /** + * RocketMQ re-consume times. + */ + public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES"; + + public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS"; + + /** + * It is mainly provided for conversion between rocketMq-message and Spring-message, + * and parameters are passed through HEADERS. + */ + public static class Headers { + public static final String KEYS = MessageConst.PROPERTY_KEYS; + public static final String TAGS = MessageConst.PROPERTY_TAGS; + public static final String TOPIC = "MQ_TOPIC"; + /** + * The ID of the message. + */ + public static final String MESSAGE_ID = "MQ_MESSAGE_ID"; + /** + * The timestamp that the message producer invokes the message sending API. + */ + public static final String BORN_TIMESTAMP = "MQ_BORN_TIMESTAMP"; + /** + * The IP and port number of the message producer + */ + public static final String BORN_HOST = "MQ_BORN_HOST"; + + /** + * Message flag, MQ is not processed and is available for use by applications. + */ + public static final String FLAG = "MQ_FLAG"; + /** + * Message consumption queue ID + */ + public static final String QUEUE_ID = "MQ_QUEUE_ID"; + /** + * Message system Flag, such as whether or not to compress, whether or not to + * transactional messages. + */ + public static final String SYS_FLAG = "MQ_SYS_FLAG"; + /** + * The transaction ID of the transaction message. + */ + public static final String TRANSACTION_ID = "MQ_TRANSACTION_ID"; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java new file mode 100644 index 00000000..58f17c6e --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.convert; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.messaging.converter.ByteArrayMessageConverter; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.converter.StringMessageConverter; +import org.springframework.util.ClassUtils; + +/** + * The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME} + * @author zkzlx + */ +public class RocketMQMessageConverter { + + public static final String DEFAULT_NAME = "rocketMQMessageConverter"; + + private static final boolean JACKSON_PRESENT; + private static final boolean FASTJSON_PRESENT; + + static { + ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader(); + JACKSON_PRESENT = ClassUtils + .isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) + && ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", + classLoader); + FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader) + && ClassUtils.isPresent( + "com.alibaba.fastjson.support.config.FastJsonConfig", + classLoader); + } + + private CompositeMessageConverter messageConverter; + + public RocketMQMessageConverter() { + List messageConverters = new ArrayList<>(); + ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter(); + byteArrayMessageConverter.setContentTypeResolver(null); + messageConverters.add(byteArrayMessageConverter); + messageConverters.add(new StringMessageConverter()); + if (JACKSON_PRESENT) { + messageConverters.add(new MappingJackson2MessageConverter()); + } + if (FASTJSON_PRESENT) { + try { + messageConverters.add((MessageConverter) ClassUtils.forName( + "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter", + ClassUtils.getDefaultClassLoader()).newInstance()); + } + catch (ClassNotFoundException | IllegalAccessException + | InstantiationException ignored) { + // ignore this exception + } + } + messageConverter = new CompositeMessageConverter(messageConverters); + } + + public CompositeMessageConverter getMessageConverter() { + return messageConverter; + } + + public void setMessageConverter(CompositeMessageConverter messageConverter) { + this.messageConverter = messageConverter; + } +} \ No newline at end of file diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java new file mode 100644 index 00000000..9afc9482 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.custom; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.hook.CheckForbiddenHook; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.TransactionListener; + +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.util.StringUtils; + +/** + * Gets the beans configured in the configuration file + * + * @author junboXiang + */ +public final class RocketMQBeanContainerCache { + + private static final Class[] CLASSES = new Class[] { + CompositeMessageConverter.class, AllocateMessageQueueStrategy.class, + MessageQueueSelector.class, MessageListener.class, TransactionListener.class, + SendCallback.class, CheckForbiddenHook.class, SendMessageHook.class, + ErrorAcknowledgeHandler.class }; + + private static final Map BEANS_CACHE = new ConcurrentHashMap<>(); + + static void putBean(String beanName, Object beanObj) { + BEANS_CACHE.put(beanName, beanObj); + } + + static Class[] getClassAry() { + return CLASSES; + } + + public static T getBean(String beanName, Class clazz) { + return getBean(beanName, clazz, null); + } + + public static T getBean(String beanName, Class clazz, T defaultObj) { + if (StringUtils.isEmpty(beanName)) { + return defaultObj; + } + Object obj = BEANS_CACHE.get(beanName); + if (null == obj) { + return defaultObj; + } + if (clazz.isAssignableFrom(obj.getClass())) { + return (T) obj; + } + return defaultObj; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java new file mode 100644 index 00000000..30bd6432 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.custom; + +import java.util.stream.Stream; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; + +/** + * find RocketMQ bean by annotations + * + * @author junboXiang + * + */ +public class RocketMQConfigBeanPostProcessor implements BeanPostProcessor { + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) + throws BeansException { + Stream.of(RocketMQBeanContainerCache.getClassAry()).forEach(clazz -> { + if (clazz.isAssignableFrom(bean.getClass())) { + RocketMQBeanContainerCache.putBean(beanName, bean); + } + }); + return bean; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java new file mode 100644 index 00000000..fcf6de80 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.extend; + +import org.springframework.integration.acks.AcknowledgmentCallback.Status; +import org.springframework.messaging.Message; + +/** + * @author zkzlx + */ +public interface ErrorAcknowledgeHandler { + + /** + * Ack state handling, including receive, reject, and retry, when a consumption + * exception occurs. + * @param message + * @return see {@link Status} + */ + Status handler(Message message); + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java new file mode 100644 index 00000000..87624958 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -0,0 +1,156 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound; + +import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.RPCHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Extended function related to producer . eg:initial + * + * @author zkzlx + */ +public final class RocketMQConsumerFactory { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQConsumerFactory.class); + + public static DefaultMQPushConsumer initPushConsumer( + ExtendedConsumerProperties extendedConsumerProperties) { + RocketMQConsumerProperties consumerProperties = extendedConsumerProperties + .getExtension(); + Assert.notNull(consumerProperties.getGroup(), + "Property 'group' is required - consumerGroup"); + Assert.notNull(consumerProperties.getNameServer(), + "Property 'nameServer' is required"); + AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache + .getBean(consumerProperties.getAllocateMessageQueueStrategy(), + AllocateMessageQueueStrategy.class, + new AllocateMessageQueueAveragely()); + RPCHook rpcHook = null; + if (!StringUtils.isEmpty(consumerProperties.getAccessKey()) + && !StringUtils.isEmpty(consumerProperties.getSecretKey())) { + rpcHook = new AclClientRPCHook( + new SessionCredentials(consumerProperties.getAccessKey(), + consumerProperties.getSecretKey())); + } + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( + consumerProperties.getGroup(), rpcHook, allocateMessageQueueStrategy, + consumerProperties.getEnableMsgTrace(), + consumerProperties.getCustomizedTraceTopic()); + consumer.setVipChannelEnabled( + null == rpcHook && consumerProperties.getVipChannelEnabled()); + consumer.setInstanceName( + RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup())); + consumer.setNamespace(consumerProperties.getNamespace()); + consumer.setNamesrvAddr(consumerProperties.getNameServer()); + consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel())); + consumer.setUseTLS(consumerProperties.getUseTLS()); + consumer.setPullTimeDelayMillsWhenException( + consumerProperties.getPullTimeDelayMillsWhenException()); + consumer.setPullBatchSize(consumerProperties.getPullBatchSize()); + consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere()); + consumer.setHeartbeatBrokerInterval( + consumerProperties.getHeartbeatBrokerInterval()); + consumer.setPersistConsumerOffsetInterval( + consumerProperties.getPersistConsumerOffsetInterval()); + consumer.setPullInterval(consumerProperties.getPush().getPullInterval()); + consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency()); + consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency()); + return consumer; + } + + /** + * todo Compatible with versions less than 4.6 ? + * @return + */ + public static DefaultLitePullConsumer initPullConsumer( + ExtendedConsumerProperties extendedConsumerProperties) { + RocketMQConsumerProperties consumerProperties = extendedConsumerProperties + .getExtension(); + Assert.notNull(consumerProperties.getGroup(), + "Property 'group' is required - consumerGroup"); + Assert.notNull(consumerProperties.getNameServer(), + "Property 'nameServer' is required"); + AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache + .getBean(consumerProperties.getAllocateMessageQueueStrategy(), + AllocateMessageQueueStrategy.class, + new AllocateMessageQueueAveragely()); + + RPCHook rpcHook = null; + if (!StringUtils.isEmpty(consumerProperties.getAccessKey()) + && !StringUtils.isEmpty(consumerProperties.getSecretKey())) { + rpcHook = new AclClientRPCHook( + new SessionCredentials(consumerProperties.getAccessKey(), + consumerProperties.getSecretKey())); + } + + DefaultLitePullConsumer consumer = new DefaultLitePullConsumer( + consumerProperties.getNamespace(), consumerProperties.getGroup(), + rpcHook); + consumer.setVipChannelEnabled( + null == rpcHook && consumerProperties.getVipChannelEnabled()); + consumer.setInstanceName( + RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup())); + consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy); + consumer.setNamesrvAddr(consumerProperties.getNameServer()); + consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel())); + consumer.setUseTLS(consumerProperties.getUseTLS()); + consumer.setPullTimeDelayMillsWhenException( + consumerProperties.getPullTimeDelayMillsWhenException()); + consumer.setConsumerTimeoutMillisWhenSuspend( + consumerProperties.getPull().getConsumerTimeoutMillisWhenSuspend()); + consumer.setPullBatchSize(consumerProperties.getPullBatchSize()); + consumer.setConsumeFromWhere(consumerProperties.getConsumeFromWhere()); + consumer.setHeartbeatBrokerInterval( + consumerProperties.getHeartbeatBrokerInterval()); + consumer.setPersistConsumerOffsetInterval( + consumerProperties.getPersistConsumerOffsetInterval()); + consumer.setPollTimeoutMillis( + consumerProperties.getPull().getPollTimeoutMillis()); + consumer.setPullThreadNums(extendedConsumerProperties.getConcurrency()); + // The internal queues are cached by a maximum of 1000 + consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension() + .getPull().getPullThresholdForAll()); + return consumer; + } + + private static MessageModel getMessageModel(String messageModel) { + for (MessageModel model : MessageModel.values()) { + if (model.getModeCN().equalsIgnoreCase(messageModel)) { + return model; + } + } + return MessageModel.CLUSTERING; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java new file mode 100644 index 00000000..d0aec552 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java @@ -0,0 +1,228 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound; + +import java.util.List; +import java.util.function.Supplier; + +import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; +import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.integration.context.OrderlyShutdownCapable; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; + +/** + * TODO Describe what it does + * @author Jim + */ +public class RocketMQInboundChannelAdapter extends MessageProducerSupport + implements OrderlyShutdownCapable { + + private static final Logger log = LoggerFactory + .getLogger(RocketMQInboundChannelAdapter.class); + + private RetryTemplate retryTemplate; + private RecoveryCallback recoveryCallback; + private DefaultMQPushConsumer pushConsumer; + + private final String topic; + private final ExtendedConsumerProperties extendedConsumerProperties; + + public RocketMQInboundChannelAdapter(String topic, + ExtendedConsumerProperties extendedConsumerProperties) { + this.topic = topic; + this.extendedConsumerProperties = extendedConsumerProperties; + } + + @Override + protected void onInit() { + if (extendedConsumerProperties.getExtension() == null + || !extendedConsumerProperties.getExtension().getEnabled()) { + return; + } + Instrumentation instrumentation = new Instrumentation(topic, this); + try { + super.onInit(); + if (this.retryTemplate != null) { + Assert.state(getErrorChannel() == null, + "Cannot have an 'errorChannel' property when a 'RetryTemplate' is " + + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to " + + "send an error message when retries are exhausted"); + this.retryTemplate.registerListener(new RetryListener() { + @Override + public boolean open(RetryContext context, + RetryCallback callback) { + return true; + } + + @Override + public void close(RetryContext context, + RetryCallback callback, Throwable throwable) { + } + + @Override + public void onError(RetryContext context, + RetryCallback callback, Throwable throwable) { + } + }); + } + pushConsumer = RocketMQConsumerFactory + .initPushConsumer(extendedConsumerProperties); + // prepare register consumer message listener,the next step is to be + // compatible with a custom MessageListener. + if (extendedConsumerProperties.getExtension().getPush().getOrderly()) { + pushConsumer.registerMessageListener((MessageListenerOrderly) (msgs, + context) -> RocketMQInboundChannelAdapter.this + .consumeMessage(msgs, () -> { + context.setSuspendCurrentQueueTimeMillis( + extendedConsumerProperties.getExtension() + .getPush() + .getSuspendCurrentQueueTimeMillis()); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + }, () -> ConsumeOrderlyStatus.SUCCESS)); + } + else { + pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, + context) -> RocketMQInboundChannelAdapter.this + .consumeMessage(msgs, () -> { + context.setDelayLevelWhenNextConsume( + extendedConsumerProperties.getExtension() + .getPush() + .getDelayLevelWhenNextConsume()); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + }, () -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS)); + } + instrumentation.markStartedSuccessfully(); + } + catch (Exception e) { + instrumentation.markStartFailed(e); + log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage()); + throw new MessagingException(MessageBuilder.withPayload( + "DefaultMQPushConsumer init failed, Caused by " + e.getMessage()) + .build(), e); + } + finally { + InstrumentationManager.addHealthInstrumentation(instrumentation); + } + } + + /** + * The actual execution of a user-defined input consumption service method. + * @param messageExtList rocket mq message list + * @param failSupplier {@link ConsumeConcurrentlyStatus} or + * {@link ConsumeOrderlyStatus} + * @param sucSupplier {@link ConsumeConcurrentlyStatus} or + * {@link ConsumeOrderlyStatus} + * @param + * @return + */ + private R consumeMessage(List messageExtList, + Supplier failSupplier, Supplier sucSupplier) { + if (CollectionUtils.isEmpty(messageExtList)) { + throw new MessagingException( + "DefaultMQPushConsumer consuming failed, Caused by messageExtList is empty"); + } + for (MessageExt messageExt : messageExtList) { + try { + Message message = RocketMQMessageConverterSupport + .convertMessage2Spring(messageExt); + if (this.retryTemplate != null) { + this.retryTemplate.execute(context -> { + this.sendMessage(message); + return message; + }, this.recoveryCallback); + } + else { + this.sendMessage(message); + } + } + catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + return failSupplier.get(); + } + } + return sucSupplier.get(); + } + + @Override + protected void doStart() { + if (extendedConsumerProperties.getExtension() == null + || !extendedConsumerProperties.getExtension().getEnabled()) { + return; + } + try { + pushConsumer.subscribe(topic, RocketMQUtils.getMessageSelector( + extendedConsumerProperties.getExtension().getSubscription())); + pushConsumer.start(); + } + catch (Exception e) { + log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage()); + throw new MessagingException(MessageBuilder.withPayload( + "DefaultMQPushConsumer init failed, Caused by " + e.getMessage()) + .build(), e); + } + } + + @Override + protected void doStop() { + if (pushConsumer != null) { + pushConsumer.shutdown(); + } + } + + public void setRetryTemplate(RetryTemplate retryTemplate) { + this.retryTemplate = retryTemplate; + } + + public void setRecoveryCallback(RecoveryCallback recoveryCallback) { + this.recoveryCallback = recoveryCallback; + } + + @Override + public int beforeShutdown() { + this.stop(); + return 0; + } + + @Override + public int afterShutdown() { + return 0; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java new file mode 100644 index 00000000..3296a128 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull; + +import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler; + +import org.springframework.integration.acks.AcknowledgmentCallback.Status; +import org.springframework.messaging.Message; + +/** + * By default, if consumption fails, the corresponding MessageQueue will always be + * retried, that is, the consumption of other messages in the MessageQueue will be + * blocked. + * + * @author zkzlx + */ +public class DefaultErrorAcknowledgeHandler implements ErrorAcknowledgeHandler { + /** + * Ack state handling, including receive, reject, and retry, when a consumption + * exception occurs. + * + * @param message + * @return see {@link Status} + */ + @Override + public Status handler(Message message) { + return Status.REQUEUE; + } +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java new file mode 100644 index 00000000..f617dd09 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.util.Assert; + +/** + * A pollable {@link org.springframework.integration.core.MessageSource} for RocketMQ. + * @author zkzlx + */ +public class RocketMQAckCallback implements AcknowledgmentCallback { + private final static Logger log = LoggerFactory.getLogger(RocketMQAckCallback.class); + + private boolean acknowledged; + private boolean autoAckEnabled = true; + private MessageExt messageExt; + private AssignedMessageQueue assignedMessageQueue; + private DefaultLitePullConsumer consumer; + private final MessageQueue messageQueue; + + public RocketMQAckCallback(DefaultLitePullConsumer consumer, + AssignedMessageQueue assignedMessageQueue, MessageQueue messageQueue, + MessageExt messageExt) { + this.messageExt = messageExt; + this.consumer = consumer; + this.assignedMessageQueue = assignedMessageQueue; + this.messageQueue = messageQueue; + } + + @Override + public boolean isAcknowledged() { + return this.acknowledged; + } + + @Override + public void noAutoAck() { + this.autoAckEnabled = false; + } + + @Override + public boolean isAutoAck() { + return this.autoAckEnabled; + } + + @Override + public void acknowledge(Status status) { + Assert.notNull(status, "'status' cannot be null"); + if (this.acknowledged) { + throw new IllegalStateException("Already acknowledged"); + } + synchronized (messageQueue) { + try { + long offset = messageExt.getQueueOffset(); + switch (status) { + case REJECT: + case ACCEPT: + long consumerOffset = assignedMessageQueue + .getConsumerOffset(messageQueue); + if (consumerOffset != -1) { + ProcessQueue processQueue = assignedMessageQueue + .getProcessQueue(messageQueue); + if (processQueue != null && !processQueue.isDropped()) { + consumer.getOffsetStore().updateOffset(messageQueue, + consumerOffset, false); + } + } + if (consumer.getMessageModel() == MessageModel.BROADCASTING) { + consumer.getOffsetStore().persist(messageQueue); + } + break; + case REQUEUE: + consumer.seek(messageQueue, offset); + break; + default: + break; + } + } + catch (MQClientException e) { + throw new IllegalStateException(e); + } + finally { + this.acknowledged = true; + } + } + } + +} \ No newline at end of file diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java new file mode 100644 index 00000000..2ca91384 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java @@ -0,0 +1,161 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull; + +import java.lang.reflect.Field; +import java.util.List; + +import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; +import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.context.Lifecycle; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.endpoint.AbstractMessageSource; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.util.CollectionUtils; +import org.springframework.util.ReflectionUtils; + +/** + * @author Jim + */ +public class RocketMQMessageSource extends AbstractMessageSource + implements DisposableBean, Lifecycle { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQMessageSource.class); + + private DefaultLitePullConsumer consumer; + private AssignedMessageQueue assignedMessageQueue; + private volatile boolean running; + + private final String topic; + private final MessageSelector messageSelector; + private final ExtendedConsumerProperties extendedConsumerProperties; + + public RocketMQMessageSource(String name, + ExtendedConsumerProperties extendedConsumerProperties) { + this.topic = name; + this.messageSelector = RocketMQUtils.getMessageSelector( + extendedConsumerProperties.getExtension().getSubscription()); + this.extendedConsumerProperties = extendedConsumerProperties; + + } + + @Override + public synchronized void start() { + Instrumentation instrumentation = new Instrumentation(topic, this); + try { + if (this.isRunning()) { + throw new IllegalStateException( + "pull consumer already running. " + this.toString()); + } + this.consumer = RocketMQConsumerFactory + .initPullConsumer(extendedConsumerProperties); + // This parameter must be 1, otherwise doReceive cannot be handled singly. + this.consumer.setPullBatchSize(1); + this.consumer.subscribe(topic, messageSelector); + this.consumer.setAutoCommit(false); + this.assignedMessageQueue = acquireAssignedMessageQueue(this.consumer); + this.consumer.start(); + instrumentation.markStartedSuccessfully(); + } + catch (MQClientException e) { + instrumentation.markStartFailed(e); + log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e); + } + finally { + InstrumentationManager.addHealthInstrumentation(instrumentation); + } + this.running = true; + } + + private AssignedMessageQueue acquireAssignedMessageQueue( + DefaultLitePullConsumer consumer) { + Field field = ReflectionUtils.findField(DefaultLitePullConsumer.class, + "defaultLitePullConsumerImpl"); + assert field != null; + field.setAccessible(true); + DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) ReflectionUtils + .getField(field, consumer); + + field = ReflectionUtils.findField(DefaultLitePullConsumerImpl.class, + "assignedMessageQueue"); + assert field != null; + field.setAccessible(true); + return (AssignedMessageQueue) ReflectionUtils.getField(field, + defaultLitePullConsumerImpl); + } + + @Override + public synchronized void stop() { + if (this.isRunning() && null != consumer) { + consumer.unsubscribe(topic); + consumer.shutdown(); + this.running = false; + } + } + + @Override + public synchronized boolean isRunning() { + return running; + } + + @Override + protected synchronized Object doReceive() { + List messageExtList = consumer.poll(); + if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) { + return null; + } + MessageExt messageExt = messageExtList.get(0); + MessageQueue messageQueue = null; + for (MessageQueue queue : assignedMessageQueue.getAssignedMessageQueues()) { + if (queue.getQueueId() == messageExt.getQueueId()) { + messageQueue = queue; + break; + } + } + Message message = RocketMQMessageConverterSupport + .convertMessage2Spring(messageExtList.get(0)); + return MessageBuilder.fromMessage(message) + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, + new RocketMQAckCallback(this.consumer, assignedMessageQueue, + messageQueue, messageExt)) + .build(); + } + + @Override + public String getComponentType() { + return "rocketmq:message-source"; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java new file mode 100644 index 00000000..7017ba46 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java @@ -0,0 +1,131 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound; + +import java.lang.reflect.Field; + +import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.hook.CheckForbiddenHook; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.remoting.RPCHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Extended function related to producer . eg:initial + * + * @author zkzlx + */ +public final class RocketMQProduceFactory { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQProduceFactory.class); + + /** + * init for the producer,including convert producer params. + * @return + */ + public static DefaultMQProducer initRocketMQProducer(String topic, + RocketMQProducerProperties producerProperties) { + Assert.notNull(producerProperties.getGroup(), + "Property 'group' is required - producerGroup"); + Assert.notNull(producerProperties.getNameServer(), + "Property 'nameServer' is required"); + + RPCHook rpcHook = null; + if (!StringUtils.isEmpty(producerProperties.getAccessKey()) + && !StringUtils.isEmpty(producerProperties.getSecretKey())) { + rpcHook = new AclClientRPCHook( + new SessionCredentials(producerProperties.getAccessKey(), + producerProperties.getSecretKey())); + } + DefaultMQProducer producer; + if (RocketMQProducerProperties.ProducerType.Trans + .equalsName(producerProperties.getProducerType())) { + producer = new TransactionMQProducer(producerProperties.getNamespace(), + producerProperties.getGroup(), rpcHook); + if (producerProperties.getEnableMsgTrace()) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher( + producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE, + producerProperties.getCustomizedTraceTopic(), rpcHook); + dispatcher.setHostProducer(producer.getDefaultMQProducerImpl()); + Field field = DefaultMQProducer.class + .getDeclaredField("traceDispatcher"); + field.setAccessible(true); + field.set(producer, dispatcher); + producer.getDefaultMQProducerImpl().registerSendMessageHook( + new SendMessageTraceHookImpl(dispatcher)); + } + catch (Throwable e) { + log.error( + "system mq-trace hook init failed ,maybe can't send msg trace data"); + } + } + } + else { + producer = new DefaultMQProducer(producerProperties.getNamespace(), + producerProperties.getGroup(), rpcHook, + producerProperties.getEnableMsgTrace(), + producerProperties.getCustomizedTraceTopic()); + } + + producer.setVipChannelEnabled( + null == rpcHook && producerProperties.getVipChannelEnabled()); + producer.setInstanceName( + RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid())); + producer.setNamesrvAddr(producerProperties.getNameServer()); + producer.setSendMsgTimeout(producerProperties.getSendMsgTimeout()); + producer.setRetryTimesWhenSendFailed( + producerProperties.getRetryTimesWhenSendFailed()); + producer.setRetryTimesWhenSendAsyncFailed( + producerProperties.getRetryTimesWhenSendAsyncFailed()); + producer.setCompressMsgBodyOverHowmuch( + producerProperties.getCompressMsgBodyThreshold()); + producer.setRetryAnotherBrokerWhenNotStoreOK( + producerProperties.getRetryAnotherBroker()); + producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); + producer.setUseTLS(producerProperties.getUseTLS()); + CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean( + producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class); + if (null != checkForbiddenHook) { + producer.getDefaultMQProducerImpl() + .registerCheckForbiddenHook(checkForbiddenHook); + } + SendMessageHook sendMessageHook = RocketMQBeanContainerCache + .getBean(producerProperties.getSendMessageHook(), SendMessageHook.class); + if (null != sendMessageHook) { + producer.getDefaultMQProducerImpl().registerSendMessageHook(sendMessageHook); + } + + return producer; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java new file mode 100644 index 00000000..66b58f79 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java @@ -0,0 +1,286 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound; + +import java.util.List; + +import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst; +import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binding.MessageConverterConfigurer; +import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.context.Lifecycle; +import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.support.ErrorMessageStrategy; +import org.springframework.integration.support.ErrorMessageUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessagingException; + +/** + * @author Jim + */ +public class RocketMQProducerMessageHandler extends AbstractMessageHandler + implements Lifecycle { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQProducerMessageHandler.class); + + private volatile boolean running = false; + private volatile boolean isTrans = false; + + private ErrorMessageStrategy errorMessageStrategy; + private MessageChannel sendFailureChannel; + private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor; + private DefaultMQProducer defaultMQProducer; + private MessageQueueSelector messageQueueSelector; + + private final ProducerDestination destination; + private final ExtendedProducerProperties extendedProducerProperties; + private final RocketMQProducerProperties mqProducerProperties; + + public RocketMQProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties extendedProducerProperties, + RocketMQProducerProperties mqProducerProperties) { + this.destination = destination; + this.extendedProducerProperties = extendedProducerProperties; + this.mqProducerProperties = mqProducerProperties; + } + + @Override + protected void onInit() { + if (null == mqProducerProperties || !mqProducerProperties.getEnabled()) { + return; + } + super.onInit(); + this.defaultMQProducer = RocketMQProduceFactory + .initRocketMQProducer(destination.getName(), mqProducerProperties); + this.isTrans = defaultMQProducer instanceof TransactionMQProducer; + // Use the default if the partition is on and no customization is available. + this.messageQueueSelector = RocketMQBeanContainerCache.getBean( + mqProducerProperties.getMessageQueueSelector(), + MessageQueueSelector.class, + extendedProducerProperties.isPartitioned() + ? new PartitionMessageQueueSelector() + : null); + } + + @Override + public void start() { + Instrumentation instrumentation = new Instrumentation(destination.getName(), + this); + try { + defaultMQProducer.start(); + // TransactionMQProducer does not currently support custom + // MessageQueueSelector. + if (!isTrans && extendedProducerProperties.isPartitioned()) { + List messageQueues = defaultMQProducer + .fetchPublishMessageQueues(destination.getName()); + if (extendedProducerProperties.getPartitionCount() != messageQueues + .size()) { + logger.info(String.format( + "The partition count of topic '%s' will change from '%s' to '%s'", + destination.getName(), + extendedProducerProperties.getPartitionCount(), + messageQueues.size())); + extendedProducerProperties.setPartitionCount(messageQueues.size()); + // may be npe! + partitioningInterceptor.setPartitionCount( + extendedProducerProperties.getPartitionCount()); + } + } + running = true; + instrumentation.markStartedSuccessfully(); + } + catch (MQClientException | NullPointerException e) { + instrumentation.markStartFailed(e); + log.error("The defaultMQProducer startup failure !!!", e); + } + finally { + InstrumentationManager.addHealthInstrumentation(instrumentation); + } + } + + @Override + public void stop() { + if (running && null != defaultMQProducer) { + defaultMQProducer.shutdown(); + } + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + protected void handleMessageInternal(Message message) { + try { + org.apache.rocketmq.common.message.Message mqMessage = RocketMQMessageConverterSupport + .convertMessage2MQ(destination.getName(), message); + SendResult sendResult; + if (defaultMQProducer instanceof TransactionMQProducer) { + TransactionListener transactionListener = RocketMQBeanContainerCache + .getBean(mqProducerProperties.getTransactionListener(), + TransactionListener.class); + if (transactionListener == null) { + throw new MessagingException( + "TransactionMQProducer must have a TransactionMQProducer !!! "); + } + ((TransactionMQProducer) defaultMQProducer) + .setTransactionListener(transactionListener); + log.info("send transaction message :" + mqMessage); + sendResult = defaultMQProducer.sendMessageInTransaction(mqMessage, + message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS)); + } + else { + log.info("send message :" + mqMessage); + sendResult = this.send(mqMessage, this.messageQueueSelector, + message.getHeaders(), message); + } + if (sendResult == null + || !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { + log.error("message send fail.SendStatus is not OK "); + this.doFail(message, new MessagingException( + "message send fail.SendStatus is not OK.")); + } + } + catch (Exception e) { + log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage(), + e); + this.doFail(message, e); + } + } + + private SendResult send(org.apache.rocketmq.common.message.Message mqMessage, + MessageQueueSelector selector, Object args, Message message) + throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setSendStatus(SendStatus.SEND_OK); + if (RocketMQProducerProperties.SendType.OneWay + .equalsName(mqProducerProperties.getSendType())) { + if (null != selector) { + defaultMQProducer.sendOneway(mqMessage, selector, args); + } + else { + defaultMQProducer.sendOneway(mqMessage); + } + return sendResult; + } + if (RocketMQProducerProperties.SendType.Sync + .equalsName(mqProducerProperties.getSendType())) { + if (null != selector) { + return defaultMQProducer.send(mqMessage, selector, args); + } + return defaultMQProducer.send(mqMessage); + } + if (RocketMQProducerProperties.SendType.Async + .equalsName(mqProducerProperties.getSendType())) { + if (null != selector) { + defaultMQProducer.send(mqMessage, selector, args, + this.getSendCallback(message)); + } + else { + defaultMQProducer.send(mqMessage, this.getSendCallback(message)); + } + return sendResult; + } + throw new MessagingException( + "message hasn't been sent,cause by : the SendType must be in this values[OneWay, Async, Sync]"); + } + + /** + * https://github.com/alibaba/spring-cloud-alibaba/issues/1408 + * @param message + * @return + */ + private SendCallback getSendCallback(Message message) { + SendCallback sendCallback = RocketMQBeanContainerCache + .getBean(mqProducerProperties.getSendCallBack(), SendCallback.class); + if (null == sendCallback) { + sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + } + + @Override + public void onException(Throwable e) { + RocketMQProducerMessageHandler.this.doFail(message, e); + } + }; + } + return sendCallback; + } + + private void doFail(Message message, Throwable e) { + if (getSendFailureChannel() != null) { + getSendFailureChannel().send(getErrorMessageStrategy().buildErrorMessage(e, + ErrorMessageUtils.getAttributeAccessor(message, message))); + } + else { + throw new MessagingException(message, e); + } + } + + public MessageChannel getSendFailureChannel() { + return sendFailureChannel; + } + + public void setSendFailureChannel(MessageChannel sendFailureChannel) { + this.sendFailureChannel = sendFailureChannel; + } + + public ErrorMessageStrategy getErrorMessageStrategy() { + return errorMessageStrategy; + } + + public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) { + this.errorMessageStrategy = errorMessageStrategy; + } + + public PartitioningInterceptor getPartitioningInterceptor() { + return partitioningInterceptor; + } + + public RocketMQProducerMessageHandler setPartitioningInterceptor( + PartitioningInterceptor partitioningInterceptor) { + this.partitioningInterceptor = partitioningInterceptor; + return this; + } +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java new file mode 100644 index 00000000..00e7d30d --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java @@ -0,0 +1,201 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.properties; + +import java.io.Serializable; + +import org.apache.rocketmq.client.AccessChannel; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.remoting.netty.TlsSystemConfig; + +/** + * @author zkzlx + */ +public class RocketMQCommonProperties implements Serializable { + private static final long serialVersionUID = -6724870154343284715L; + + private boolean enabled = true; + + private String nameServer; + + /** + * The property of "access-key". + */ + private String accessKey; + + /** + * The property of "secret-key". + */ + private String secretKey; + /** + * Consumers of the same role is required to have exactly same subscriptions and + * consumerGroup to correctly achieve load balance. It's required and needs to be + * globally unique. + *

+ * Producer group conceptually aggregates all producer instances of exactly same role, + * which is particularly important when transactional messages are involved. + *

+ *

+ * For non-transactional messages, it does not matter as long as it's unique per + * process. + *

+ *

+ * See here for further + * discussion. + */ + private String group; + + private String namespace; + private String accessChannel = AccessChannel.LOCAL.name(); + /** + * Pulling topic information interval from the named server. + * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask + * updateTopicRouteInfoFromNameServer. + */ + private int pollNameServerInterval = 1000 * 30; + /** + * Heartbeat interval in microseconds with message broker. + * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask + * sendHeartbeatToAllBroker . + */ + private int heartbeatBrokerInterval = 1000 * 30; + /** + * Offset persistent interval for consumer. + * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask + * sendHeartbeatToAllBroker . + */ + private int persistConsumerOffsetInterval = 1000 * 5; + + private boolean vipChannelEnabled = false; + + private boolean useTLS = TlsSystemConfig.tlsEnable; + + private boolean enableMsgTrace = true; + private String customizedTraceTopic; + + public boolean getEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getNameServer() { + return nameServer; + } + + public void setNameServer(String nameServer) { + this.nameServer = nameServer; + } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(String accessChannel) { + this.accessChannel = accessChannel; + } + + public int getPollNameServerInterval() { + return pollNameServerInterval; + } + + public void setPollNameServerInterval(int pollNameServerInterval) { + this.pollNameServerInterval = pollNameServerInterval; + } + + public int getHeartbeatBrokerInterval() { + return heartbeatBrokerInterval; + } + + public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { + this.heartbeatBrokerInterval = heartbeatBrokerInterval; + } + + public int getPersistConsumerOffsetInterval() { + return persistConsumerOffsetInterval; + } + + public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { + this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; + } + + public boolean getVipChannelEnabled() { + return vipChannelEnabled; + } + + public void setVipChannelEnabled(boolean vipChannelEnabled) { + this.vipChannelEnabled = vipChannelEnabled; + } + + public boolean getUseTLS() { + return useTLS; + } + + public void setUseTLS(boolean useTLS) { + this.useTLS = useTLS; + } + + public boolean getEnableMsgTrace() { + return enableMsgTrace; + } + + public void setEnableMsgTrace(boolean enableMsgTrace) { + this.enableMsgTrace = enableMsgTrace; + } + + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java new file mode 100644 index 00000000..99a1a36f --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.properties; + +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; + +/** + * Container object for RocketMQ specific extended producer and consumer binding + * properties. + * + * @author Jim + */ +public class RocketMQSpecificPropertiesProvider + implements BinderSpecificPropertiesProvider { + + /** + * Consumer specific binding properties. @see {@link RocketMQConsumerProperties}. + */ + private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties(); + + /** + * Producer specific binding properties. @see {@link RocketMQProducerProperties}. + */ + private RocketMQProducerProperties producer = new RocketMQProducerProperties(); + + /** + * @return {@link RocketMQConsumerProperties} Consumer specific binding + * properties. @see {@link RocketMQConsumerProperties}. + */ + @Override + public RocketMQConsumerProperties getConsumer() { + return this.consumer; + } + + public void setConsumer(RocketMQConsumerProperties consumer) { + this.consumer = consumer; + } + + /** + * @return {@link RocketMQProducerProperties} Producer specific binding + * properties. @see {@link RocketMQProducerProperties}. + */ + @Override + public RocketMQProducerProperties getProducer() { + return this.producer; + } + + public void setProducer(RocketMQProducerProperties producer) { + this.producer = producer; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java new file mode 100644 index 00000000..d4b62e5f --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java @@ -0,0 +1,185 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import java.nio.charset.Charset; +import java.util.Map; +import java.util.Objects; + +import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst; +import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst.Headers; +import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter; +import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.CollectionUtils; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; + +/** + * + * @author zkzlx + */ +public class RocketMQMessageConverterSupport { + + private static final CompositeMessageConverter MESSAGE_CONVERTER = RocketMQBeanContainerCache + .getBean(RocketMQMessageConverter.DEFAULT_NAME, + CompositeMessageConverter.class, + new RocketMQMessageConverter().getMessageConverter()); + + public static Message convertMessage2Spring(MessageExt message) { + MessageBuilder messageBuilder = MessageBuilder.withPayload(message.getBody()) + .setHeader(toRocketHeaderKey(Headers.KEYS), message.getKeys()) + .setHeader(toRocketHeaderKey(Headers.TAGS), message.getTags()) + .setHeader(toRocketHeaderKey(Headers.TOPIC), message.getTopic()) + .setHeader(toRocketHeaderKey(Headers.MESSAGE_ID), message.getMsgId()) + .setHeader(toRocketHeaderKey(Headers.BORN_TIMESTAMP), + message.getBornTimestamp()) + .setHeader(toRocketHeaderKey(Headers.BORN_HOST), + message.getBornHostString()) + .setHeader(toRocketHeaderKey(Headers.FLAG), message.getFlag()) + .setHeader(toRocketHeaderKey(Headers.QUEUE_ID), message.getQueueId()) + .setHeader(toRocketHeaderKey(Headers.SYS_FLAG), message.getSysFlag()) + .setHeader(toRocketHeaderKey(Headers.TRANSACTION_ID), + message.getTransactionId()); + addUserProperties(message.getProperties(), messageBuilder); + return messageBuilder.build(); + } + + public static String toRocketHeaderKey(String rawKey) { + return "ROCKET_" + rawKey; + } + + private static void addUserProperties(Map properties, + MessageBuilder messageBuilder) { + if (!CollectionUtils.isEmpty(properties)) { + properties.forEach((key, val) -> { + if (!MessageConst.STRING_HASH_SET.contains(key) + && !MessageHeaders.ID.equals(key) + && !MessageHeaders.TIMESTAMP.equals(key)) { + messageBuilder.setHeader(key, val); + } + }); + } + } + + public static org.apache.rocketmq.common.message.Message convertMessage2MQ( + String destination, Message source) { + Message message = MESSAGE_CONVERTER.toMessage(source.getPayload(), + source.getHeaders()); + assert message != null; + MessageBuilder builder = MessageBuilder.fromMessage(message); + builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN); + message = builder.build(); + return doConvert(destination, message); + } + + private static org.apache.rocketmq.common.message.Message doConvert(String topic, + Message message) { + Charset charset = Charset.defaultCharset(); + Object payloadObj = message.getPayload(); + byte[] payloads; + try { + if (payloadObj instanceof String) { + payloads = ((String) payloadObj).getBytes(charset); + } + else if (payloadObj instanceof byte[]) { + payloads = (byte[]) message.getPayload(); + } + else { + String jsonObj = (String) MESSAGE_CONVERTER.fromMessage(message, + payloadObj.getClass()); + if (null == jsonObj) { + throw new RuntimeException(String.format( + "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]", + MESSAGE_CONVERTER.getClass(), payloadObj.getClass(), + payloadObj)); + } + payloads = jsonObj.getBytes(charset); + } + } + catch (Exception e) { + throw new RuntimeException("convert to RocketMQ message failed.", e); + } + return getAndWrapMessage(topic, message.getHeaders(), payloads); + } + + private static org.apache.rocketmq.common.message.Message getAndWrapMessage( + String topic, MessageHeaders headers, byte[] payloads) { + if (topic == null || topic.length() < 1) { + return null; + } + if (payloads == null || payloads.length < 1) { + return null; + } + org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message( + topic, payloads); + if (Objects.nonNull(headers) && !headers.isEmpty()) { + Object tag = headers.getOrDefault(Headers.TAGS, + headers.get(toRocketHeaderKey(Headers.TAGS))); + if (!StringUtils.isEmpty(tag)) { + rocketMsg.setTags(String.valueOf(tag)); + } + + Object keys = headers.getOrDefault(Headers.KEYS, + headers.get(toRocketHeaderKey(Headers.KEYS))); + if (!StringUtils.isEmpty(keys)) { + rocketMsg.setKeys(keys.toString()); + } + Object flagObj = headers.getOrDefault(Headers.FLAG, + headers.get(toRocketHeaderKey(Headers.FLAG))); + int flag = 0; + int delayLevel = 0; + try { + flagObj = flagObj == null ? 0 : flagObj; + Object delayLevelObj = headers.getOrDefault( + RocketMQConst.PROPERTY_DELAY_TIME_LEVEL, + headers.get(toRocketHeaderKey( + RocketMQConst.PROPERTY_DELAY_TIME_LEVEL))); + delayLevelObj = delayLevelObj == null ? 0 : delayLevelObj; + delayLevel = Integer.parseInt(String.valueOf(delayLevelObj)); + flag = Integer.parseInt(String.valueOf(flagObj)); + } + catch (Exception ignored) { + } + if (delayLevel > 0) { + rocketMsg.setDelayTimeLevel(delayLevel); + } + rocketMsg.setFlag(flag); + Object waitStoreMsgOkObj = headers + .getOrDefault(RocketMQConst.PROPERTY_WAIT_STORE_MSG_OK, "true"); + rocketMsg.setWaitStoreMsgOK( + Boolean.parseBoolean(String.valueOf(waitStoreMsgOkObj))); + headers.entrySet().stream() + .filter(entry -> !Objects.equals(entry.getKey(), Headers.FLAG)) + .forEach(entry -> { + if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) { + rocketMsg.putUserProperty(entry.getKey(), + String.valueOf(entry.getValue())); + } + }); + + } + return rocketMsg; + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java new file mode 100644 index 00000000..3f789842 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq.utils; + +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQCommonProperties; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.remoting.RPCHook; + +import org.springframework.util.StringUtils; + +/** + * TODO Describe what it does + * + * @author Jim + */ +public class RocketMQUtils { + + public static T mergeRocketMQProperties( + RocketMQBinderConfigurationProperties binderConfigurationProperties, + T mqProperties) { + if (null == binderConfigurationProperties || mqProperties == null) { + return mqProperties; + } + if (StringUtils.isEmpty(mqProperties.getNameServer())) { + mqProperties.setNameServer(binderConfigurationProperties.getNameServer()); + } + if (StringUtils.isEmpty(mqProperties.getSecretKey())) { + mqProperties.setSecretKey(binderConfigurationProperties.getSecretKey()); + } + if (StringUtils.isEmpty(mqProperties.getAccessKey())) { + mqProperties.setAccessKey(binderConfigurationProperties.getAccessKey()); + } + if (StringUtils.isEmpty(mqProperties.getAccessChannel())) { + mqProperties + .setAccessChannel(binderConfigurationProperties.getAccessChannel()); + } + if (StringUtils.isEmpty(mqProperties.getNamespace())) { + mqProperties.setNamespace(binderConfigurationProperties.getNamespace()); + } + if (StringUtils.isEmpty(mqProperties.getGroup())) { + mqProperties.setGroup(binderConfigurationProperties.getGroup()); + } + if (StringUtils.isEmpty(mqProperties.getCustomizedTraceTopic())) { + mqProperties.setCustomizedTraceTopic( + binderConfigurationProperties.getCustomizedTraceTopic()); + } + mqProperties.setNameServer(getNameServerStr(mqProperties.getNameServer())); + return mqProperties; + } + + public static String getInstanceName(RPCHook rpcHook, String identify) { + String separator = "|"; + StringBuilder instanceName = new StringBuilder(); + if (null != rpcHook) { + SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook) + .getSessionCredentials(); + instanceName.append(sessionCredentials.getAccessKey()).append(separator) + .append(sessionCredentials.getSecretKey()).append(separator); + } + instanceName.append(identify).append(separator).append(UtilAll.getPid()); + return instanceName.toString(); + } + + public static String getNameServerStr(String nameServer) { + if (StringUtils.isEmpty(nameServer)) { + return null; + } + return nameServer.replaceAll(",", ";"); + } + + private static final String SQL = "sql:"; + + public static MessageSelector getMessageSelector(String expression) { + if (StringUtils.hasText(expression) && expression.startsWith(SQL)) { + return MessageSelector.bySql(expression.replaceFirst(SQL, "")); + } + return MessageSelector.byTag(expression); + } + +}