ソースを参照

Update 改用发布订阅的方式替代阻塞流,优化大文件下载时的内存占用

Yue 6 ヶ月 前
コミット
a85bbea167

+ 49 - 11
SERVER/VberAdminPlusV3/vber-common/vber-common-oss/src/main/java/com/vber/common/oss/core/OssClient.java

@@ -18,9 +18,9 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
+import software.amazon.awssdk.core.async.ResponsePublisher;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -34,12 +34,16 @@ import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
 import java.io.*;
 import java.net.URI;
 import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.Date;
+import java.util.Optional;
 import java.util.Random;
+import java.util.function.Consumer;
 
 /**
  * S3 存储协议 所有兼容S3协议的云厂商均支持
@@ -379,27 +383,61 @@ public class OssClient {
      *
      * @param key 文件在 Amazon S3 中的对象键
      * @param out 输出流
-     * @return 输出流中写入的字节数(长度)
      * @throws OssException 如果下载失败,抛出自定义异常
      */
-    public long download(String key, OutputStream out) {
+    public void download(String key, OutputStream out, Consumer<Long> consumer) {
+        try {
+            this.download(key, consumer).writeTo(out);
+        } catch (Exception e) {
+            throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
+        }
+    }
+
+    /**
+     * 下载文件从 Amazon S3 到 输出流
+     *
+     * @param key                   文件在 Amazon S3 中的对象键
+     * @param contentLengthConsumer 文件大小消费者函数
+     * @return 写出订阅器
+     * @throws OssException 如果下载失败,抛出自定义异常
+     */
+    public WriteOutSubscriber<OutputStream> download(String key, Consumer<Long> contentLengthConsumer) {
         try {
             // 构建下载请求
-            DownloadRequest<ResponseInputStream<GetObjectResponse>> downloadRequest = DownloadRequest.builder()
+            DownloadRequest<ResponsePublisher<GetObjectResponse>> publisherDownloadRequest = DownloadRequest.builder()
                     // 文件对象
                     .getObjectRequest(y -> y.bucket(properties.getBucketName())
                             .key(key)
                             .build())
                     .addTransferListener(LoggingTransferListener.create())
-                    // 使用订阅转换器
-                    .responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
+                    // 使用发布订阅转换器
+                    .responseTransformer(AsyncResponseTransformer.toPublisher())
                     .build();
+
             // 使用 S3TransferManager 下载文件
-            Download<ResponseInputStream<GetObjectResponse>> responseFuture = transferManager.download(downloadRequest);
-            // 输出到流中
-            try (ResponseInputStream<GetObjectResponse> responseStream = responseFuture.completionFuture().join().result()) { // auto-closeable stream
-                return responseStream.transferTo(out); // 阻塞调用线程 blocks the calling thread
-            }
+            Download<ResponsePublisher<GetObjectResponse>> publisherDownload = transferManager.download(publisherDownloadRequest);
+            // 获取下载发布订阅转换器
+            ResponsePublisher<GetObjectResponse> publisher = publisherDownload.completionFuture().join().result();
+            // 执行文件大小消费者函数
+            Optional.ofNullable(contentLengthConsumer)
+                    .ifPresent(lengthConsumer -> lengthConsumer.accept(publisher.response().contentLength()));
+
+            // 构建写出订阅器对象
+            return out -> {
+                // 注意,此处不需要显式关闭 channel ,channel 会在 out 关闭时自动关闭
+                WritableByteChannel channel = Channels.newChannel(out);
+
+                // 订阅数据
+                publisher.subscribe(byteBuffer -> {
+                    try {
+                        while (byteBuffer.hasRemaining()) {
+                            channel.write(byteBuffer);
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }).join();
+            };
         } catch (Exception e) {
             throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
         }

+ 15 - 0
SERVER/VberAdminPlusV3/vber-common/vber-common-oss/src/main/java/com/vber/common/oss/core/WriteOutSubscriber.java

@@ -0,0 +1,15 @@
+package com.vber.common.oss.core;
+
+import java.io.IOException;
+
+/**
+ * 写出订阅器
+ *
+ * @author Iwb
+ */
+@FunctionalInterface
+public interface WriteOutSubscriber<T> {
+
+    void writeTo(T out) throws IOException;
+
+}