hatenob

プログラムって分からないことだらけ

2021の振り返りと2022

2022年になりました。見返すと、前回の更新が2020年の2月ということで、立派に2年ほど放置していたことになります。
本ブログは自身の技術研鑽を晒す場として書いていますが、2020年も2021年も決して何もやってなかったってわけではないので、こんなことをやったというのを記しておこうと思います。

プログラミング

仕事ではJava&SpringBoot、たまにPythonくらいですが、プライベートではもう何度目かのReactの勉強をしました。本を見ながら写経する感じで一通りなぞって何となく書き方は理解できたと思いますがもう忘れています。
前回の記事で書いていたWebFluxも私の周りでは登場の機会はなく、gRPCやRSocket、GraphQLなどもまだまだ先かなという気がしているのは、どちらかというと技術的なブレイクスルーが必要となるような状況に未だないからではないかと思います。サービスの開発や運営って難しいですね。

クラウド

AWS一辺倒ではありますが、CDKでの構築とAppMeshをやってました。CDKはようやく慣れてきたところでv2が出てまだ追えてません。モジュール化をどう進めるかはまだベストプラクティスを見つけられてないのでボチボチやりたいと思います。
AppMeshは色々と検証をしたりしてました。基本機能は抑えつつも、枝葉は思ったんと違うところもありまだまだこれからという感じです。利用者が増えないと成熟もしないので、コミュニティへの参加などもしていけたらと思います。

2022年もこれまでの積み重ねを大切に、たったの一行でもコードを書いた分だけ成長できると信じて取り組みたいと思います。

WebFlux.fnとClean Architectureで実装を試みる

動機

仕事で使うあてがあるわけではないのですが気になってWebFlux.fn(WebFluxのFunctional Endpoint)を少し触ったのを機に、もう少しそれっぽい実装をしてみようと思うに至りました。
せっかくなので、Clean Architecture(クリーンアーキテクチャ)で有名なあの円形の図に倣った形で、DDD(ドメイン駆動設計)っぽい実装にも挑戦してみることにしました。
DDDとかClean Architectureは特に実装に依るものではなく設計方法論とういことは十二分に承知したうえで、字面だけを見て理解した内容を実装に落としてみることにしました。
無論、初めてだらけなので結構頻繁に迷子になりました。本当は自分なりの答えに行き着ければよかったのですが今のところ気付きを得られた程度で、まだまだ熟考が必要と感じています。
自分の悩みが誰かの気付きになればと思い、一区切りついたので記事に起こしています。
絵をかいて視覚的に分かりやすいようにしたいと思う反面、それはそれで手間がかかるのでひとまず文字で表現しています。あしからず。

参考

当然先人の知恵をたくさんお借りして進めました。DDD本、IDDD本、CleanArchitecture本などは一通り目は通していますが、主に以下を参考にさせていただきました。
実践クリーンアーキテクチャ with Java │ nrslib
クリーンアーキテクチャ完全に理解した · GitHub
世界一わかりやすいClean Architecture - nuits.jp blog
BLOG.IK.AM
https://www.youtube.com/watch?v=Nqv0JeyaZmg

対象

簡単なCRUD(と言いつつDはありません)のAPIを作ることにしました。
スタックは以下の通りです。

  • Java 11
  • SpringBoot 2.2
  • WebFlux.fn
  • R2DBC
  • H2

ソース

ソースコードは以下に置いています。いくつか免責事項(?)があります。
github.com

  • この記事をあげた後に改編する可能性があり、記事の内容と齟齬があるかもしれませんがご容赦ください。
  • しばらく時間が経って気まぐれでリポジトリを削除してしまいコードを見れなくなることがあるかもしれませんがご容赦ください。

悩み1:パッケージどう切るか問題

Javaパッケージ構成をどのようにするかですが、極力無駄な依存を生まないようにパッケージプライベートをうまく使える構成にしたいところです。
あとはClean ArchitectureやDDDとの関連が分かるようにある程度名前は合わせておきたいところです。
そんなこんなで以下のような構成にしてみました。

