hatenob

プログラムって分からないことだらけ

WebFluxでS3にファイルアップロード(AWS Java SDK v2)

以前、WebFluxで複数ファイルアップロードしてサーバサイドに保存する方法を試したのだけれど、今回はそれをS3にアップロードにしてみました。
AWS側はJava SDK v2を使います。
v2になってノンブロッキングI/Oがサポートされたそうなので、WebFluxを使った場合でも問題なく使えるはず、ということです。

docs.aws.amazon.com

最初に断っておきますが、以下に記載するコードについては、正解が分からないまま「なんとなくこんな感じかなぁ」で書いて「とりあえず動いた」というもので、書き方として正しいか、本当にブロック処理入ってないか、とか全然分かってませんというのが大前提です。
正解が分かる方は是非教えていただきたいです。

pom.xml

WebFluxとAWS SDK v2を依存関係に追加します。

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.2.1.RELEASE</version>
</parent>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>bom</artifactId>
      <version>2.10.25</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <scope>provided</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>s3</artifactId>
    </dependency>
  </dependencies>

Handler

アップロードの主処理です。

@Component
@Slf4j
public class UploadHandler {
    private final S3AsyncClient s3;
    private static final String bucket = "ap-cdk-bucket-1";
    private static final String key = "app/test";

    @Autowired
    public UploadHandler() {
        this.s3 = S3AsyncClient.builder().region(Region.AP_NORTHEAST_1).build();
    }

    public RouterFunction<ServerResponse> route() {
        return RouterFunctions.route().path("/upload", u -> {
            u.POST("/", this::upload);
        }).build();
    }

    public Mono<ServerResponse> upload(final ServerRequest req) {
        return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM)
                .body(
                    req.multipartData().map(d -> d.get("f"))
                        .flatMapMany(Flux::fromIterable)
                        .cast(FilePart.class)
                        .flatMap(f -> putObject(f)).log(), UploadResponse.class);
    }

    private Mono<UploadResponse> putObject(final FilePart filePart) {
        final PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key + "/" + filePart.filename())
                .build();

        Path localPath = Paths.get(System.getProperty("java.io.tmpdir"), filePart.filename());
        filePart.transferTo(localPath);

        final AsyncRequestBody body = AsyncRequestBody.fromFile(localPath);

        return Mono.fromFuture(s3.putObject(req, body))
            .doOnError(e -> log.error(e.getMessage(), e))
            .map(res -> new UploadResponse(req.bucket() + "/" + req.key(), res.versionId()));
    }
}

@Value
@JsonDeserialize
public class UploadResponse {
    private String path;
    private String version;
}

@Component
public class RouteConfig {
    @Bean
    public RouterFunction<ServerResponse> routes(UploadHandler uploadHandler) {
        return uploadHandler.route();
    }
}

見ての通りで、transferToで一度ローカルのtmpdirに保存してから非同期でアップロードしています。
直接bytebufferとかに変換してアップロードできないかなぁと試行錯誤してみたものの、イマイチ良い方法が見つからず諦めました。
ちなみにローカルへの保存やS3へのアップロード時に本当は一意な名前になるように工夫が必要なんですがそのままファイル名を使っている点は単なる手抜きです。
(このままだと別々の人が同名でファイルアップロードすると事故が起きる)

まとめ

ひとまず正解かどうかは置いといたとして、動かすことはできました。
まだまだWebFluxやReactorの書き方が理解できていないので勉強したいと思います。