1
0
mirror of https://gitee.com/mirrors/Spring-Cloud-Alibaba.git synced 2021-06-26 13:25:11 +08:00

ans starter suport eureka/consule registry and add sms module

This commit is contained in:
得少
2019-01-09 18:38:08 +08:00
parent b475158352
commit ee2c696bdf
65 changed files with 3149 additions and 116 deletions

View File

@@ -0,0 +1,46 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.profile.DefaultProfile;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author pbting
*/
public abstract class AbstractSmsService implements ISmsService {
private ConcurrentHashMap<String, IAcsClient> acsClientConcurrentHashMap = new ConcurrentHashMap<>();
public IAcsClient getHangZhouRegionClientProfile(String accessKeyId,
String accessKeySecret) {
return acsClientConcurrentHashMap.computeIfAbsent(
getKey("cn-hangzhou", accessKeyId, accessKeySecret),
(iacsClient) -> new DefaultAcsClient(DefaultProfile
.getProfile("cn-hangzhou", accessKeyId, accessKeySecret)));
}
private String getKey(String regionId, String accessKeyId, String accessKeySecret) {
return regionId + ":" + accessKeyId + ":" + accessKeySecret;
}
}

View File

@@ -0,0 +1,115 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dysmsapi.model.v20170525.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
/**
* @author pbting
*/
public interface ISmsService {
/**
*
* @param accessKeyId
* @param secret
* @return IAcsClient
*/
IAcsClient getHangZhouRegionClientProfile(String accessKeyId, String secret);
/**
*
* @param sendSmsRequest
* @throws ServerException
* @throws ClientException
* @return SendSmsResponse
*/
SendSmsResponse sendSmsRequest(SendSmsRequest sendSmsRequest)
throws ServerException, ClientException;
/**
*
* @param sendBatchSmsRequest
* @throws ServerException
* @throws ClientException
* @return SendBatchSmsResponse
*/
SendBatchSmsResponse sendSmsBatchRequest(SendBatchSmsRequest sendBatchSmsRequest)
throws ServerException, ClientException;
/**
* 因为阿里云支持多个
* accessKeyId/accessKeySecret,当不想使用默认的配置accessKeyId/accessKeySecret时可以使用这个方法来支持额外
* 的accessKeyId/accessKeySecret 发送
* @param sendSmsRequest
* @param accessKeyId
* @param accessKeySecret
* @return
* @throws ServerException
* @throws ClientException
*/
SendSmsResponse sendSmsRequest(SendSmsRequest sendSmsRequest, String accessKeyId,
String accessKeySecret) throws ServerException, ClientException;
/**
*
* @param sendSmsRequest
* @param accessKeyId
* @param accessKeySecret
* @throws ServerException
* @throws ClientException
* @return SendBatchSmsResponse
*/
SendBatchSmsResponse sendSmsBatchRequest(SendBatchSmsRequest sendSmsRequest,
String accessKeyId, String accessKeySecret)
throws ServerException, ClientException;
/**
*
* @param smsReportMessageListener
* @return boolean
*/
boolean startSmsReportMessageListener(
SmsReportMessageListener smsReportMessageListener);
/**
*
* @param smsUpMessageListener
* @return boolean
*/
boolean startSmsUpMessageListener(SmsUpMessageListener smsUpMessageListener);
/**
*
* @param request
* @param accessKeyId
* @param accessKeySecret
* @return QuerySendDetailsResponse
*/
QuerySendDetailsResponse querySendDetails(QuerySendDetailsRequest request,
String accessKeyId, String accessKeySecret) throws ClientException;
/**
*
* @param request
* @return QuerySendDetailsResponse
*/
QuerySendDetailsResponse querySendDetails(QuerySendDetailsRequest request)
throws ClientException;
}

View File