global // レイヤに跨るもの。Utilとか。
drivers // フレームワークの設定系。Configとか。
adapters // 外界とアプリケーションをつなぐ部分。アダプター層。
  ┗ handlers // Function Endpointのハンドラ。外界から入ってくるほう。
      ┗ XxxHandler // ハンドラクラス
      ┗ XxxPresenter // プレゼンタ
      ┗ XxxRequest // リクエスト(DTO)
      ┗ XxxResponse // レスポンス(DTO)
  ┗ gateways // 外界へ出ていくほう。
      ┗ db // DBアクセスコード。RepositoryやQueryの実装。
          ┗ XxxDatabaseClient // DBアクセスコードの実装。
          ┗ XxxEntity // ORM用クラス(DTO)
          ┗ XxxReactiveRepository // ORM用クラス
      ┗ api // API呼出しコード。RepositoryやQueryの実装。
usecases // アプリケーション層。
  ┗ XxxUseCase // ユースケースインタフェース
  ┗ XxxUseInteractor // インタラクタ(ユースケースの実装)
  ┗ XxxUseCaseInput // ユースケースの入力(DTO)
  ┗ XxxUseCaseOutput // ユースケースの出力(DTO)
domain // ドメイン層。
  ┗ model // ドメインモデル。
      ┗ Xxx // エンティティクラスや値クラス
      ┗ XxxRepository // リポジトリ(インタフェース)
queries // クエリサービス。
   ┗ XxxQuery // クエリインタフェース
   ┗ XxxQueryRequest // クエリリクエスト(DTO)
   ┗ XxxQueryResponse // クエリレスポンス(DTO)

だいたいClean Architectureの名前に倣ったけれど、ドメイン層だけは「entity」を使わずに「domain.model」にしました。Entity(エンティティ)はERモデルのエンティティとか、JPAのエンティティとか、DDDのエンティティとか、他の用語と被るので分かりにくくなりそう、というのがその理由です。
アダプター層は外界のパラメータをユースケース(アプリケーション層)が理解できる形に変換することを想定しています。
ユースケース(アプリケーション層)は、外界の表現方法に依らない処理を実装することを想定しています。つまり、インタフェースがHTTPであっても、CSVファイルの読み込みであっても、アプリケーション層の処理は分からないことを想定しています。
あと、やっぱり感覚的に更新系と参照系では異なる方式が要求される気がするので、CQRSの考えに倣ってクエリサービスは別立てしています。一応、位置づけとしてはRepositoryと同じで、ユースケースからインタフェースを介して呼び出されることを想定しています。

悩み2:presenterどうするか問題

Clean Architectureの図に倣った実装を試みるときにみなさん悩まれることのようのです。Webアプリケーションフレームワークが生のHTTPハンドラを隠蔽しており、ControllerやHandlerクラスから何かしらを戻り値としてreturnしないといけないので、「ユースケースからプレゼンターのインタフェースを読んで描画して処理終了!」みたいな実装は難しいわけです。
今回はpresenterの責務としては「ユースケース(アプリケーション層)の返す値を外界に合わせた表現に変換する」こととしたときに、一応クラスとしては用意しつつもHandlerの中で呼び出してレスポンスクラスを生成する使い方にしました。
また、特に単票画面で登録や更新をするときのように、リクエストで飛んでくる入力パラメータとレスポンスで返す出力パラメータは似たようなものになることが往々にしてあります。そこで、presenterの責務に、先述のユースケース→外界のみならず「外界の表現をユースケース(アプリケーション層)の入力に変換する」も追加し、いわゆる変換クラス(ConverterとかTranslatorみたいなものと同義)と位置付けることにしました。

悩み3:ドメインモデルクラスどこまで定義するか問題

極論的には、プログラミング言語に含まれるネイティブな型(intとかStringとかDateとか)を使わずにすべてドメインモデルとして定義すべし、みたいなのがあるようです。そうすることでドメインモデルの扱いにおいて型安全になり、カプセル化によるドメイン知識の漏洩も防げるのかなと思います。
さて、いざやろうとするとどこまで厳密に従うべきか悩みます。今回はお勉強なのでひとまず全部クラスとして定義してみました。プロパティやメソッドの引数などに曖昧さがなくなって意味がより明確になったように思うので、このくらいのアプリであれば全面採用しても問題ないように思っています。が、一人で作っている分には全部把握しているので、生産性や品質面で顕著な効果を感じられていないのが悩みどころです。より複雑になったり、時間を経て保守しないいけないとなったときには効きそうに思います。

