mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
polish #761 update pkg name & maven coordinate for Greenwich
This commit is contained in:
@@ -3,20 +3,19 @@
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-alibaba</artifactId>
|
||||
<version>0.9.1.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-alicloud-sms</artifactId>
|
||||
<name>Spring Cloud Alibaba Cloud SMS</name>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-alicloud-context</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms;
|
||||
package com.alibaba.alicloud.sms;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@@ -13,10 +13,15 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms;
|
||||
package com.alibaba.alicloud.sms;
|
||||
|
||||
import com.aliyuncs.IAcsClient;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.*;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsRequest;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsResponse;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendBatchSmsRequest;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendBatchSmsResponse;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
|
||||
import com.aliyuncs.exceptions.ClientException;
|
||||
import com.aliyuncs.exceptions.ServerException;
|
||||
|
@@ -13,19 +13,21 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms;
|
||||
|
||||
import com.aliyuncs.exceptions.ClientException;
|
||||
import com.aliyuncs.profile.DefaultProfile;
|
||||
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
||||
import org.springframework.cloud.alicloud.context.sms.SmsProperties;
|
||||
import org.springframework.cloud.alicloud.sms.base.MessageListener;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
package com.alibaba.alicloud.sms;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.alibaba.alicloud.context.sms.SmsProperties;
|
||||
import com.alibaba.alicloud.sms.base.MessageListener;
|
||||
|
||||
import com.aliyuncs.exceptions.ClientException;
|
||||
import com.aliyuncs.profile.DefaultProfile;
|
||||
|
||||
/**
|
||||
* @author pbting
|
||||
*/
|
@@ -13,9 +13,9 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms;
|
||||
package com.alibaba.alicloud.sms;
|
||||
|
||||
import org.springframework.cloud.alicloud.sms.base.MessageListener;
|
||||
import com.alibaba.alicloud.sms.base.MessageListener;
|
||||
|
||||
/**
|
||||
* @author pbting
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms;
|
||||
package com.alibaba.alicloud.sms;
|
||||
|
||||
/**
|
||||
* @author pbting
|
@@ -13,17 +13,18 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms;
|
||||
package com.alibaba.alicloud.sms;
|
||||
|
||||
import java.text.ParseException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.alicloud.context.AliCloudProperties;
|
||||
import org.springframework.cloud.alicloud.context.sms.SmsProperties;
|
||||
import org.springframework.cloud.alicloud.sms.base.DefaultAlicomMessagePuller;
|
||||
import org.springframework.cloud.alicloud.sms.endpoint.EndpointManager;
|
||||
import org.springframework.cloud.alicloud.sms.endpoint.ReceiveMessageEntity;
|
||||
|
||||
import com.alibaba.alicloud.context.AliCloudProperties;
|
||||
import com.alibaba.alicloud.context.sms.SmsProperties;
|
||||
import com.alibaba.alicloud.sms.base.DefaultAlicomMessagePuller;
|
||||
import com.alibaba.alicloud.sms.endpoint.EndpointManager;
|
||||
import com.alibaba.alicloud.sms.endpoint.ReceiveMessageEntity;
|
||||
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsRequest;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsResponse;
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms;
|
||||
package com.alibaba.alicloud.sms;
|
||||
|
||||
/**
|
||||
* @author pbting
|
@@ -1,431 +1,431 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.base;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.aliyun.mns.client.CloudQueue;
|
||||
import com.aliyun.mns.common.ClientException;
|
||||
import com.aliyun.mns.common.ServiceException;
|
||||
import com.aliyun.mns.model.Message;
|
||||
|
||||
/**
|
||||
* 阿里通信官方消息默认拉取工具类
|
||||
*/
|
||||
public class DefaultAlicomMessagePuller {
|
||||
|
||||
private static final Logger log = LoggerFactory
|
||||
.getLogger(DefaultAlicomMessagePuller.class);
|
||||
|
||||
private String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/";// 阿里通信消息的endpoint,固定。
|
||||
private String endpointNameForPop = "cn-hangzhou";
|
||||
private String regionIdForPop = "cn-hangzhou";
|
||||
private String domainForPop = "dybaseapi.aliyuncs.com";
|
||||
private TokenGetterForAlicom tokenGetter;
|
||||
private MessageListener messageListener;
|
||||
private boolean isRunning = false;
|
||||
private Integer pullMsgThreadSize = 1;
|
||||
private boolean debugLogOpen = false;
|
||||
private Integer sleepSecondWhenNoData = 30;
|
||||
|
||||
public void openDebugLog(boolean debugLogOpen) {
|
||||
this.debugLogOpen = debugLogOpen;
|
||||
}
|
||||
|
||||
public Integer getSleepSecondWhenNoData() {
|
||||
return sleepSecondWhenNoData;
|
||||
}
|
||||
|
||||
public void setSleepSecondWhenNoData(Integer sleepSecondWhenNoData) {
|
||||
this.sleepSecondWhenNoData = sleepSecondWhenNoData;
|
||||
}
|
||||
|
||||
public Integer getPullMsgThreadSize() {
|
||||
return pullMsgThreadSize;
|
||||
}
|
||||
|
||||
public void setPullMsgThreadSize(Integer pullMsgThreadSize) {
|
||||
if (pullMsgThreadSize != null && pullMsgThreadSize > 1) {
|
||||
this.pullMsgThreadSize = pullMsgThreadSize;
|
||||
}
|
||||
}
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
public ExecutorService getExecutorService() {
|
||||
return executorService;
|
||||
}
|
||||
|
||||
public void setExecutorService(ExecutorService executorService) {
|
||||
this.executorService = executorService;
|
||||
}
|
||||
|
||||
protected static final Map<String, Object> S_LOCK_OBJ_MAP = new HashMap<>();
|
||||
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<>();
|
||||
protected Object lockObj;
|
||||
|
||||
public boolean setPolling(String queueName) {
|
||||
synchronized (lockObj) {
|
||||
Boolean ret = sPollingMap.get(queueName);
|
||||
if (ret == null || !ret) {
|
||||
sPollingMap.put(queueName, true);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void clearPolling(String queueName) {
|
||||
synchronized (lockObj) {
|
||||
sPollingMap.put(queueName, false);
|
||||
lockObj.notifyAll();
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_WakeUp:Everyone WakeUp and Work!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return isRunning;
|
||||
}
|
||||
|
||||
public void setRunning(boolean running) {
|
||||
isRunning = running;
|
||||
}
|
||||
|
||||
private class PullMessageTask implements Runnable {
|
||||
private String messageType;
|
||||
private String queueName;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
boolean polling = false;
|
||||
while (isRunning) {
|
||||
try {
|
||||
synchronized (lockObj) {
|
||||
Boolean p = sPollingMap.get(queueName);
|
||||
if (p != null && p) {
|
||||
try {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_sleep:"
|
||||
+ Thread.currentThread().getName()
|
||||
+ " Have a nice sleep!");
|
||||
}
|
||||
polling = false;
|
||||
lockObj.wait();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_Interrupted!"
|
||||
+ Thread.currentThread().getName()
|
||||
+ " QueueName is " + queueName);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TokenForAlicom tokenObject = tokenGetter.getTokenByMessageType(
|
||||
messageType, queueName, mnsAccountEndpoint);
|
||||
CloudQueue queue = tokenObject.getQueue();
|
||||
Message popMsg = null;
|
||||
if (!polling) {
|
||||
popMsg = queue.popMessage();
|
||||
if (debugLogOpen) {
|
||||
SimpleDateFormat format = new SimpleDateFormat(
|
||||
"yyyy-MM-dd HH:mm:ss");
|
||||
log.info("PullMessageTask_popMessage:"
|
||||
+ Thread.currentThread().getName() + "-popDone at "
|
||||
+ "," + format.format(new Date()) + " msgSize="
|
||||
+ (popMsg == null ? 0 : popMsg.getMessageId()));
|
||||
}
|
||||
if (popMsg == null) {
|
||||
polling = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (setPolling(queueName)) {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_setPolling:"
|
||||
+ Thread.currentThread().getName() + " Polling!");
|
||||
}
|
||||
}
|
||||
else {
|
||||
continue;
|
||||
}
|
||||
do {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_Keep_Polling"
|
||||
+ Thread.currentThread().getName()
|
||||
+ "KEEP Polling!");
|
||||
}
|
||||
try {
|
||||
popMsg = queue.popMessage(sleepSecondWhenNoData);
|
||||
}
|
||||
catch (ClientException e) {
|
||||
if (debugLogOpen) {
|
||||
log.info(
|
||||
"PullMessageTask_Pop_Message:ClientException Refresh accessKey"
|
||||
+ e);
|
||||
}
|
||||
tokenObject = tokenGetter.getTokenByMessageType(
|
||||
messageType, queueName, mnsAccountEndpoint);
|
||||
queue = tokenObject.getQueue();
|
||||
|
||||
}
|
||||
catch (ServiceException e) {
|
||||
if (debugLogOpen) {
|
||||
log.info(
|
||||
"PullMessageTask_Pop_Message:ServiceException Refresh accessKey"
|
||||
+ e);
|
||||
}
|
||||
tokenObject = tokenGetter.getTokenByMessageType(
|
||||
messageType, queueName, mnsAccountEndpoint);
|
||||
queue = tokenObject.getQueue();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (debugLogOpen) {
|
||||
log.info(
|
||||
"PullMessageTask_Pop_Message:Exception Happened when polling popMessage: "
|
||||
+ e);
|
||||
}
|
||||
}
|
||||
}
|
||||
while (popMsg == null && isRunning);
|
||||
clearPolling(queueName);
|
||||
}
|
||||
boolean dealResult = messageListener.dealMessage(popMsg);
|
||||
if (dealResult) {
|
||||
// remember to delete message when consume message successfully.
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_Deal_Message:"
|
||||
+ Thread.currentThread().getName() + "deleteMessage "
|
||||
+ popMsg.getMessageId());
|
||||
}
|
||||
queue.deleteMessage(popMsg.getReceiptHandle());
|
||||
}
|
||||
}
|
||||
catch (ClientException e) {
|
||||
log.error("PullMessageTask_execute_error,messageType:" + messageType
|
||||
+ ",queueName:" + queueName, e);
|
||||
break;
|
||||
|
||||
}
|
||||
catch (ServiceException e) {
|
||||
if (e.getErrorCode().equals("AccessDenied")) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName
|
||||
+ ",please check messageType and queueName", e);
|
||||
}
|
||||
else {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName, e);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
catch (com.aliyuncs.exceptions.ClientException e) {
|
||||
if (e.getErrCode().equals("InvalidAccessKeyId.NotFound")) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName
|
||||
+ ",please check AccessKeyId", e);
|
||||
}
|
||||
if (e.getErrCode().equals("SignatureDoesNotMatch")) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName
|
||||
+ ",please check AccessKeySecret", e);
|
||||
}
|
||||
else {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName, e);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("PullMessageTask_execute_error,messageType:" + messageType
|
||||
+ ",queueName:" + queueName, e);
|
||||
try {
|
||||
Thread.sleep(sleepSecondWhenNoData);
|
||||
}
|
||||
catch (InterruptedException e1) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @param accessKeyId accessKeyId
|
||||
* @param accessKeySecret accessKeySecret
|
||||
* @param messageType 消息类型
|
||||
* @param queueName 队列名称
|
||||
* @param messageListener 回调的listener,用户自己实现
|
||||
* @throws com.aliyuncs.exceptions.ClientException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void startReceiveMsg(String accessKeyId, String accessKeySecret,
|
||||
String messageType, String queueName, MessageListener messageListener)
|
||||
throws com.aliyuncs.exceptions.ClientException, ParseException {
|
||||
|
||||
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
|
||||
endpointNameForPop, regionIdForPop, domainForPop, null);
|
||||
|
||||
this.messageListener = messageListener;
|
||||
isRunning = true;
|
||||
PullMessageTask task = new PullMessageTask();
|
||||
task.messageType = messageType;
|
||||
task.queueName = queueName;
|
||||
|
||||
synchronized (S_LOCK_OBJ_MAP) {
|
||||
lockObj = S_LOCK_OBJ_MAP.get(queueName);
|
||||
if (lockObj == null) {
|
||||
lockObj = new Object();
|
||||
S_LOCK_OBJ_MAP.put(queueName, lockObj);
|
||||
}
|
||||
}
|
||||
|
||||
if (executorService == null) {
|
||||
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
||||
pullMsgThreadSize,
|
||||
new BasicThreadFactory.Builder()
|
||||
.namingPattern(
|
||||
"PullMessageTask-" + messageType + "-thread-pool-%d")
|
||||
.daemon(true).build());
|
||||
executorService = scheduledExecutorService;
|
||||
}
|
||||
for (int i = 0; i < pullMsgThreadSize; i++) {
|
||||
executorService.execute(task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param accessKeyId accessKeyId
|
||||
* @param accessKeySecret accessKeySecret
|
||||
* @param messageType 消息类型
|
||||
* @param queueName 队列名称
|
||||
* @param messageListener 回调的listener,用户自己实现
|
||||
* @throws com.aliyuncs.exceptions.ClientException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void startReceiveMsgForVPC(String accessKeyId, String accessKeySecret,
|
||||
String messageType, String queueName, String regionIdForPop,
|
||||
String endpointNameForPop, String domainForPop, String mnsAccountEndpoint,
|
||||
MessageListener messageListener)
|
||||
throws com.aliyuncs.exceptions.ClientException, ParseException {
|
||||
this.mnsAccountEndpoint = mnsAccountEndpoint;
|
||||
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
|
||||
endpointNameForPop, regionIdForPop, domainForPop, null);
|
||||
|
||||
this.messageListener = messageListener;
|
||||
isRunning = true;
|
||||
PullMessageTask task = new PullMessageTask();
|
||||
task.messageType = messageType;
|
||||
task.queueName = queueName;
|
||||
|
||||
synchronized (S_LOCK_OBJ_MAP) {
|
||||
lockObj = S_LOCK_OBJ_MAP.get(queueName);
|
||||
if (lockObj == null) {
|
||||
lockObj = new Object();
|
||||
S_LOCK_OBJ_MAP.put(queueName, lockObj);
|
||||
}
|
||||
}
|
||||
|
||||
if (executorService == null) {
|
||||
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
||||
pullMsgThreadSize,
|
||||
new BasicThreadFactory.Builder()
|
||||
.namingPattern(
|
||||
"PullMessageTask-" + messageType + "-thread-pool-%d")
|
||||
.daemon(true).build());
|
||||
executorService = scheduledExecutorService;
|
||||
}
|
||||
for (int i = 0; i < pullMsgThreadSize; i++) {
|
||||
executorService.execute(task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 虚商用户定制接收消息方法
|
||||
* @param accessKeyId accessKeyId
|
||||
* @param accessKeySecret accessKeySecret
|
||||
* @param ownerId 实际的ownerId
|
||||
* @param messageType 消息类型
|
||||
* @param queueName 队列名称
|
||||
* @param messageListener 回调listener
|
||||
* @throws com.aliyuncs.exceptions.ClientException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void startReceiveMsgForPartnerUser(String accessKeyId, String accessKeySecret,
|
||||
Long ownerId, String messageType, String queueName,
|
||||
MessageListener messageListener)
|
||||
throws com.aliyuncs.exceptions.ClientException, ParseException {
|
||||
|
||||
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
|
||||
endpointNameForPop, regionIdForPop, domainForPop, ownerId);
|
||||
|
||||
this.messageListener = messageListener;
|
||||
isRunning = true;
|
||||
PullMessageTask task = new PullMessageTask();
|
||||
task.messageType = messageType;
|
||||
task.queueName = queueName;
|
||||
|
||||
synchronized (S_LOCK_OBJ_MAP) {
|
||||
lockObj = S_LOCK_OBJ_MAP.get(queueName);
|
||||
if (lockObj == null) {
|
||||
lockObj = new Object();
|
||||
S_LOCK_OBJ_MAP.put(queueName, lockObj);
|
||||
}
|
||||
}
|
||||
|
||||
if (executorService == null) {
|
||||
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
||||
pullMsgThreadSize,
|
||||
new BasicThreadFactory.Builder()
|
||||
.namingPattern(
|
||||
"PullMessageTask-" + messageType + "-thread-pool-%d")
|
||||
.daemon(true).build());
|
||||
executorService = scheduledExecutorService;
|
||||
}
|
||||
for (int i = 0; i < pullMsgThreadSize; i++) {
|
||||
executorService.execute(task);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
isRunning = false;
|
||||
}
|
||||
|
||||
}
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.alicloud.sms.base;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.aliyun.mns.client.CloudQueue;
|
||||
import com.aliyun.mns.common.ClientException;
|
||||
import com.aliyun.mns.common.ServiceException;
|
||||
import com.aliyun.mns.model.Message;
|
||||
|
||||
/**
|
||||
* 阿里通信官方消息默认拉取工具类
|
||||
*/
|
||||
public class DefaultAlicomMessagePuller {
|
||||
|
||||
private static final Logger log = LoggerFactory
|
||||
.getLogger(DefaultAlicomMessagePuller.class);
|
||||
|
||||
private String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/";// 阿里通信消息的endpoint,固定。
|
||||
private String endpointNameForPop = "cn-hangzhou";
|
||||
private String regionIdForPop = "cn-hangzhou";
|
||||
private String domainForPop = "dybaseapi.aliyuncs.com";
|
||||
private TokenGetterForAlicom tokenGetter;
|
||||
private MessageListener messageListener;
|
||||
private boolean isRunning = false;
|
||||
private Integer pullMsgThreadSize = 1;
|
||||
private boolean debugLogOpen = false;
|
||||
private Integer sleepSecondWhenNoData = 30;
|
||||
|
||||
public void openDebugLog(boolean debugLogOpen) {
|
||||
this.debugLogOpen = debugLogOpen;
|
||||
}
|
||||
|
||||
public Integer getSleepSecondWhenNoData() {
|
||||
return sleepSecondWhenNoData;
|
||||
}
|
||||
|
||||
public void setSleepSecondWhenNoData(Integer sleepSecondWhenNoData) {
|
||||
this.sleepSecondWhenNoData = sleepSecondWhenNoData;
|
||||
}
|
||||
|
||||
public Integer getPullMsgThreadSize() {
|
||||
return pullMsgThreadSize;
|
||||
}
|
||||
|
||||
public void setPullMsgThreadSize(Integer pullMsgThreadSize) {
|
||||
if (pullMsgThreadSize != null && pullMsgThreadSize > 1) {
|
||||
this.pullMsgThreadSize = pullMsgThreadSize;
|
||||
}
|
||||
}
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
public ExecutorService getExecutorService() {
|
||||
return executorService;
|
||||
}
|
||||
|
||||
public void setExecutorService(ExecutorService executorService) {
|
||||
this.executorService = executorService;
|
||||
}
|
||||
|
||||
protected static final Map<String, Object> S_LOCK_OBJ_MAP = new HashMap<>();
|
||||
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<>();
|
||||
protected Object lockObj;
|
||||
|
||||
public boolean setPolling(String queueName) {
|
||||
synchronized (lockObj) {
|
||||
Boolean ret = sPollingMap.get(queueName);
|
||||
if (ret == null || !ret) {
|
||||
sPollingMap.put(queueName, true);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void clearPolling(String queueName) {
|
||||
synchronized (lockObj) {
|
||||
sPollingMap.put(queueName, false);
|
||||
lockObj.notifyAll();
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_WakeUp:Everyone WakeUp and Work!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return isRunning;
|
||||
}
|
||||
|
||||
public void setRunning(boolean running) {
|
||||
isRunning = running;
|
||||
}
|
||||
|
||||
private class PullMessageTask implements Runnable {
|
||||
private String messageType;
|
||||
private String queueName;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
boolean polling = false;
|
||||
while (isRunning) {
|
||||
try {
|
||||
synchronized (lockObj) {
|
||||
Boolean p = sPollingMap.get(queueName);
|
||||
if (p != null && p) {
|
||||
try {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_sleep:"
|
||||
+ Thread.currentThread().getName()
|
||||
+ " Have a nice sleep!");
|
||||
}
|
||||
polling = false;
|
||||
lockObj.wait();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_Interrupted!"
|
||||
+ Thread.currentThread().getName()
|
||||
+ " QueueName is " + queueName);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TokenForAlicom tokenObject = tokenGetter.getTokenByMessageType(
|
||||
messageType, queueName, mnsAccountEndpoint);
|
||||
CloudQueue queue = tokenObject.getQueue();
|
||||
Message popMsg = null;
|
||||
if (!polling) {
|
||||
popMsg = queue.popMessage();
|
||||
if (debugLogOpen) {
|
||||
SimpleDateFormat format = new SimpleDateFormat(
|
||||
"yyyy-MM-dd HH:mm:ss");
|
||||
log.info("PullMessageTask_popMessage:"
|
||||
+ Thread.currentThread().getName() + "-popDone at "
|
||||
+ "," + format.format(new Date()) + " msgSize="
|
||||
+ (popMsg == null ? 0 : popMsg.getMessageId()));
|
||||
}
|
||||
if (popMsg == null) {
|
||||
polling = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (setPolling(queueName)) {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_setPolling:"
|
||||
+ Thread.currentThread().getName() + " Polling!");
|
||||
}
|
||||
}
|
||||
else {
|
||||
continue;
|
||||
}
|
||||
do {
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_Keep_Polling"
|
||||
+ Thread.currentThread().getName()
|
||||
+ "KEEP Polling!");
|
||||
}
|
||||
try {
|
||||
popMsg = queue.popMessage(sleepSecondWhenNoData);
|
||||
}
|
||||
catch (ClientException e) {
|
||||
if (debugLogOpen) {
|
||||
log.info(
|
||||
"PullMessageTask_Pop_Message:ClientException Refresh accessKey"
|
||||
+ e);
|
||||
}
|
||||
tokenObject = tokenGetter.getTokenByMessageType(
|
||||
messageType, queueName, mnsAccountEndpoint);
|
||||
queue = tokenObject.getQueue();
|
||||
|
||||
}
|
||||
catch (ServiceException e) {
|
||||
if (debugLogOpen) {
|
||||
log.info(
|
||||
"PullMessageTask_Pop_Message:ServiceException Refresh accessKey"
|
||||
+ e);
|
||||
}
|
||||
tokenObject = tokenGetter.getTokenByMessageType(
|
||||
messageType, queueName, mnsAccountEndpoint);
|
||||
queue = tokenObject.getQueue();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (debugLogOpen) {
|
||||
log.info(
|
||||
"PullMessageTask_Pop_Message:Exception Happened when polling popMessage: "
|
||||
+ e);
|
||||
}
|
||||
}
|
||||
}
|
||||
while (popMsg == null && isRunning);
|
||||
clearPolling(queueName);
|
||||
}
|
||||
boolean dealResult = messageListener.dealMessage(popMsg);
|
||||
if (dealResult) {
|
||||
// remember to delete message when consume message successfully.
|
||||
if (debugLogOpen) {
|
||||
log.info("PullMessageTask_Deal_Message:"
|
||||
+ Thread.currentThread().getName() + "deleteMessage "
|
||||
+ popMsg.getMessageId());
|
||||
}
|
||||
queue.deleteMessage(popMsg.getReceiptHandle());
|
||||
}
|
||||
}
|
||||
catch (ClientException e) {
|
||||
log.error("PullMessageTask_execute_error,messageType:" + messageType
|
||||
+ ",queueName:" + queueName, e);
|
||||
break;
|
||||
|
||||
}
|
||||
catch (ServiceException e) {
|
||||
if (e.getErrorCode().equals("AccessDenied")) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName
|
||||
+ ",please check messageType and queueName", e);
|
||||
}
|
||||
else {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName, e);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
catch (com.aliyuncs.exceptions.ClientException e) {
|
||||
if (e.getErrCode().equals("InvalidAccessKeyId.NotFound")) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName
|
||||
+ ",please check AccessKeyId", e);
|
||||
}
|
||||
if (e.getErrCode().equals("SignatureDoesNotMatch")) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName
|
||||
+ ",please check AccessKeySecret", e);
|
||||
}
|
||||
else {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName, e);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("PullMessageTask_execute_error,messageType:" + messageType
|
||||
+ ",queueName:" + queueName, e);
|
||||
try {
|
||||
Thread.sleep(sleepSecondWhenNoData);
|
||||
}
|
||||
catch (InterruptedException e1) {
|
||||
log.error("PullMessageTask_execute_error,messageType:"
|
||||
+ messageType + ",queueName:" + queueName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @param accessKeyId accessKeyId
|
||||
* @param accessKeySecret accessKeySecret
|
||||
* @param messageType 消息类型
|
||||
* @param queueName 队列名称
|
||||
* @param messageListener 回调的listener,用户自己实现
|
||||
* @throws com.aliyuncs.exceptions.ClientException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void startReceiveMsg(String accessKeyId, String accessKeySecret,
|
||||
String messageType, String queueName, MessageListener messageListener)
|
||||
throws com.aliyuncs.exceptions.ClientException, ParseException {
|
||||
|
||||
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
|
||||
endpointNameForPop, regionIdForPop, domainForPop, null);
|
||||
|
||||
this.messageListener = messageListener;
|
||||
isRunning = true;
|
||||
PullMessageTask task = new PullMessageTask();
|
||||
task.messageType = messageType;
|
||||
task.queueName = queueName;
|
||||
|
||||
synchronized (S_LOCK_OBJ_MAP) {
|
||||
lockObj = S_LOCK_OBJ_MAP.get(queueName);
|
||||
if (lockObj == null) {
|
||||
lockObj = new Object();
|
||||
S_LOCK_OBJ_MAP.put(queueName, lockObj);
|
||||
}
|
||||
}
|
||||
|
||||
if (executorService == null) {
|
||||
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
||||
pullMsgThreadSize,
|
||||
new BasicThreadFactory.Builder()
|
||||
.namingPattern(
|
||||
"PullMessageTask-" + messageType + "-thread-pool-%d")
|
||||
.daemon(true).build());
|
||||
executorService = scheduledExecutorService;
|
||||
}
|
||||
for (int i = 0; i < pullMsgThreadSize; i++) {
|
||||
executorService.execute(task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param accessKeyId accessKeyId
|
||||
* @param accessKeySecret accessKeySecret
|
||||
* @param messageType 消息类型
|
||||
* @param queueName 队列名称
|
||||
* @param messageListener 回调的listener,用户自己实现
|
||||
* @throws com.aliyuncs.exceptions.ClientException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void startReceiveMsgForVPC(String accessKeyId, String accessKeySecret,
|
||||
String messageType, String queueName, String regionIdForPop,
|
||||
String endpointNameForPop, String domainForPop, String mnsAccountEndpoint,
|
||||
MessageListener messageListener)
|
||||
throws com.aliyuncs.exceptions.ClientException, ParseException {
|
||||
this.mnsAccountEndpoint = mnsAccountEndpoint;
|
||||
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
|
||||
endpointNameForPop, regionIdForPop, domainForPop, null);
|
||||
|
||||
this.messageListener = messageListener;
|
||||
isRunning = true;
|
||||
PullMessageTask task = new PullMessageTask();
|
||||
task.messageType = messageType;
|
||||
task.queueName = queueName;
|
||||
|
||||
synchronized (S_LOCK_OBJ_MAP) {
|
||||
lockObj = S_LOCK_OBJ_MAP.get(queueName);
|
||||
if (lockObj == null) {
|
||||
lockObj = new Object();
|
||||
S_LOCK_OBJ_MAP.put(queueName, lockObj);
|
||||
}
|
||||
}
|
||||
|
||||
if (executorService == null) {
|
||||
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
||||
pullMsgThreadSize,
|
||||
new BasicThreadFactory.Builder()
|
||||
.namingPattern(
|
||||
"PullMessageTask-" + messageType + "-thread-pool-%d")
|
||||
.daemon(true).build());
|
||||
executorService = scheduledExecutorService;
|
||||
}
|
||||
for (int i = 0; i < pullMsgThreadSize; i++) {
|
||||
executorService.execute(task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 虚商用户定制接收消息方法
|
||||
* @param accessKeyId accessKeyId
|
||||
* @param accessKeySecret accessKeySecret
|
||||
* @param ownerId 实际的ownerId
|
||||
* @param messageType 消息类型
|
||||
* @param queueName 队列名称
|
||||
* @param messageListener 回调listener
|
||||
* @throws com.aliyuncs.exceptions.ClientException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void startReceiveMsgForPartnerUser(String accessKeyId, String accessKeySecret,
|
||||
Long ownerId, String messageType, String queueName,
|
||||
MessageListener messageListener)
|
||||
throws com.aliyuncs.exceptions.ClientException, ParseException {
|
||||
|
||||
tokenGetter = new TokenGetterForAlicom(accessKeyId, accessKeySecret,
|
||||
endpointNameForPop, regionIdForPop, domainForPop, ownerId);
|
||||
|
||||
this.messageListener = messageListener;
|
||||
isRunning = true;
|
||||
PullMessageTask task = new PullMessageTask();
|
||||
task.messageType = messageType;
|
||||
task.queueName = queueName;
|
||||
|
||||
synchronized (S_LOCK_OBJ_MAP) {
|
||||
lockObj = S_LOCK_OBJ_MAP.get(queueName);
|
||||
if (lockObj == null) {
|
||||
lockObj = new Object();
|
||||
S_LOCK_OBJ_MAP.put(queueName, lockObj);
|
||||
}
|
||||
}
|
||||
|
||||
if (executorService == null) {
|
||||
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
||||
pullMsgThreadSize,
|
||||
new BasicThreadFactory.Builder()
|
||||
.namingPattern(
|
||||
"PullMessageTask-" + messageType + "-thread-pool-%d")
|
||||
.daemon(true).build());
|
||||
executorService = scheduledExecutorService;
|
||||
}
|
||||
for (int i = 0; i < pullMsgThreadSize; i++) {
|
||||
executorService.execute(task);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
isRunning = false;
|
||||
}
|
||||
|
||||
}
|
@@ -1,24 +1,24 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.base;
|
||||
|
||||
import com.aliyun.mns.model.Message;
|
||||
|
||||
public interface MessageListener {
|
||||
|
||||
boolean dealMessage(Message message);
|
||||
|
||||
}
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.alicloud.sms.base;
|
||||
|
||||
import com.aliyun.mns.model.Message;
|
||||
|
||||
public interface MessageListener {
|
||||
|
||||
boolean dealMessage(Message message);
|
||||
|
||||
}
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.base;
|
||||
package com.alibaba.alicloud.sms.base;
|
||||
|
||||
import com.aliyuncs.RpcAcsRequest;
|
||||
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.base;
|
||||
package com.alibaba.alicloud.sms.base;
|
||||
|
||||
import com.aliyuncs.AcsResponse;
|
||||
import com.aliyuncs.transform.UnmarshallerContext;
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.base;
|
||||
package com.alibaba.alicloud.sms.base;
|
||||
|
||||
import com.aliyuncs.transform.UnmarshallerContext;
|
||||
|
@@ -1,96 +1,96 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.base;
|
||||
|
||||
import com.aliyun.mns.client.CloudQueue;
|
||||
import com.aliyun.mns.client.MNSClient;
|
||||
|
||||
/**
|
||||
* 用于接收云通信消息的临时token
|
||||
*
|
||||
*/
|
||||
public class TokenForAlicom {
|
||||
private String messageType;
|
||||
private String token;
|
||||
private Long expireTime;
|
||||
private String tempAccessKeyId;
|
||||
private String tempAccessKeySecret;
|
||||
private MNSClient client;
|
||||
private CloudQueue queue;
|
||||
|
||||
public String getMessageType() {
|
||||
return messageType;
|
||||
}
|
||||
|
||||
public void setMessageType(String messageType) {
|
||||
this.messageType = messageType;
|
||||
}
|
||||
|
||||
public String getToken() {
|
||||
return token;
|
||||
}
|
||||
|
||||
public void setToken(String token) {
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
public Long getExpireTime() {
|
||||
return expireTime;
|
||||
}
|
||||
|
||||
public void setExpireTime(Long expireTime) {
|
||||
this.expireTime = expireTime;
|
||||
}
|
||||
|
||||
public String getTempAccessKeyId() {
|
||||
return tempAccessKeyId;
|
||||
}
|
||||
|
||||
public void setTempAccessKeyId(String tempAccessKeyId) {
|
||||
this.tempAccessKeyId = tempAccessKeyId;
|
||||
}
|
||||
|
||||
public String getTempAccessKeySecret() {
|
||||
return tempAccessKeySecret;
|
||||
}
|
||||
|
||||
public void setTempAccessKeySecret(String tempAccessKeySecret) {
|
||||
this.tempAccessKeySecret = tempAccessKeySecret;
|
||||
}
|
||||
|
||||
public MNSClient getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
public void setClient(MNSClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public CloudQueue getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
public void setQueue(CloudQueue queue) {
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
public void closeClient() {
|
||||
if (client != null) {
|
||||
this.client.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.alicloud.sms.base;
|
||||
|
||||
import com.aliyun.mns.client.CloudQueue;
|
||||
import com.aliyun.mns.client.MNSClient;
|
||||
|
||||
/**
|
||||
* 用于接收云通信消息的临时token
|
||||
*
|
||||
*/
|
||||
public class TokenForAlicom {
|
||||
private String messageType;
|
||||
private String token;
|
||||
private Long expireTime;
|
||||
private String tempAccessKeyId;
|
||||
private String tempAccessKeySecret;
|
||||
private MNSClient client;
|
||||
private CloudQueue queue;
|
||||
|
||||
public String getMessageType() {
|
||||
return messageType;
|
||||
}
|
||||
|
||||
public void setMessageType(String messageType) {
|
||||
this.messageType = messageType;
|
||||
}
|
||||
|
||||
public String getToken() {
|
||||
return token;
|
||||
}
|
||||
|
||||
public void setToken(String token) {
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
public Long getExpireTime() {
|
||||
return expireTime;
|
||||
}
|
||||
|
||||
public void setExpireTime(Long expireTime) {
|
||||
this.expireTime = expireTime;
|
||||
}
|
||||
|
||||
public String getTempAccessKeyId() {
|
||||
return tempAccessKeyId;
|
||||
}
|
||||
|
||||
public void setTempAccessKeyId(String tempAccessKeyId) {
|
||||
this.tempAccessKeyId = tempAccessKeyId;
|
||||
}
|
||||
|
||||
public String getTempAccessKeySecret() {
|
||||
return tempAccessKeySecret;
|
||||
}
|
||||
|
||||
public void setTempAccessKeySecret(String tempAccessKeySecret) {
|
||||
this.tempAccessKeySecret = tempAccessKeySecret;
|
||||
}
|
||||
|
||||
public MNSClient getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
public void setClient(MNSClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public CloudQueue getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
public void setQueue(CloudQueue queue) {
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
public void closeClient() {
|
||||
if (client != null) {
|
||||
this.client.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -1,141 +1,141 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.base;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.aliyun.mns.client.CloudAccount;
|
||||
import com.aliyun.mns.client.CloudQueue;
|
||||
import com.aliyun.mns.client.MNSClient;
|
||||
import com.aliyuncs.DefaultAcsClient;
|
||||
import com.aliyuncs.IAcsClient;
|
||||
import com.aliyuncs.exceptions.ClientException;
|
||||
import com.aliyuncs.exceptions.ServerException;
|
||||
import com.aliyuncs.http.FormatType;
|
||||
import com.aliyuncs.http.MethodType;
|
||||
import com.aliyuncs.http.ProtocolType;
|
||||
import com.aliyuncs.profile.DefaultProfile;
|
||||
import com.aliyuncs.profile.IClientProfile;
|
||||
|
||||
/**
|
||||
* 获取接收云通信消息的临时token
|
||||
*
|
||||
*/
|
||||
public class TokenGetterForAlicom {
|
||||
private static final Logger log = LoggerFactory.getLogger(TokenGetterForAlicom.class);
|
||||
private String accessKeyId;
|
||||
private String accessKeySecret;
|
||||
private String endpointNameForPop;
|
||||
private String regionIdForPop;
|
||||
private String domainForPop;
|
||||
private IAcsClient iAcsClient;
|
||||
private Long ownerId;
|
||||
private final static String PRODUCT_NAME = "Dybaseapi";
|
||||
private long bufferTime = 1000 * 60 * 2;// 过期时间小于2分钟则重新获取,防止服务器时间误差
|
||||
private final Object lock = new Object();
|
||||
private ConcurrentMap<String, TokenForAlicom> tokenMap = new ConcurrentHashMap<String, TokenForAlicom>();
|
||||
|
||||
public TokenGetterForAlicom(String accessKeyId, String accessKeySecret,
|
||||
String endpointNameForPop, String regionIdForPop, String domainForPop,
|
||||
Long ownerId) throws ClientException {
|
||||
this.accessKeyId = accessKeyId;
|
||||
this.accessKeySecret = accessKeySecret;
|
||||
this.endpointNameForPop = endpointNameForPop;
|
||||
this.regionIdForPop = regionIdForPop;
|
||||
this.domainForPop = domainForPop;
|
||||
this.ownerId = ownerId;
|
||||
init();
|
||||
}
|
||||
|
||||
private void init() throws ClientException {
|
||||
DefaultProfile.addEndpoint(endpointNameForPop, regionIdForPop, PRODUCT_NAME,
|
||||
domainForPop);
|
||||
IClientProfile profile = DefaultProfile.getProfile(regionIdForPop, accessKeyId,
|
||||
accessKeySecret);
|
||||
profile.getHttpClientConfig().setCompatibleMode(true);
|
||||
iAcsClient = new DefaultAcsClient(profile);
|
||||
}
|
||||
|
||||
private TokenForAlicom getTokenFromRemote(String messageType)
|
||||
throws ServerException, ClientException, ParseException {
|
||||
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
df.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
|
||||
QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest();
|
||||
request.setAcceptFormat(FormatType.JSON);
|
||||
request.setMessageType(messageType);
|
||||
request.setOwnerId(ownerId);
|
||||
request.setProtocol(ProtocolType.HTTPS);
|
||||
request.setMethod(MethodType.POST);
|
||||
QueryTokenForMnsQueueResponse response = iAcsClient.getAcsResponse(request);
|
||||
String resultCode = response.getCode();
|
||||
if (resultCode != null && "OK".equals(resultCode)) {
|
||||
QueryTokenForMnsQueueResponse.MessageTokenDTO dto = response
|
||||
.getMessageTokenDTO();
|
||||
TokenForAlicom token = new TokenForAlicom();
|
||||
String timeStr = dto.getExpireTime();
|
||||
token.setMessageType(messageType);
|
||||
token.setExpireTime(df.parse(timeStr).getTime());
|
||||
token.setToken(dto.getSecurityToken());
|
||||
token.setTempAccessKeyId(dto.getAccessKeyId());
|
||||
token.setTempAccessKeySecret(dto.getAccessKeySecret());
|
||||
return token;
|
||||
}
|
||||
else {
|
||||
log.error("getTokenFromRemote_error,messageType:" + messageType + ",code:"
|
||||
+ response.getCode() + ",message:" + response.getMessage());
|
||||
throw new ServerException(response.getCode(), response.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public TokenForAlicom getTokenByMessageType(String messageType, String queueName,
|
||||
String mnsAccountEndpoint)
|
||||
throws ServerException, ClientException, ParseException {
|
||||
TokenForAlicom token = tokenMap.get(messageType);
|
||||
Long now = System.currentTimeMillis();
|
||||
if (token == null || (token.getExpireTime() - now) < bufferTime) {// 过期时间小于2分钟则重新获取,防止服务器时间误差
|
||||
synchronized (lock) {
|
||||
token = tokenMap.get(messageType);
|
||||
if (token == null || (token.getExpireTime() - now) < bufferTime) {
|
||||
TokenForAlicom oldToken = null;
|
||||
if (token != null) {
|
||||
oldToken = token;
|
||||
}
|
||||
token = getTokenFromRemote(messageType);
|
||||
// 因为换token时需要重建client和关闭老的client,所以创建client的代码和创建token放在一起
|
||||
CloudAccount account = new CloudAccount(token.getTempAccessKeyId(),
|
||||
token.getTempAccessKeySecret(), mnsAccountEndpoint,
|
||||
token.getToken());
|
||||
MNSClient client = account.getMNSClient();
|
||||
CloudQueue queue = client.getQueueRef(queueName);
|
||||
token.setClient(client);
|
||||
token.setQueue(queue);
|
||||
tokenMap.put(messageType, token);
|
||||
if (oldToken != null) {
|
||||
oldToken.closeClient();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return token;
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.alibaba.alicloud.sms.base;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.aliyun.mns.client.CloudAccount;
|
||||
import com.aliyun.mns.client.CloudQueue;
|
||||
import com.aliyun.mns.client.MNSClient;
|
||||
import com.aliyuncs.DefaultAcsClient;
|
||||
import com.aliyuncs.IAcsClient;
|
||||
import com.aliyuncs.exceptions.ClientException;
|
||||
import com.aliyuncs.exceptions.ServerException;
|
||||
import com.aliyuncs.http.FormatType;
|
||||
import com.aliyuncs.http.MethodType;
|
||||
import com.aliyuncs.http.ProtocolType;
|
||||
import com.aliyuncs.profile.DefaultProfile;
|
||||
import com.aliyuncs.profile.IClientProfile;
|
||||
|
||||
/**
|
||||
* 获取接收云通信消息的临时token
|
||||
*
|
||||
*/
|
||||
public class TokenGetterForAlicom {
|
||||
private static final Logger log = LoggerFactory.getLogger(TokenGetterForAlicom.class);
|
||||
private String accessKeyId;
|
||||
private String accessKeySecret;
|
||||
private String endpointNameForPop;
|
||||
private String regionIdForPop;
|
||||
private String domainForPop;
|
||||
private IAcsClient iAcsClient;
|
||||
private Long ownerId;
|
||||
private final static String PRODUCT_NAME = "Dybaseapi";
|
||||
private long bufferTime = 1000 * 60 * 2;// 过期时间小于2分钟则重新获取,防止服务器时间误差
|
||||
private final Object lock = new Object();
|
||||
private ConcurrentMap<String, TokenForAlicom> tokenMap = new ConcurrentHashMap<String, TokenForAlicom>();
|
||||
|
||||
public TokenGetterForAlicom(String accessKeyId, String accessKeySecret,
|
||||
String endpointNameForPop, String regionIdForPop, String domainForPop,
|
||||
Long ownerId) throws ClientException {
|
||||
this.accessKeyId = accessKeyId;
|
||||
this.accessKeySecret = accessKeySecret;
|
||||
this.endpointNameForPop = endpointNameForPop;
|
||||
this.regionIdForPop = regionIdForPop;
|
||||
this.domainForPop = domainForPop;
|
||||
this.ownerId = ownerId;
|
||||
init();
|
||||
}
|
||||
|
||||
private void init() throws ClientException {
|
||||
DefaultProfile.addEndpoint(endpointNameForPop, regionIdForPop, PRODUCT_NAME,
|
||||
domainForPop);
|
||||
IClientProfile profile = DefaultProfile.getProfile(regionIdForPop, accessKeyId,
|
||||
accessKeySecret);
|
||||
profile.getHttpClientConfig().setCompatibleMode(true);
|
||||
iAcsClient = new DefaultAcsClient(profile);
|
||||
}
|
||||
|
||||
private TokenForAlicom getTokenFromRemote(String messageType)
|
||||
throws ServerException, ClientException, ParseException {
|
||||
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
df.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
|
||||
QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest();
|
||||
request.setAcceptFormat(FormatType.JSON);
|
||||
request.setMessageType(messageType);
|
||||
request.setOwnerId(ownerId);
|
||||
request.setProtocol(ProtocolType.HTTPS);
|
||||
request.setMethod(MethodType.POST);
|
||||
QueryTokenForMnsQueueResponse response = iAcsClient.getAcsResponse(request);
|
||||
String resultCode = response.getCode();
|
||||
if (resultCode != null && "OK".equals(resultCode)) {
|
||||
QueryTokenForMnsQueueResponse.MessageTokenDTO dto = response
|
||||
.getMessageTokenDTO();
|
||||
TokenForAlicom token = new TokenForAlicom();
|
||||
String timeStr = dto.getExpireTime();
|
||||
token.setMessageType(messageType);
|
||||
token.setExpireTime(df.parse(timeStr).getTime());
|
||||
token.setToken(dto.getSecurityToken());
|
||||
token.setTempAccessKeyId(dto.getAccessKeyId());
|
||||
token.setTempAccessKeySecret(dto.getAccessKeySecret());
|
||||
return token;
|
||||
}
|
||||
else {
|
||||
log.error("getTokenFromRemote_error,messageType:" + messageType + ",code:"
|
||||
+ response.getCode() + ",message:" + response.getMessage());
|
||||
throw new ServerException(response.getCode(), response.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public TokenForAlicom getTokenByMessageType(String messageType, String queueName,
|
||||
String mnsAccountEndpoint)
|
||||
throws ServerException, ClientException, ParseException {
|
||||
TokenForAlicom token = tokenMap.get(messageType);
|
||||
Long now = System.currentTimeMillis();
|
||||
if (token == null || (token.getExpireTime() - now) < bufferTime) {// 过期时间小于2分钟则重新获取,防止服务器时间误差
|
||||
synchronized (lock) {
|
||||
token = tokenMap.get(messageType);
|
||||
if (token == null || (token.getExpireTime() - now) < bufferTime) {
|
||||
TokenForAlicom oldToken = null;
|
||||
if (token != null) {
|
||||
oldToken = token;
|
||||
}
|
||||
token = getTokenFromRemote(messageType);
|
||||
// 因为换token时需要重建client和关闭老的client,所以创建client的代码和创建token放在一起
|
||||
CloudAccount account = new CloudAccount(token.getTempAccessKeyId(),
|
||||
token.getTempAccessKeySecret(), mnsAccountEndpoint,
|
||||
token.getToken());
|
||||
MNSClient client = account.getMNSClient();
|
||||
CloudQueue queue = client.getQueueRef(queueName);
|
||||
token.setClient(client);
|
||||
token.setQueue(queue);
|
||||
tokenMap.put(messageType, token);
|
||||
if (oldToken != null) {
|
||||
oldToken.closeClient();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return token;
|
||||
}
|
||||
}
|
@@ -13,19 +13,20 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.config;
|
||||
package com.alibaba.alicloud.sms.config;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.alicloud.context.AliCloudProperties;
|
||||
import org.springframework.cloud.alicloud.context.sms.SmsProperties;
|
||||
import org.springframework.cloud.alicloud.sms.ISmsService;
|
||||
import org.springframework.cloud.alicloud.sms.SmsInitializerEventListener;
|
||||
import org.springframework.cloud.alicloud.sms.SmsServiceImpl;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.alibaba.alicloud.context.AliCloudProperties;
|
||||
import com.alibaba.alicloud.context.sms.SmsProperties;
|
||||
import com.alibaba.alicloud.sms.ISmsService;
|
||||
import com.alibaba.alicloud.sms.SmsInitializerEventListener;
|
||||
import com.alibaba.alicloud.sms.SmsServiceImpl;
|
||||
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
|
||||
|
||||
/**
|
@@ -13,10 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.endpoint;
|
||||
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendBatchSmsRequest;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
|
||||
package com.alibaba.alicloud.sms.endpoint;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
@@ -25,6 +22,9 @@ import java.util.Map;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendBatchSmsRequest;
|
||||
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
@@ -13,12 +13,12 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.endpoint;
|
||||
|
||||
import org.springframework.cloud.alicloud.sms.base.MessageListener;
|
||||
package com.alibaba.alicloud.sms.endpoint;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import com.alibaba.alicloud.sms.base.MessageListener;
|
||||
|
||||
/**
|
||||
* @author pbting
|
||||
*/
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.endpoint;
|
||||
package com.alibaba.alicloud.sms.endpoint;
|
||||
|
||||
import java.util.Map;
|
||||
|
@@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.alicloud.sms.endpoint;
|
||||
package com.alibaba.alicloud.sms.endpoint;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
@@ -1,3 +1,3 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.alicloud.sms.config.SmsAutoConfiguration,\
|
||||
org.springframework.cloud.alicloud.sms.endpoint.SmsEndpointAutoConfiguration
|
||||
com.alibaba.alicloud.sms.config.SmsAutoConfiguration,\
|
||||
com.alibaba.alicloud.sms.endpoint.SmsEndpointAutoConfiguration
|
Reference in New Issue
Block a user