@@ -0,0 +1,105 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cloud.alicloud.context.sms.SmsConfigProperties;
import org.springframework.cloud.alicloud.sms.base.MessageListener;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author pbting
*/
@Component
public class SmsInitializerEventListener
implements ApplicationListener<ApplicationStartedEvent> {
private final AtomicBoolean isCalled = new AtomicBoolean(false);
private SmsConfigProperties msConfigProperties;
private ISmsService smsService;
public SmsInitializerEventListener(SmsConfigProperties msConfigProperties,
ISmsService smsService) {
this.msConfigProperties = msConfigProperties;
this.smsService = smsService;
}
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
if (!isCalled.compareAndSet(false, true)) {
return;
}
// 整个application context refreshed then do
// 可自助调整超时时间
System.setProperty("sun.net.client.defaultConnectTimeout",
msConfigProperties.getConnnectTimeout());
System.setProperty("sun.net.client.defaultReadTimeout",
msConfigProperties.getReadTimeout());
// 初始化acsClient,暂不支持region化
try {
DefaultProfile.addEndpoint("cn-hangzhou", "cn-hangzhou",
SmsConfigProperties.smsProduct, SmsConfigProperties.smsDomain);
Collection<MessageListener> messageListeners = event.getApplicationContext()
.getBeansOfType(MessageListener.class).values();
if (messageListeners.isEmpty()) {
return;
}
for (MessageListener messageListener : messageListeners) {
if (SmsReportMessageListener.class.isInstance(messageListener)) {
if (msConfigProperties.getReportQueueName() != null
&& msConfigProperties.getReportQueueName().trim()
.length() > 0) {
smsService.startSmsReportMessageListener(
(SmsReportMessageListener) messageListener);
continue;
}
throw new IllegalArgumentException("the SmsReport queue name for "
+ messageListener.getClass().getCanonicalName()
+ " must be set.");
}
if (SmsUpMessageListener.class.isInstance(messageListener)) {
if (msConfigProperties.getUpQueueName() != null
&& msConfigProperties.getUpQueueName().trim().length() > 0) {
smsService.startSmsUpMessageListener(
(SmsUpMessageListener) messageListener);
continue;
}
throw new IllegalArgumentException("the SmsUp queue name for "
+ messageListener.getClass().getCanonicalName()
+ " must be set.");
}
}
}
catch (ClientException e) {
throw new RuntimeException(
"initialize sms profile end point cause an exception");
}
}
}