悩み4:ドメインモデルクラスどこまで露出するか問題

Clean Architectureに倣った実装において真に重要なことは「依存の方向が外から中へ向くこと」一点だそうです。としたときにドメインモデルを、アプリケーション層を飛ばしてアダプター層にまでドメインモデルのまま見せるかどうか。
アダプタ層は入力のI/Fによって変わる部分、アプリケーション層より中は変わらない部分、と位置付けた時に、アダプタ層がアプリケーション層とドメイン層の両方を意識して実装しないといけないのは思考の負荷が高いように思いました。あくまで、アプリケーション層との境界だけを意識するほうがシンプルなのかな、と。
というわけで、原則、層を飛び越えた露出はしない方針にしました。

悩み5:バリデーションどこでどうやるか問題

一番悩んで未だにモヤモヤしているのが、入力値のバリデーションをどこで行うか?エラーをどのように返すか?というあたりです。
アダプター層のパラメータとアプリケーション層のパラメータとドメイン層のパラメータと、それぞれでチェックが必要になるわけですが、先ほどの露出の問題と同じく、多層を意識した設計はしたくないのです。
なのでドメインモデルの整合性を保つためのチェックはドメイン層に実装したいわけです。
一方でユースケースごとに異なるバリデーションはアプリケーション層に実装したいです。
外界からは基本的には「文字列」として飛んでくるパラメータをアプリケーション層に渡せる型に変換するのはアダプター層の責務としたときに、正しい型に変換できることのチェックはアダプター層で実装する必要があります。
素直に実装すると、最初にアダプター層のチェック、次にユースケース層のチェック、最後にドメイン層のチェックになります。
この時、層ごとにエラーを返すと、クライアントは何度もエラーの訂正をしないといけなくなるので、パラメータごとに内層に渡せる形であれば内層に渡してチェックをし、全パラメータの一通りのチェックを通した結果をクライアントには返したいわけです。そうすればクライアントはまとめてエラー修正をすることができます。
「パラメータごとに」ってのがポイントで、ある項目はアダプター層のチェックで引っかかったとしたらそれ以上のチェックはしない(できない)、その他はアプリケーション層でチェックをする、という実装が必要になります。
あとは、クライアントにはどのパラメータがエラーだったかを返したいわけですが、パラメータ名を知っているのはアダプター層だけなので、内層に渡ったものはあくまでそれぞれの層での属性名でしか判別できなくなります。
というのでかなり悶々として試行錯誤を繰り返し、今の実装に落ち着きました。なお、属性名から外界のパラメータ名への変換は実装できていません。
上記のような経緯なので、バリデーションもBean Validationではうまくはまりそうにないと判断して自作しています。

悩み6:例外ハンドリングをどこでどうやるか問題

パラメータ名への変換が実装できていない理由として、例外ハンドリングをどこでどうやるか、というのでこれまた悶々としたためです。
WebFlux(Reactor)を使ってMonoやFluxで実装をした場合、基本は遅延実行になるわけですが、手続き型に慣れ親しんだ身からすると「書いてあるところに来たタイミングで実行されるわけではない」という振る舞いに相当違和感を感じました。本来なら全体を「try~catch」で囲ってあげれば例外ハンドリングができるわけですが、実際に実行されるのはそのtry節の外だったりするので、「onErrorReturn」や「onErrorResume」をうまく使って補足する必要があるようです。※この辺は理解が怪しいです。観測した感じ、そんな感じでした。
先ほどの入力チェックでエラーになった時には例外を投げるようにしたわけですが、これを補足してレスポンスコード「400(Bad Request)」で返すのをどこでやるのがよいのかでこれまた悶々と試行錯誤を繰り返しました。
最初、Handlerの関数のチェーンの中でやってみたところ、Monoを返すパターンはうまく実装できましたが、Fluxを返すパターンでうまいやり方が分かりませんでした。

