mirror of
https://gitee.com/mirrors/Spring-Cloud-Alibaba.git
synced 2021-06-26 13:25:11 +08:00
Support OssStorage Resource as WritableResource and add related
unittest cases
This commit is contained in:
@@ -16,22 +16,25 @@
|
||||
|
||||
package com.alibaba.alicloud.oss.resource;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.aliyun.oss.ClientException;
|
||||
import com.aliyun.oss.OSS;
|
||||
import com.aliyun.oss.OSSException;
|
||||
import com.aliyun.oss.model.Bucket;
|
||||
import com.aliyun.oss.model.OSSObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.core.io.WritableResource;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Implements {@link Resource} for reading and writing objects in Aliyun Object Storage
|
||||
@@ -43,18 +46,30 @@ import com.aliyun.oss.model.OSSObject;
|
||||
* @see Bucket
|
||||
* @see OSSObject
|
||||
*/
|
||||
public class OssStorageResource implements Resource {
|
||||
public class OssStorageResource implements WritableResource {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(OssStorageResource.class);
|
||||
|
||||
private static final String MESSAGE_KEY_NOT_EXIST = "The specified key does not exist.";
|
||||
|
||||
private final OSS oss;
|
||||
private final String bucketName;
|
||||
private final String objectKey;
|
||||
private final URI location;
|
||||
private final boolean autoCreateFiles;
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public OssStorageResource(OSS oss, String location) {
|
||||
this(oss, location, false);
|
||||
}
|
||||
|
||||
public OssStorageResource(OSS oss, String location, boolean autoCreateFiles) {
|
||||
Assert.notNull(oss, "Object Storage Service can not be null");
|
||||
Assert.isTrue(location.startsWith(OssStorageProtocolResolver.PROTOCOL),
|
||||
"Location must start with " + OssStorageProtocolResolver.PROTOCOL);
|
||||
this.oss = oss;
|
||||
this.autoCreateFiles = autoCreateFiles;
|
||||
try {
|
||||
URI locationUri = new URI(location);
|
||||
this.bucketName = locationUri.getAuthority();
|
||||
@@ -70,6 +85,14 @@ public class OssStorageResource implements Resource {
|
||||
catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException("Invalid location: " + location, e);
|
||||
}
|
||||
|
||||
this.executorService = new ThreadPoolExecutor(
|
||||
1, 1, 60, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<>());
|
||||
}
|
||||
|
||||
public boolean isAutoCreateFiles() {
|
||||
return this.autoCreateFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -193,4 +216,61 @@ public class OssStorageResource implements Resource {
|
||||
}
|
||||
}
|
||||
|
||||
public Bucket createBucket() {
|
||||
return this.oss.createBucket(this.bucketName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable() {
|
||||
return !isBucket() && (this.autoCreateFiles || exists());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取一个OutputStream用于写操作。
|
||||
* 注意:写完成后必须关闭该流
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public OutputStream getOutputStream() throws IOException {
|
||||
if (isBucket()) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot open an output stream to a bucket: '" + getURI() + "'");
|
||||
}
|
||||
else {
|
||||
OSSObject ossObject;
|
||||
|
||||
try {
|
||||
ossObject = this.getOSSObject();
|
||||
} catch (OSSException ex) {
|
||||
if (ex.getMessage() != null && ex.getMessage().startsWith(MESSAGE_KEY_NOT_EXIST)) {
|
||||
ossObject = null;
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
if (ossObject == null ) {
|
||||
if (!this.autoCreateFiles) {
|
||||
throw new FileNotFoundException("The object was not found: " + getURI());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
PipedInputStream in = new PipedInputStream();
|
||||
final PipedOutputStream out = new PipedOutputStream(in);
|
||||
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
OssStorageResource.this.oss.putObject(bucketName, objectKey, in);
|
||||
} catch (Exception ex) {
|
||||
logger.error("Failed to put object", ex);
|
||||
}
|
||||
});
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user