View File

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms;
import org.springframework.cloud.alicloud.sms.base.MessageListener;
/**
* @author pbting
*/
public interface SmsMessageListener extends MessageListener {
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms;
/**
* @author pbting
*/
public interface SmsReportMessageListener extends SmsMessageListener {
}

View File

@@ -0,0 +1,187 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms;
import com.aliyuncs.dysmsapi.model.v20170525.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.alicloud.context.sms.SmsConfigProperties;
import org.springframework.cloud.alicloud.sms.base.DefaultAlicomMessagePuller;
import org.springframework.cloud.alicloud.sms.endpoint.EndpointManager;
import org.springframework.cloud.alicloud.sms.endpoint.ReceiveMessageEntity;
import java.text.ParseException;
/**
* @author pbting
*/
public final class SmsServiceImpl extends AbstractSmsService {
private static final Log log = LogFactory.getLog(SmsServiceImpl.class);
/**
* will expose user to call this method send sms message
* @param sendSmsRequest
* @return
*/
private SmsConfigProperties smsConfigProperties;
public SmsServiceImpl(SmsConfigProperties smsConfigProperties) {
this.smsConfigProperties = smsConfigProperties;
}
public SendSmsResponse sendSmsRequest(SendSmsRequest sendSmsRequest)
throws ClientException {
return sendSmsRequest(sendSmsRequest, smsConfigProperties.getAccessKeyId(),
smsConfigProperties.getAccessKeySecret());
}
/**
* 因为阿里云支持多个
* accessKeyId/accessKeySecret,当不想使用默认的配置accessKeyId/accessKeySecret时可以使用这个方法来支持额外
* 的accessKeyId/accessKeySecret 发送
* @param sendSmsRequest
* @param accessKeyId
* @param accessKeySecret
* @throws ServerException
* @throws ClientException
* @return SendSmsResponse
*/
public SendSmsResponse sendSmsRequest(SendSmsRequest sendSmsRequest,
String accessKeyId, String accessKeySecret)
throws ServerException, ClientException {
EndpointManager.addSendSmsRequest(sendSmsRequest);
// hint 此处可能会抛出异常注意catch
return getHangZhouRegionClientProfile(accessKeyId, accessKeySecret)
.getAcsResponse(sendSmsRequest);
}
/**
*
* @param smsReportMessageListener
* @return boolean
*/
public boolean startSmsReportMessageListener(
SmsReportMessageListener smsReportMessageListener) {
String messageType = "SmsReport";// 短信回执SmsReport短信上行SmsUp
String queueName = smsConfigProperties.getReportQueueName();
return startReceiveMsg(messageType, queueName, smsReportMessageListener);
}
/**
*
* @param smsUpMessageListener
* @return boolean
*/
public boolean startSmsUpMessageListener(SmsUpMessageListener smsUpMessageListener) {
String messageType = "SmsUp";// 短信回执SmsReport短信上行SmsUp
String queueName = smsConfigProperties.getUpQueueName();
return startReceiveMsg(messageType, queueName, smsUpMessageListener);
}
/**
*
* @param messageType
* @param queueName
* @param messageListener
* @return boolean
*/
private boolean startReceiveMsg(String messageType, String queueName,
SmsMessageListener messageListener) {
String accessKeyId = smsConfigProperties.getAccessKeyId();
String accessKeySecret = smsConfigProperties.getAccessKeySecret();
boolean result = true;
try {
new DefaultAlicomMessagePuller().startReceiveMsg(accessKeyId, accessKeySecret,
messageType, queueName, messageListener);
EndpointManager.addReceiveMessageEntity(
new ReceiveMessageEntity(messageType, queueName, messageListener));
}
catch (ClientException e) {
log.error("start sms report message listener cause an exception", e);
result = false;
}
catch (ParseException e) {
log.error("start sms report message listener cause an exception", e);
result = false;
}
return result;
}
/**
*
* @param sendBatchSmsRequest
* @throws ServerException
* @throws ClientException
* @return SendBatchSmsResponse
*/
@Override
public SendBatchSmsResponse sendSmsBatchRequest(
SendBatchSmsRequest sendBatchSmsRequest)
throws ServerException, ClientException {
return sendSmsBatchRequest(sendBatchSmsRequest,
smsConfigProperties.getAccessKeyId(),
smsConfigProperties.getAccessKeySecret());
}
/**
*
* @param sendBatchSmsRequest
* @param accessKeyId
* @param accessKeySecret
* @throws ClientException
* @return SendBatchSmsResponse
*/
@Override
public SendBatchSmsResponse sendSmsBatchRequest(
SendBatchSmsRequest sendBatchSmsRequest, String accessKeyId,
String accessKeySecret) throws ClientException {
EndpointManager.addSendBatchSmsRequest(sendBatchSmsRequest);
return getHangZhouRegionClientProfile(accessKeyId, accessKeySecret)
.getAcsResponse(sendBatchSmsRequest);
}
/**
*
* @param request
* @param accessKeyId
* @param accessKeySecret
* @throws ClientException
* @return QuerySendDetailsResponse
*/
@Override
public QuerySendDetailsResponse querySendDetails(QuerySendDetailsRequest request,
String accessKeyId, String accessKeySecret) throws ClientException {
return getHangZhouRegionClientProfile(accessKeyId, accessKeySecret)
.getAcsResponse(request);
}
/**
*
* @param request
* @throws ClientException
* @return QuerySendDetailsResponse
*/
@Override
public QuerySendDetailsResponse querySendDetails(QuerySendDetailsRequest request)
throws ClientException {
return querySendDetails(request, smsConfigProperties.getAccessKeyId(),
smsConfigProperties.getAccessKeySecret());
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms;
/**
* @author pbting
*/
public interface SmsUpMessageListener extends SmsMessageListener {
}

View File

@@ -0,0 +1,429 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.base;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.model.Message;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* 阿里通信官方消息默认拉取工具类
*/
public class DefaultAlicomMessagePuller {
private Log logger = LogFactory.getLog(DefaultAlicomMessagePuller.class);
private String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/";// 阿里通信消息的endpoint,固定。
private String endpointNameForPop = "cn-hangzhou";
private String regionIdForPop = "cn-hangzhou";
private String domainForPop = "dybaseapi.aliyuncs.com";
private TokenGetterForAlicom tokenGetter;
private MessageListener messageListener;
private boolean isRunning = false;
private Integer pullMsgThreadSize = 1;
private boolean debugLogOpen = false;
private Integer sleepSecondWhenNoData = 30;
public void openDebugLog(boolean debugLogOpen) {
this.debugLogOpen = debugLogOpen;
}
public Integer getSleepSecondWhenNoData() {
return sleepSecondWhenNoData;
}
public void setSleepSecondWhenNoData(Integer sleepSecondWhenNoData) {
this.sleepSecondWhenNoData = sleepSecondWhenNoData;
}
public Integer getPullMsgThreadSize() {
return pullMsgThreadSize;
}
public void setPullMsgThreadSize(Integer pullMsgThreadSize) {
if (pullMsgThreadSize != null && pullMsgThreadSize > 1) {
this.pullMsgThreadSize = pullMsgThreadSize;
}
}
private ExecutorService executorService;
public ExecutorService getExecutorService() {
return executorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
protected Object lockObj;
public boolean setPolling(String queueName) {
synchronized (lockObj) {
Boolean ret = sPollingMap.get(queueName);
if (ret == null || !ret) {
sPollingMap.put(queueName, true);
return true;
}
return false;
}
}
public void clearPolling(String queueName) {
synchronized (lockObj) {
sPollingMap.put(queueName, false);
lockObj.notifyAll();
if (debugLogOpen) {
logger.info("PullMessageTask_WakeUp:Everyone WakeUp and Work!");
}
}
}
public boolean isRunning() {
return isRunning;
}
public void setRunning(boolean running) {
isRunning = running;
}
private class PullMessageTask implements Runnable {
private String messageType;
private String queueName;
@Override
public void run() {
boolean polling = false;
while (isRunning) {
try {
synchronized (lockObj) {
Boolean p = sPollingMap.get(queueName);
if (p != null && p) {
try {
if (debugLogOpen) {
logger.info("PullMessageTask_sleep:"
+ Thread.currentThread().getName()
+ " Have a nice sleep!");
}
polling = false;
lockObj.wait();
}
catch (InterruptedException e) {
if (debugLogOpen) {
logger.info("PullMessageTask_Interrupted!"
+ Thread.currentThread().getName()
+ " QueueName is " + queueName);
}
continue;
}
}
}
TokenForAlicom tokenObject = tokenGetter.getTokenByMessageType(
messageType, queueName, mnsAccountEndpoint);
CloudQueue queue = tokenObject.getQueue();
Message popMsg = null;
if (!polling) {
popMsg = queue.popMessage();
if (debugLogOpen) {
SimpleDateFormat format = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
logger.info("PullMessageTask_popMessage:"
+ Thread.currentThread().getName() + "-popDone at "
+ "," + format.format(new Date()) + " msgSize="
+ (popMsg == null ? 0 : popMsg.getMessageId()));
}
if (popMsg == null) {
polling = true;
continue;
}
}
else {
if (setPolling(queueName)) {
if (debugLogOpen) {
logger.info("PullMessageTask_setPolling:"
+ Thread.currentThread().getName() + " Polling!");
}
}
else {
continue;
}
do {
if (debugLogOpen) {
logger.info("PullMessageTask_Keep_Polling"
+ Thread.currentThread().getName()
+ "KEEP Polling!");
}
try {
popMsg = queue.popMessage(sleepSecondWhenNoData);
}
catch (ClientException e) {
if (debugLogOpen) {
logger.info(
"PullMessageTask_Pop_Message:ClientException Refresh accessKey"
+ e);
}
tokenObject = tokenGetter.getTokenByMessageType(
messageType, queueName, mnsAccountEndpoint);
queue = tokenObject.getQueue();
}
catch (ServiceException e) {
if (debugLogOpen) {
logger.info(
"PullMessageTask_Pop_Message:ServiceException Refresh accessKey"
+ e);
}
tokenObject = tokenGetter.getTokenByMessageType(
messageType, queueName, mnsAccountEndpoint);
queue = tokenObject.getQueue();
}
catch (Exception e) {
if (debugLogOpen) {
logger.info(
"PullMessageTask_Pop_Message:Exception Happened when polling popMessage: "
+ e);
}
}
}
while (popMsg == null && isRunning);
clearPolling(queueName);
}
boolean dealResult = messageListener.dealMessage(popMsg);
if (dealResult) {
// remember to delete message when consume message successfully.
if (debugLogOpen) {
logger.info("PullMessageTask_Deal_Message:"
+ Thread.currentThread().getName() + "deleteMessage "
+ popMsg.getMessageId());
}
queue.deleteMessage(popMsg.getReceiptHandle());
}
}
catch (ClientException e) {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName, e);
break;
}
catch (ServiceException e) {
if (e.getErrorCode().equals("AccessDenied")) {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName
+ ",please check messageType and queueName", e);
}
else {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName, e);
}
break;
}
catch (com.aliyuncs.exceptions.ClientException e) {
if (e.getErrCode().equals("InvalidAccessKeyId.NotFound")) {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName
+ ",please check AccessKeyId", e);
}
if (e.getErrCode().equals("SignatureDoesNotMatch")) {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName
+ ",please check AccessKeySecret", e);
}
else {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName, e);
}
break;
}
catch (Exception e) {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName, e);
try {
Thread.sleep(sleepSecondWhenNoData);
}
catch (InterruptedException e1) {
logger.error("PullMessageTask_execute_error,messageType:"
+ messageType + ",queueName:" + queueName, e);
}
}
}
}
}
/**
* @param accessKeyId accessKeyId
* @param accessKeySecret accessKeySecret
* @param messageType 消息类型
* @param queueName 队列名称
* @param messageListener 回调的listener,用户自己实现
* @throws com.aliyuncs.exceptions.ClientException
* @throws ParseException
*/
public void startReceiveMsg(String accessKeyId, String accessKeySecret,
String messageType, String queueName, MessageListener messageListener)
throws com.aliyuncs.exceptions.ClientException, ParseException {
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
endpointNameForPop, regionIdForPop, domainForPop, null);
this.messageListener = messageListener;
isRunning = true;
PullMessageTask task = new PullMessageTask();
task.messageType = messageType;
task.queueName = queueName;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
if (executorService == null) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
pullMsgThreadSize,
new BasicThreadFactory.Builder()
.namingPattern(
"PullMessageTask-" + messageType + "-thread-pool-%d")
.daemon(true).build());
executorService = scheduledExecutorService;
}
for (int i = 0; i < pullMsgThreadSize; i++) {
executorService.execute(task);
}
}
/**
* @param accessKeyId accessKeyId
* @param accessKeySecret accessKeySecret
* @param messageType 消息类型
* @param queueName 队列名称
* @param messageListener 回调的listener,用户自己实现
* @throws com.aliyuncs.exceptions.ClientException
* @throws ParseException
*/
public void startReceiveMsgForVPC(String accessKeyId, String accessKeySecret,
String messageType, String queueName, String regionIdForPop,
String endpointNameForPop, String domainForPop, String mnsAccountEndpoint,
MessageListener messageListener)
throws com.aliyuncs.exceptions.ClientException, ParseException {
this.mnsAccountEndpoint = mnsAccountEndpoint;
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
endpointNameForPop, regionIdForPop, domainForPop, null);
this.messageListener = messageListener;
isRunning = true;
PullMessageTask task = new PullMessageTask();
task.messageType = messageType;
task.queueName = queueName;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
if (executorService == null) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
pullMsgThreadSize,
new BasicThreadFactory.Builder()
.namingPattern(
"PullMessageTask-" + messageType + "-thread-pool-%d")
.daemon(true).build());
executorService = scheduledExecutorService;
}
for (int i = 0; i < pullMsgThreadSize; i++) {
executorService.execute(task);
}
}
/**
* 虚商用户定制接收消息方法
* @param accessKeyId accessKeyId
* @param accessKeySecret accessKeySecret
* @param ownerId 实际的ownerId
* @param messageType 消息类型
* @param queueName 队列名称
* @param messageListener 回调listener
* @throws com.aliyuncs.exceptions.ClientException
* @throws ParseException
*/
public void startReceiveMsgForPartnerUser(String accessKeyId, String accessKeySecret,
Long ownerId, String messageType, String queueName,
MessageListener messageListener)
throws com.aliyuncs.exceptions.ClientException, ParseException {
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
endpointNameForPop, regionIdForPop, domainForPop, ownerId);
this.messageListener = messageListener;
isRunning = true;
PullMessageTask task = new PullMessageTask();
task.messageType = messageType;
task.queueName = queueName;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
if (executorService == null) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
pullMsgThreadSize,
new BasicThreadFactory.Builder()
.namingPattern(
"PullMessageTask-" + messageType + "-thread-pool-%d")
.daemon(true).build());
executorService = scheduledExecutorService;
}
for (int i = 0; i < pullMsgThreadSize; i++) {
executorService.execute(task);
}
}
public void stop() {
isRunning = false;
}
}