// Monoを返すパターン
//   mapを使ってServerResponseに渡せる
    .map(res -> ServerResponse.ok().bodyValue(res));

// Fluxを返すパターン
//    mapを使って渡せない(渡し方が分からない)
    ServerResponse.ok().body(usecase.handle(in), ResponseType.class);

なので、アダプター層より外層のフレームワーク層に位置するところで「DefaultErrorWebExceptionHandler」を継承してカスタマイズした独自例外ハンドラを実装して対処しています。
アダプター層より外層まで伝播させてしまったが故に外界からのパラメータ名が何か分からず、属性名から外界のパラメータ名への変換が実装できてないのです。

悩み7:ORMどうするか問題

今回、experimentalではありますが「spring-boot-starter-data-r2dbc」を使っています。JPARepositoryのように、インタフェースを定義するとバイトコードエンハンスメント(?)で実装が生成されるようですが、マップするクラスをドメインクラスやクエリクラスをそのまま使わず、それ専用に用意してこれまた型変換するようにしました。極力フレームワークに引きずられる実装コードを少なくしたかった、というのがその理由です。
型変換するので、ドメイン層のRepositoyにそのままReactiveRepositoryを継承させることができなくなります。なのでDBアクセス専用のリポジトリを別に定義して、Repositoryの実装にコンポジット&インジェクションするようにしました。
リポジトリに渡されるドメインモデルをマッパークラスに変換するのはアダプター層においたRepositoryの実装でやることになります。ここは、ドメインモデルがアダプター層に露出していることになりました。

悩み8:変換多すぎ問題

結果として、色んなところで変換処理が必要になりました。しかも、単純なプロパティコピーではない構造の変換を含んでいますので、いちいち手組が必要になります。これは規模が大きくなると結構大変になる系のやつです。

総括

長々と悩みの吐露にお付き合いくださった方はどうもありがとうございました。
個人的な感想として、

  • 関数型の実装を手続き型しか分からない人たちに理解してバグのない実装をしてもらうことは至難の業な気がする。
  • WebFluxで得られるスケーラビリティが本当に必要になった時に本当に必要な個所に適用するだけなら何とかできそうな気がする。
  • DDDやClean Architectureの考え方を実装に落とすにもまだまだ修行が必要。
  • 変換とか多いし、単純なCRUDのくせにやたらクラス多いし(何も考えずシンプルにやりたいことだけ実装するならこんなにクラスはいらない)で、大規模化には苦労しそうではあるけど、実装をしていて意図が明確なので見通しは良くなったと感じる。生産性は下がるかもしれないが、保守性は上がったと信じたい。

といった感じです。

引き続き研鑽したいと思います。

SpringBoot 2.2でRSocket

SpringBoot 2.2でRSocketを試してみました。

org.springframework.messaging.MessageDeliveryException: No handler for destination ''

というのが出て最初はうまく動かなかったのですが、何とか動きました。

動作確認環境

構成

ユーザからのリクエストを受け付けるRESTのエンドポイントがあり、バックエンドをRSocketで通信している風です。
clientという名前が分かりにくいですね。。

  [user]  -- http/rest --> [client] -- tcp/rsocket --> [server]

Server

ざっくりこんな感じです。

@Controller
public class GreetingRSocketController {
    @MessageMapping("hello")
    public Mono<GreetingData> hello(GreetingRequest request) {
        return Mono.just(new GreetingData("hello " + request.getWord()));
    }

