|
@@ -162,17 +162,20 @@ public class OssClient {
|
|
|
try {
|
|
try {
|
|
|
// 构建上传请求对象
|
|
// 构建上传请求对象
|
|
|
FileUpload fileUpload = transferManager.uploadFile(
|
|
FileUpload fileUpload = transferManager.uploadFile(
|
|
|
- x -> x.putObjectRequest(
|
|
|
|
|
- y -> y.bucket(properties.getBucketName())
|
|
|
|
|
- .key(key)
|
|
|
|
|
- .contentMD5(StringUtils.isNotEmpty(md5Digest) ? md5Digest : null)
|
|
|
|
|
- .contentType(contentType)
|
|
|
|
|
- // 用于设置对象的访问控制列表(ACL)。不同云厂商对ACL的支持和实现方式有所不同,
|
|
|
|
|
- // 因此根据具体的云服务提供商,你可能需要进行不同的配置(自行开启,阿里云有acl权限配置,腾讯云没有acl权限配置)
|
|
|
|
|
- // .acl(getAccessPolicy().getObjectCannedACL())
|
|
|
|
|
- .build())
|
|
|
|
|
- .addTransferListener(LoggingTransferListener.create())
|
|
|
|
|
- .source(filePath).build());
|
|
|
|
|
|
|
+ x -> {
|
|
|
|
|
+ x.source(filePath).putObjectRequest(
|
|
|
|
|
+ y -> y.bucket(properties.getBucketName())
|
|
|
|
|
+ .key(key)
|
|
|
|
|
+ .contentMD5(StringUtils.isNotEmpty(md5Digest) ? md5Digest : null)
|
|
|
|
|
+ .contentType(contentType)
|
|
|
|
|
+ // 用于设置对象的访问控制列表(ACL)。不同云厂商对ACL的支持和实现方式有所不同,
|
|
|
|
|
+ // 因此根据具体的云服务提供商,你可能需要进行不同的配置(自行开启,阿里云有acl权限配置,腾讯云没有acl权限配置)
|
|
|
|
|
+ // .acl(getAccessPolicy().getObjectCannedACL())
|
|
|
|
|
+ .build());
|
|
|
|
|
+ if (log.isDebugEnabled()) {
|
|
|
|
|
+ x.addTransferListener(LoggingTransferListener.create());
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
|
|
|
// 等待上传完成并获取上传结果
|
|
// 等待上传完成并获取上传结果
|
|
|
CompletedFileUpload uploadResult = fileUpload.completionFuture().join();
|
|
CompletedFileUpload uploadResult = fileUpload.completionFuture().join();
|
|
@@ -221,16 +224,19 @@ public class OssClient {
|
|
|
.build();
|
|
.build();
|
|
|
// 使用 transferManager 进行上传
|
|
// 使用 transferManager 进行上传
|
|
|
Upload upload = transferManager.upload(
|
|
Upload upload = transferManager.upload(
|
|
|
- x -> x.requestBody(body).addTransferListener(LoggingTransferListener.create())
|
|
|
|
|
- .putObjectRequest(
|
|
|
|
|
- y -> y.bucket(properties.getBucketName())
|
|
|
|
|
- .key(key)
|
|
|
|
|
- .contentType(contentType)
|
|
|
|
|
- // 用于设置对象的访问控制列表(ACL)。不同云厂商对ACL的支持和实现方式有所不同,
|
|
|
|
|
- // 因此根据具体的云服务提供商,你可能需要进行不同的配置(自行开启,阿里云有acl权限配置,腾讯云没有acl权限配置)
|
|
|
|
|
- // .acl(getAccessPolicy().getObjectCannedACL())
|
|
|
|
|
- .build())
|
|
|
|
|
- .build());
|
|
|
|
|
|
|
+ x -> {
|
|
|
|
|
+ x.requestBody(body).putObjectRequest(
|
|
|
|
|
+ y -> y.bucket(properties.getBucketName())
|
|
|
|
|
+ .key(key)
|
|
|
|
|
+ .contentType(contentType)
|
|
|
|
|
+ // 用于设置对象的访问控制列表(ACL)。不同云厂商对ACL的支持和实现方式有所不同,
|
|
|
|
|
+ // 因此根据具体的云服务提供商,你可能需要进行不同的配置(自行开启,阿里云有acl权限配置,腾讯云没有acl权限配置)
|
|
|
|
|
+ // .acl(getAccessPolicy().getObjectCannedACL())
|
|
|
|
|
+ .build());
|
|
|
|
|
+ if (log.isDebugEnabled()) {
|
|
|
|
|
+ x.addTransferListener(LoggingTransferListener.create());
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
|
|
|
// 将输入流写入请求体
|
|
// 将输入流写入请求体
|
|
|
body.writeInputStream(inputStream);
|
|
body.writeInputStream(inputStream);
|
|
@@ -410,20 +416,18 @@ public class OssClient {
|
|
|
*/
|
|
*/
|
|
|
public WriteOutSubscriber<OutputStream> download(String key, Consumer<Long> contentLengthConsumer) {
|
|
public WriteOutSubscriber<OutputStream> download(String key, Consumer<Long> contentLengthConsumer) {
|
|
|
try {
|
|
try {
|
|
|
- // 构建下载请求
|
|
|
|
|
- DownloadRequest<ResponsePublisher<GetObjectResponse>> publisherDownloadRequest = DownloadRequest.builder()
|
|
|
|
|
- // 文件对象
|
|
|
|
|
- .getObjectRequest(y -> y.bucket(properties.getBucketName())
|
|
|
|
|
- .key(key)
|
|
|
|
|
- .build())
|
|
|
|
|
- .addTransferListener(LoggingTransferListener.create())
|
|
|
|
|
|
|
+ DownloadRequest.TypedBuilder<ResponsePublisher<GetObjectResponse>> typedBuilder = DownloadRequest.builder()
|
|
|
// 使用发布订阅转换器
|
|
// 使用发布订阅转换器
|
|
|
.responseTransformer(AsyncResponseTransformer.toPublisher())
|
|
.responseTransformer(AsyncResponseTransformer.toPublisher())
|
|
|
- .build();
|
|
|
|
|
|
|
+ // 文件对象
|
|
|
|
|
+ .getObjectRequest(y -> y.bucket(properties.getBucketName()).key(key).build());
|
|
|
|
|
+ if (log.isDebugEnabled()) {
|
|
|
|
|
+ typedBuilder.addTransferListener(LoggingTransferListener.create());
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
// 使用 S3TransferManager 下载文件
|
|
// 使用 S3TransferManager 下载文件
|
|
|
Download<ResponsePublisher<GetObjectResponse>> publisherDownload = transferManager
|
|
Download<ResponsePublisher<GetObjectResponse>> publisherDownload = transferManager
|
|
|
- .download(publisherDownloadRequest);
|
|
|
|
|
|
|
+ .download(typedBuilder.build());
|
|
|
// 获取下载发布订阅转换器
|
|
// 获取下载发布订阅转换器
|
|
|
ResponsePublisher<GetObjectResponse> publisher = publisherDownload.completionFuture().join().result();
|
|
ResponsePublisher<GetObjectResponse> publisher = publisherDownload.completionFuture().join().result();
|
|
|
// 执行文件大小消费者函数
|
|
// 执行文件大小消费者函数
|
|
@@ -432,19 +436,19 @@ public class OssClient {
|
|
|
|
|
|
|
|
// 构建写出订阅器对象
|
|
// 构建写出订阅器对象
|
|
|
return out -> {
|
|
return out -> {
|
|
|
- // 注意,此处不需要显式关闭 channel ,channel 会在 out 关闭时自动关闭
|
|
|
|
|
- WritableByteChannel channel = Channels.newChannel(out);
|
|
|
|
|
-
|
|
|
|
|
- // 订阅数据
|
|
|
|
|
- publisher.subscribe(byteBuffer -> {
|
|
|
|
|
- try {
|
|
|
|
|
|
|
+ // 创建可写入的字节通道
|
|
|
|
|
+ try (WritableByteChannel channel = Channels.newChannel(out)) {
|
|
|
|
|
+ // 订阅数据
|
|
|
|
|
+ publisher.subscribe(byteBuffer -> {
|
|
|
while (byteBuffer.hasRemaining()) {
|
|
while (byteBuffer.hasRemaining()) {
|
|
|
- channel.write(byteBuffer);
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ channel.write(byteBuffer);
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
|
- }
|
|
|
|
|
- }).join();
|
|
|
|
|
|
|
+ }).join();
|
|
|
|
|
+ }
|
|
|
};
|
|
};
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
|
|
throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
|