View File

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.base;
import com.aliyun.mns.model.Message;
public interface MessageListener {
boolean dealMessage(Message message);
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.base;
import com.aliyuncs.RpcAcsRequest;
public class QueryTokenForMnsQueueRequest
extends RpcAcsRequest<QueryTokenForMnsQueueResponse> {
private String resourceOwnerAccount;
private String messageType;
private Long resourceOwnerId;
private Long ownerId;
public QueryTokenForMnsQueueRequest() {
super("Dybaseapi", "2017-05-25", "QueryTokenForMnsQueue");
}
public String getResourceOwnerAccount() {
return this.resourceOwnerAccount;
}
public void setResourceOwnerAccount(String resourceOwnerAccount) {
this.resourceOwnerAccount = resourceOwnerAccount;
if (resourceOwnerAccount != null) {
this.putQueryParameter("ResourceOwnerAccount", resourceOwnerAccount);
}
}
public String getMessageType() {
return this.messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
if (messageType != null) {
this.putQueryParameter("MessageType", messageType);
}
}
public Long getResourceOwnerId() {
return this.resourceOwnerId;
}
public void setResourceOwnerId(Long resourceOwnerId) {
this.resourceOwnerId = resourceOwnerId;
if (resourceOwnerId != null) {
this.putQueryParameter("ResourceOwnerId", resourceOwnerId.toString());
}
}
public Long getOwnerId() {
return this.ownerId;
}
public void setOwnerId(Long ownerId) {
this.ownerId = ownerId;
if (ownerId != null) {
this.putQueryParameter("OwnerId", ownerId.toString());
}
}
public Class<QueryTokenForMnsQueueResponse> getResponseClass() {
return QueryTokenForMnsQueueResponse.class;
}
}

View File

@@ -0,0 +1,117 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.base;
import com.aliyuncs.AcsResponse;
import com.aliyuncs.transform.UnmarshallerContext;
public class QueryTokenForMnsQueueResponse extends AcsResponse {
private String requestId;
private String code;
private String message;
private QueryTokenForMnsQueueResponse.MessageTokenDTO messageTokenDTO;
public QueryTokenForMnsQueueResponse() {
}
public String getRequestId() {
return this.requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getCode() {
return this.code;
}
public void setCode(String code) {
this.code = code;
}
public String getMessage() {
return this.message;
}
public void setMessage(String message) {
this.message = message;
}
public QueryTokenForMnsQueueResponse.MessageTokenDTO getMessageTokenDTO() {
return this.messageTokenDTO;
}
public void setMessageTokenDTO(
QueryTokenForMnsQueueResponse.MessageTokenDTO messageTokenDTO) {
this.messageTokenDTO = messageTokenDTO;
}
public QueryTokenForMnsQueueResponse getInstance(UnmarshallerContext context) {
return QueryTokenForMnsQueueResponseUnmarshaller.unmarshall(this, context);
}
public static class MessageTokenDTO {
private String accessKeyId;
private String accessKeySecret;
private String securityToken;
private String createTime;
private String expireTime;
public MessageTokenDTO() {
}
public String getAccessKeyId() {
return this.accessKeyId;
}
public void setAccessKeyId(String accessKeyId) {
this.accessKeyId = accessKeyId;
}
public String getAccessKeySecret() {
return this.accessKeySecret;
}
public void setAccessKeySecret(String accessKeySecret) {
this.accessKeySecret = accessKeySecret;
}
public String getSecurityToken() {
return this.securityToken;
}
public void setSecurityToken(String securityToken) {
this.securityToken = securityToken;
}
public String getCreateTime() {
return this.createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public String getExpireTime() {
return this.expireTime;
}
public void setExpireTime(String expireTime) {
this.expireTime = expireTime;
}
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.base;
import com.aliyuncs.transform.UnmarshallerContext;
public class QueryTokenForMnsQueueResponseUnmarshaller {
public QueryTokenForMnsQueueResponseUnmarshaller() {
}
public static QueryTokenForMnsQueueResponse unmarshall(
QueryTokenForMnsQueueResponse queryTokenForMnsQueueResponse,
UnmarshallerContext context) {
queryTokenForMnsQueueResponse.setRequestId(
context.stringValue("QueryTokenForMnsQueueResponse.RequestId"));
queryTokenForMnsQueueResponse
.setCode(context.stringValue("QueryTokenForMnsQueueResponse.Code"));
queryTokenForMnsQueueResponse
.setMessage(context.stringValue("QueryTokenForMnsQueueResponse.Message"));
QueryTokenForMnsQueueResponse.MessageTokenDTO messageTokenDTO = new QueryTokenForMnsQueueResponse.MessageTokenDTO();
messageTokenDTO.setAccessKeyId(context.stringValue(
"QueryTokenForMnsQueueResponse.MessageTokenDTO.AccessKeyId"));
messageTokenDTO.setAccessKeySecret(context.stringValue(
"QueryTokenForMnsQueueResponse.MessageTokenDTO.AccessKeySecret"));
messageTokenDTO.setSecurityToken(context.stringValue(
"QueryTokenForMnsQueueResponse.MessageTokenDTO.SecurityToken"));
messageTokenDTO.setCreateTime(context
.stringValue("QueryTokenForMnsQueueResponse.MessageTokenDTO.CreateTime"));
messageTokenDTO.setExpireTime(context
.stringValue("QueryTokenForMnsQueueResponse.MessageTokenDTO.ExpireTime"));
queryTokenForMnsQueueResponse.setMessageTokenDTO(messageTokenDTO);
return queryTokenForMnsQueueResponse;
}
}

View File

@@ -0,0 +1,96 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.base;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
/**
* 用于接收云通信消息的临时token
*
*/
public class TokenForAlicom {
private String messageType;
private String token;
private Long expireTime;
private String tempAccessKeyId;
private String tempAccessKeySecret;
private MNSClient client;
private CloudQueue queue;
public String getMessageType() {
return messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
}
public String getToken() {
return token;
}
public void setToken(String token) {
this.token = token;
}
public Long getExpireTime() {
return expireTime;
}
public void setExpireTime(Long expireTime) {
this.expireTime = expireTime;
}
public String getTempAccessKeyId() {
return tempAccessKeyId;
}
public void setTempAccessKeyId(String tempAccessKeyId) {
this.tempAccessKeyId = tempAccessKeyId;
}
public String getTempAccessKeySecret() {
return tempAccessKeySecret;
}
public void setTempAccessKeySecret(String tempAccessKeySecret) {
this.tempAccessKeySecret = tempAccessKeySecret;
}
public MNSClient getClient() {
return client;
}
public void setClient(MNSClient client) {
this.client = client;
}
public CloudQueue getQueue() {
return queue;
}
public void setQueue(CloudQueue queue) {
this.queue = queue;
}
public void closeClient() {
if (client != null) {
this.client.close();
}
}
}

View File

@@ -0,0 +1,142 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.base;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.http.FormatType;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.http.ProtocolType;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 获取接收云通信消息的临时token
*
*/
public class TokenGetterForAlicom {
private Log logger = LogFactory.getLog(TokenGetterForAlicom.class);
private String accessKeyId;
private String accessKeySecret;
private String endpointNameForPop;
private String regionIdForPop;
private String domainForPop;
private IAcsClient iAcsClient;
private Long ownerId;
private final static String productName = "Dybaseapi";
private long bufferTime = 1000 * 60 * 2;// 过期时间小于2分钟则重新获取防止服务器时间误差
private final Object lock = new Object();
private ConcurrentMap<String, TokenForAlicom> tokenMap = new ConcurrentHashMap<String, TokenForAlicom>();
public TokenGetterForAlicom(String accessKeyId, String accessKeySecret,
String endpointNameForPop, String regionIdForPop, String domainForPop,
Long ownerId) throws ClientException {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
this.endpointNameForPop = endpointNameForPop;
this.regionIdForPop = regionIdForPop;
this.domainForPop = domainForPop;
this.ownerId = ownerId;
init();
}
private void init() throws ClientException {
DefaultProfile.addEndpoint(endpointNameForPop, regionIdForPop, productName,
domainForPop);
IClientProfile profile = DefaultProfile.getProfile(regionIdForPop, accessKeyId,
accessKeySecret);
profile.getHttpClientConfig().setCompatibleMode(true);
iAcsClient = new DefaultAcsClient(profile);
}
private TokenForAlicom getTokenFromRemote(String messageType)
throws ServerException, ClientException, ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
df.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest();
request.setAcceptFormat(FormatType.JSON);
request.setMessageType(messageType);
request.setOwnerId(ownerId);
request.setProtocol(ProtocolType.HTTPS);
request.setMethod(MethodType.POST);
QueryTokenForMnsQueueResponse response = iAcsClient.getAcsResponse(request);
String resultCode = response.getCode();
if (resultCode != null && "OK".equals(resultCode)) {
QueryTokenForMnsQueueResponse.MessageTokenDTO dto = response
.getMessageTokenDTO();
TokenForAlicom token = new TokenForAlicom();
String timeStr = dto.getExpireTime();
token.setMessageType(messageType);
token.setExpireTime(df.parse(timeStr).getTime());
token.setToken(dto.getSecurityToken());
token.setTempAccessKeyId(dto.getAccessKeyId());
token.setTempAccessKeySecret(dto.getAccessKeySecret());
return token;
}
else {
logger.error("getTokenFromRemote_error,messageType:" + messageType + ",code:"
+ response.getCode() + ",message:" + response.getMessage());
throw new ServerException(response.getCode(), response.getMessage());
}
}
public TokenForAlicom getTokenByMessageType(String messageType, String queueName,
String mnsAccountEndpoint)
throws ServerException, ClientException, ParseException {
TokenForAlicom token = tokenMap.get(messageType);
Long now = System.currentTimeMillis();
if (token == null || (token.getExpireTime() - now) < bufferTime) {// 过期时间小于2分钟则重新获取防止服务器时间误差
synchronized (lock) {
token = tokenMap.get(messageType);
if (token == null || (token.getExpireTime() - now) < bufferTime) {
TokenForAlicom oldToken = null;
if (token != null) {
oldToken = token;
}
token = getTokenFromRemote(messageType);
// 因为换token时需要重建client和关闭老的client所以创建client的代码和创建token放在一起
CloudAccount account = new CloudAccount(token.getTempAccessKeyId(),
token.getTempAccessKeySecret(), mnsAccountEndpoint,
token.getToken());
// logger.warn("ak:"+token.getTempAccessKey());
// logger.warn("token:"+token.getToken());
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(queueName);
token.setClient(client);
token.setQueue(queue);
tokenMap.put(messageType, token);
if (oldToken != null) {
oldToken.closeClient();
}
}
}
}
return token;
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.alicloud.context.sms.SmsConfigProperties;
import org.springframework.cloud.alicloud.sms.ISmsService;
import org.springframework.cloud.alicloud.sms.SmsInitializerEventListener;
import org.springframework.cloud.alicloud.sms.SmsServiceImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
/**
* @author pbting
*/
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(value = SendSmsRequest.class)
@ConditionalOnProperty(value = "spring.cloud.alicloud.sms.enable", matchIfMissing = true)
public class SmsAutoConfiguration {
@Bean
public SmsServiceImpl smsService(SmsConfigProperties smsConfigProperties) {
return new SmsServiceImpl(smsConfigProperties);
}
@Bean
public SmsInitializerEventListener smsInitializePostListener(
SmsConfigProperties msConfigProperties, ISmsService smsService) {
return new SmsInitializerEventListener(msConfigProperties, smsService);
}
}

View File

@@ -0,0 +1,106 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.endpoint;
import com.aliyuncs.dysmsapi.model.v20170525.SendBatchSmsRequest;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
/**
*
*/
public final class EndpointManager {
private final static int BACKLOG_SIZE = 20;
private final static ReentrantLock SEND_REENTRANT_LOCK = new ReentrantLock(true);
private final static ReentrantLock SEND_BATCH_REENTRANT_LOCK = new ReentrantLock(
true);
private final static LinkedBlockingQueue<SendSmsRequest> SEND_SMS_REQUESTS = new LinkedBlockingQueue(
BACKLOG_SIZE);
private final static LinkedBlockingQueue<SendBatchSmsRequest> SEND_BATCH_SMS_REQUESTS = new LinkedBlockingQueue(
BACKLOG_SIZE);
private final static LinkedBlockingQueue<ReceiveMessageEntity> RECEIVE_MESSAGE_ENTITIES = new LinkedBlockingQueue(
BACKLOG_SIZE);
public static void addSendSmsRequest(SendSmsRequest sendSmsRequest) {
if (SEND_SMS_REQUESTS.offer(sendSmsRequest)) {
return;
}
try {
SEND_REENTRANT_LOCK.lock();
SEND_SMS_REQUESTS.poll();
SEND_SMS_REQUESTS.offer(sendSmsRequest);
}
finally {
SEND_REENTRANT_LOCK.unlock();
}
}
public static void addSendBatchSmsRequest(SendBatchSmsRequest sendBatchSmsRequest) {
if (SEND_BATCH_SMS_REQUESTS.offer(sendBatchSmsRequest)) {
return;
}
try {
SEND_BATCH_REENTRANT_LOCK.lock();
SEND_BATCH_SMS_REQUESTS.poll();
SEND_BATCH_SMS_REQUESTS.offer(sendBatchSmsRequest);
}
finally {
SEND_BATCH_REENTRANT_LOCK.unlock();
}
}
public static void addReceiveMessageEntity(
ReceiveMessageEntity receiveMessageEntity) {
if (RECEIVE_MESSAGE_ENTITIES.offer(receiveMessageEntity)) {
return;
}
RECEIVE_MESSAGE_ENTITIES.poll();
RECEIVE_MESSAGE_ENTITIES.offer(receiveMessageEntity);
}
public static Map<String, Object> getSmsEndpointMessage() {
List<SendSmsRequest> sendSmsRequests = new LinkedList<>();
List<SendBatchSmsRequest> sendBatchSmsRequests = new LinkedList<>();
List<ReceiveMessageEntity> receiveMessageEntities = new LinkedList<>();
try {
SEND_REENTRANT_LOCK.lock();
SEND_BATCH_REENTRANT_LOCK.lock();
sendSmsRequests.addAll(SEND_SMS_REQUESTS);
sendBatchSmsRequests.addAll(SEND_BATCH_SMS_REQUESTS);
}
finally {
SEND_REENTRANT_LOCK.unlock();
SEND_BATCH_REENTRANT_LOCK.unlock();
}
receiveMessageEntities.addAll(RECEIVE_MESSAGE_ENTITIES);
Map<String, Object> endpointMessages = new HashMap<>();
endpointMessages.put("send-sms-request", sendSmsRequests);
endpointMessages.put("send-batch-sms-request", sendBatchSmsRequests);
endpointMessages.put("message-listener", receiveMessageEntities);
return endpointMessages;
}
}

View File

@@ -0,0 +1,60 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.endpoint;
import org.springframework.cloud.alicloud.sms.base.MessageListener;
import java.io.Serializable;
/**
* @author pbting
*/
public class ReceiveMessageEntity implements Serializable {
private String messageType;
private String queueName;
private MessageListener messageListener;
public ReceiveMessageEntity(String messageType, String queueName,
MessageListener messageListener) {
this.messageType = messageType;
this.queueName = queueName;
this.messageListener = messageListener;
}
public String getMessageType() {
return messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public MessageListener getMessageListener() {
return messageListener;
}
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.endpoint;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import java.util.Map;
@Endpoint(id = "sms-info")
public class SmsEndpoint {
@ReadOperation
public Map<String, Object> invoke() {
return EndpointManager.getSmsEndpointMessage();
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright (C) 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alicloud.sms.endpoint;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
@ConditionalOnWebApplication
@ConditionalOnClass(Endpoint.class)
public class SmsEndpointAutoConfiguration {
@Bean
public SmsEndpoint smsEndpoint() {
return new SmsEndpoint();
}
}