    @MessageExceptionHandler
    public Mono<GreetingData> handleException(Exception e) {
        return Mono.just(GreetingData.fromException(e));
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class GreetingData {
    private String word;

    public static GreetingData fromException(Exception e) {
        return new GreetingData(e.getMessage());
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class GreetingRequest {
    private String word;
}

Client

こちらもざっくりと。
Serverと重複するGreetingDataとGreetingRequestは省略です。

@RestController
public class GreetingRestController {
    private final RSocketRequester requester;

    @Autowired
    public GreetingRestController(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping("/hello/{word}")
    public Publisher<GreetingData> current(@PathVariable("word") String word) {
        return requester.route("hello").data(new GreetingRequest(word)).retrieveMono(GreetingData.class);
    }
}

@Configuration
public class ClientConfiguration {
    @Bean
    public RSocket rsocket() {
        return RSocketFactory.connect()
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(TcpClientTransport.create(7000))
                .start().block();
    }

    @Bean
    public RSocketRequester rsocketRequester(RSocketStrategies rsocketStrategies) {
        return RSocketRequester.wrap(rsocket(), MimeTypeUtils.APPLICATION_JSON,
                MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString()),
                rsocketStrategies);
    }
}

最初に書いた例外は、metaDataTimeTypeの設定不備によるものでした。

WebFluxでS3にファイルアップロードしてDynamoDBにインデックス作成(AWS Java SDK v2)

前回、WebFluxでS3にファイルをアップロードするところまでやりました。
そのままだと、アップロードされたファイル名のままになるので別の人が同じファイル名で別のファイルをあげた日には目も当てられない惨状となるだけでなく、同じ人でも別のファイルを同じ名前であげるとやっぱりイマイチやなぁという話だったので、「最低限」という感じでS3に保存したファイルのインデックスをDynamoDBに登録する処理を書いてみました。
例によって「お作法的にほんまにこれでおうてんのか・・動きはしたんやけども・・」という感じです。

リソース作成

CDKで作成しました。

public class ApCdkStack extends Stack {
    public ApCdkStack(final Construct scope, final String id) {
        this(scope, id, null);
    }

    public ApCdkStack(final Construct scope, final String id, final StackProps props) {
        super(scope, id, props);
        bucket("ap-cdk-bucket", "ap-cdk-bucket-1");
        dynamodb("ap-cdk-dynamodb", "ap-cdk-dynamodb-1");
    }

    private void bucket(String id, String bucketName) {
        List<LifecycleRule> lifecycleRules = new ArrayList<>();
        lifecycleRules
            .add(LifecycleRule.builder()
                .abortIncompleteMultipartUploadAfter(Duration.days(1))
                .build());

        Bucket.Builder.create(this, id)
            .bucketName(bucketName)
            .publicReadAccess(false)
            .blockPublicAccess(BlockPublicAccess.BLOCK_ALL)
            .versioned(false)
            .lifecycleRules(lifecycleRules)
            .removalPolicy(RemovalPolicy.DESTROY).build();
    }

    private void dynamodb(String id, String tableName) {
        String partitionKeyName = "fid";
        Attribute partitionKey = Attribute.builder()
            .name(partitionKeyName).type(AttributeType.STRING)
            .build();

        Table.Builder.create(this, id)
            .tableName(tableName)
            .partitionKey(partitionKey)
            .billingMode(BillingMode.PAY_PER_REQUEST)
            .removalPolicy(RemovalPolicy.DESTROY)
            .build();
    }
}

コード

コードは前回への追記部分だけです。

    private final FileRepository repo;
    ...
    private Mono<UploadResponse> putObject(final FilePart filePart) {
        final FileInfo info = FileInfoBuilder.create(filePart).build();
        final PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(info.getFid()).build();
        ...
        return Mono.fromFuture(s3.putObject(req, body))
            .doOnError(e -> log.error(e.getMessage(), e))
            .map(res -> new UploadResponse(req.key(), res.versionId()))
            .doOnSuccess(res -> repo.save(info));

前回はUploadResponseを返して終わりだったのですが、doOnSuccessでFileRepositoryのsaveでファイル情報を保存しています。
引数は、FileInfoBuilderで作ったFileInfoにしていますが、これがDynamoDBに登録する項目(Item)になるものです。

@Value
public class FileInfo {
    private String fid;
    private String fname;
}

public class FileInfoBuilder {
    private final FilePart part;

    private FileInfoBuilder(FilePart part) {
        this.part = part;
    }

    public static FileInfoBuilder create(FilePart part) {
        return new FileInfoBuilder(part);
    }

    public FileInfo build() {
        return new FileInfo(fid(), part.filename());
    }

    private String fid() {
        return UUID.randomUUID().toString();
    }
}

fid(ファイルID)は雑ですがUUIDにしています。このまま保存すると拡張子もないのでS3バケットを見ただけではいったい何かすら分からないのですが人が見るわけじゃないという想定なのでまぁいいかなと思います。
S3は裏側の仕組みとしてキーのプリフィックス先頭7文字くらいのハッシュでパーティション化(シャード化?)するらしいので、均等になることを願ってランダムな文字列にしています。ただここも本当は認証されたユーザが使う前提であれば「ユーザID/一意なファイル名」というキーのほうがよいのだろうなとは思います。

@Repository
@Slf4j
public class DynamodbFileRepository implements FileRepository {
    private final DynamoDbAsyncClient db;
    private final FileInfoDynamodbMapper mapper;

    private static final String TABLE = "ap-cdk-dynamodb-1";

    @Autowired
    public DynamodbFileRepository() {
        this.db = DynamoDbAsyncClient.builder()
            .region(Region.AP_NORTHEAST_1)
            .build();
        this.mapper = new FileInfoDynamodbMapper();
    }

    @Override
    public Mono<Void> save(FileInfo info) {
        PutItemRequest req = PutItemRequest.builder()
            .tableName(TABLE)
            .item(mapper.to(info))
            .build();

        return Mono.fromFuture(db.putItem(req))
            .doOnError(e -> log.error(e.getMessage(), e))
            .log()
            .then();
    }
}

public class FileInfoDynamodbMapper {
    public Map<String, AttributeValue> to(FileInfo from) {
        Map<String, AttributeValue> o = new HashMap<>();
        o.put("fid", AttributeValue.builder().s(from.getFid()).build());
        o.put("fname", AttributeValue.builder().s(from.getFname()).build());
        return o;
    }

    public FileInfo to(GetItemResponse from) {
        return to(from.item());
    }

    public FileInfo to(Map<String, AttributeValue> from) {
        return new FileInfo(from.get("fid").s(), from.get("fname").s());
    }
}

DynamoDBへの登録もS3と同じく非同期クラインとでReactorを使って実装してみましたが、ますます「これでええんやろか・・」という思いは増すばかりです。S3の時もなのですが例外ハンドリングの書き方が全然分かっておりませんで、スループット上限に達した時にリトライしたいんだけどもどうすればよいんやろうか、、。

まとめ

とりあえずお作法的に正しいかや、例外ハンドリングの書き方などなど、まだまだよく分からないことだらけではあるものの、ひとまずちゃんと動くところまで確認できたのでよしとします。
あとはこれを洗練させれば、ある程度汎用的なS3アップローダとして使えるかなぁという感じです。

オリジナルコードはここですが、色々と書き換えているので記事執筆時点と違っているかもしれません。
garden/ap-flux-s3 at master · nobrooklyn/garden · GitHub

WebFluxでS3にファイルアップロード(AWS Java SDK v2)

以前、WebFluxで複数ファイルアップロードしてサーバサイドに保存する方法を試したのだけれど、今回はそれをS3にアップロードにしてみました。
AWS側はJava SDK v2を使います。
v2になってノンブロッキングI/Oがサポートされたそうなので、WebFluxを使った場合でも問題なく使えるはず、ということです。

docs.aws.amazon.com

最初に断っておきますが、以下に記載するコードについては、正解が分からないまま「なんとなくこんな感じかなぁ」で書いて「とりあえず動いた」というもので、書き方として正しいか、本当にブロック処理入ってないか、とか全然分かってませんというのが大前提です。
正解が分かる方は是非教えていただきたいです。

pom.xml

WebFluxとAWS SDK v2を依存関係に追加します。

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.2.1.RELEASE</version>
</parent>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>bom</artifactId>
      <version>2.10.25</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <scope>provided</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>s3</artifactId>
    </dependency>
  </dependencies>

Handler

アップロードの主処理です。

@Component
@Slf4j
public class UploadHandler {
    private final S3AsyncClient s3;
    private static final String bucket = "ap-cdk-bucket-1";
    private static final String key = "app/test";

    @Autowired
    public UploadHandler() {
        this.s3 = S3AsyncClient.builder().region(Region.AP_NORTHEAST_1).build();
    }

    public RouterFunction<ServerResponse> route() {
        return RouterFunctions.route().path("/upload", u -> {
            u.POST("/", this::upload);
        }).build();
    }

    public Mono<ServerResponse> upload(final ServerRequest req) {
        return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM)
                .body(
                    req.multipartData().map(d -> d.get("f"))
                        .flatMapMany(Flux::fromIterable)
                        .cast(FilePart.class)
                        .flatMap(f -> putObject(f)).log(), UploadResponse.class);
    }

    private Mono<UploadResponse> putObject(final FilePart filePart) {
        final PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key + "/" + filePart.filename())
                .build();

        Path localPath = Paths.get(System.getProperty("java.io.tmpdir"), filePart.filename());
        filePart.transferTo(localPath);

        final AsyncRequestBody body = AsyncRequestBody.fromFile(localPath);

        return Mono.fromFuture(s3.putObject(req, body))
            .doOnError(e -> log.error(e.getMessage(), e))
            .map(res -> new UploadResponse(req.bucket() + "/" + req.key(), res.versionId()));
    }
}

@Value
@JsonDeserialize
public class UploadResponse {
    private String path;
    private String version;
}

@Component
public class RouteConfig {
    @Bean
    public RouterFunction<ServerResponse> routes(UploadHandler uploadHandler) {
        return uploadHandler.route();
    }
}

見ての通りで、transferToで一度ローカルのtmpdirに保存してから非同期でアップロードしています。
直接bytebufferとかに変換してアップロードできないかなぁと試行錯誤してみたものの、イマイチ良い方法が見つからず諦めました。
ちなみにローカルへの保存やS3へのアップロード時に本当は一意な名前になるように工夫が必要なんですがそのままファイル名を使っている点は単なる手抜きです。
(このままだと別々の人が同名でファイルアップロードすると事故が起きる)

まとめ

ひとまず正解かどうかは置いといたとして、動かすことはできました。
まだまだWebFluxやReactorの書き方が理解できていないので勉強したいと思います。

AWS CDKのJava版がGAになったようなので触ってみる

タイトルの通りです。

aws.amazon.com

CloudFormationでJSONYAMLで書けばよいんですが、プログラム言語で書くことで条件分岐やループが使えるのと、IDEのサポートが受けられるあたりがメリットなんだと思います。
それだけならわざわざJavaで書かなくてもTypeScriptとかでよいような気がしますが、Javaで育ったのでJavaのほうが慣れているとか、周辺の環境的な事情や制約によってJavaのほうが嬉しい人も一定数いるんだと思います。僕みたいに。

何をするでもないのですが、とりあえずS3バケットを1つ作るコードを書いてみました。
書いただけでデプロイまではやってません。

ひな形を作ります。

> cdk init ap-cdk --language java

appかsample-appかを選びますが、空のプロジェクトならappでよいようです。
これでMavenプロジェクトが自動生成されます。

あとは生成されたクラスにS3バケットを作成するコードを書きます。

public class ApCdkStack extends Stack {
    public ApCdkStack(final Construct scope, final String id) {
        this(scope, id, null);
    }

    public ApCdkStack(final Construct scope, final String id, final StackProps props) {
        super(scope, id, props);

        Bucket.Builder.create(this, "ap-cdk-bucket").bucketName("ap-cdk-bucket-1").publicReadAccess(false)
                .versioned(true).build();
    }
}

作成したBucketオブジェクトを特に変数に入れるわけでもなく作りっぱなしみたいな感じですがこれでOKのようです。
中身は見ていませんが、Constructの変数にオブジェクトの参照を追加していってる感じなんでしょうかね。

とりあえずこれでsynthesizeしてみるとCloudFormation Templateの形式で出力されました。

> cdk synth
Resources:
  apcdkbucket04A81C10:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: ap-cdk-bucket-1
      VersioningConfiguration:
        Status: Enabled
    UpdateReplacePolicy: Retain
    DeletionPolicy: Retain
    Metadata:
      aws:cdk:path: ap-cdk/ap-cdk-bucket/Resource
  CDKMetadata:
    Type: AWS::CDK::Metadata
    Properties:
      Modules: aws-cdk=1.18.0,@aws-cdk/aws-events=1.18.0,@aws-cdk/aws-iam=1.18.0,@aws-cdk/aws-kms=1.18.0,@aws-cdk/aws-s3=1.18.0,@aws-cdk/core=1.18.0,@aws-cdk/cx-api=1.18.0,@aws-cdk/region-info=1.18.0,jsii-runtime=Java/11.0.5
    Condition: CDKMetadataAvailable
Conditions:
....

あとはこれをdeployすればよいみたいです。

ところで気になったのが、この場合のテストコードは何をテストするんだろうか?そもそもいるのだろうか?
宣言的に記述しているので、JSON/YAMLと同じく、ただ書いていくだけなら特にいらないんだろうなと思います。
まぁ、名称や設定が意図した通りになっているかをテストしてもよいんでしょうけど、労力に比して品質への寄与は少ないように思います。
条件分岐やループなんかを入れたときにはテストを書く、くらいでよいのかなと思います。

Spring WebFluxで複数ファイルアップロード

特に必要に迫られたわけでもないのですが、WebFluxでリアクティブプログラミングにも触れておいたほうがよいのかなぁという気になりました。
浅い理解では、従来の1リクエスト1スレッドから、複数リクエスト1スレッドで処理できるのでスケーラビリティが向上するものだと思っています。言い換えれば、レスポンスやスループットが向上するというものではないということでいいんだと思います。ただし、ストリームでやり取りすることで終わったものから結果を受け取れて応答性はあがるんですかね。

ちょうど、ファイルをアップロードしてあれこれしたい要件があったのでファイルアップロードを対象に調べてみました。
コードは以下にあります。(気まぐれで消すかもしれないのでリンク切れていたらすいません)

https://github.com/nobrooklyn/garden/tree/master/ap-flux-fileup

複数ファイルをアップロードするのでFluxで受け取り、順番に保存して、ファイル情報をレスポンスとして返しています。
Non-Blockingで実装しないといけないんだろうなぁということは分かっていてもお作法が分からないのでこれで良いのかはわかりませんが、動きはしました。

@RestController
@Slf4j
public class FileController {
    private final FileConfiguration conf;

    @Autowired
    public FileController(FileConfiguration conf) {
        this.conf = conf;
    }

    @ResponseBody
    @PostMapping(
        path = "/upload",
        consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
        produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<FileUploadResponse> upload(@RequestPart("f") Flux<FilePart> parts) {
        return parts.flatMap(part -> save(part));
    }

    private Mono<FileUploadResponse> save(FilePart part) {
        String fileName = part.filename();
        File file = new File("tmp/" + fileName);
        log.info("save to {}", file.getAbsolutePath());
        return part.transferTo(file)
            .thenReturn(new FileUploadResponse(file, conf.getDownloadEndpoint()));
    }
}

@Getter
@ToString
@Slf4j
public class FileUploadResponse {
    private String fileName;
    private String downloadUrl;
    private String contetnType;
    private long size;
    private String md5;

    FileUploadResponse(File file, String downloadEndpoint) {
        this.fileName = file.getName();
        this.downloadUrl = downloadEndpoint + "/" + fileName;
        try {
            this.contetnType = Files.probeContentType(file.toPath());
        } catch (IOException e) {
            this.contetnType = "no data";
        }
        this.size = file.length();
        try {
            this.md5 = DigestUtils.md5DigestAsHex(new FileInputStream(file));
        } catch (IOException e) {
            this.md5 = "no data";
        }
    }
}

@Component
public class FileConfiguration {
    final static String downloadPath = "/download";

    @Value("${file.download.url:http://localhost:8080}")
    private String downloadUrl;

    String getDownloadEndpoint() {
        return downloadUrl + downloadPath;
    }
}

Servlet APIやその振る舞いに慣れている身としては「この場合やどうやって書くんや?」というのがちょいちょいあって慣れるまでもどかしい感じです。
少しずつ勉強していきたいと思います。