hatenob

プログラムって分からないことだらけ

WebFluxでS3にファイルアップロードしてDynamoDBにインデックス作成(AWS Java SDK v2)

前回、WebFluxでS3にファイルをアップロードするところまでやりました。
そのままだと、アップロードされたファイル名のままになるので別の人が同じファイル名で別のファイルをあげた日には目も当てられない惨状となるだけでなく、同じ人でも別のファイルを同じ名前であげるとやっぱりイマイチやなぁという話だったので、「最低限」という感じでS3に保存したファイルのインデックスをDynamoDBに登録する処理を書いてみました。
例によって「お作法的にほんまにこれでおうてんのか・・動きはしたんやけども・・」という感じです。

リソース作成

CDKで作成しました。

public class ApCdkStack extends Stack {
    public ApCdkStack(final Construct scope, final String id) {
        this(scope, id, null);
    }

    public ApCdkStack(final Construct scope, final String id, final StackProps props) {
        super(scope, id, props);
        bucket("ap-cdk-bucket", "ap-cdk-bucket-1");
        dynamodb("ap-cdk-dynamodb", "ap-cdk-dynamodb-1");
    }

    private void bucket(String id, String bucketName) {
        List<LifecycleRule> lifecycleRules = new ArrayList<>();
        lifecycleRules
            .add(LifecycleRule.builder()
                .abortIncompleteMultipartUploadAfter(Duration.days(1))
                .build());

        Bucket.Builder.create(this, id)
            .bucketName(bucketName)
            .publicReadAccess(false)
            .blockPublicAccess(BlockPublicAccess.BLOCK_ALL)
            .versioned(false)
            .lifecycleRules(lifecycleRules)
            .removalPolicy(RemovalPolicy.DESTROY).build();
    }

    private void dynamodb(String id, String tableName) {
        String partitionKeyName = "fid";
        Attribute partitionKey = Attribute.builder()
            .name(partitionKeyName).type(AttributeType.STRING)
            .build();

        Table.Builder.create(this, id)
            .tableName(tableName)
            .partitionKey(partitionKey)
            .billingMode(BillingMode.PAY_PER_REQUEST)
            .removalPolicy(RemovalPolicy.DESTROY)
            .build();
    }
}

コード

コードは前回への追記部分だけです。

    private final FileRepository repo;
    ...
    private Mono<UploadResponse> putObject(final FilePart filePart) {
        final FileInfo info = FileInfoBuilder.create(filePart).build();
        final PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(info.getFid()).build();
        ...
        return Mono.fromFuture(s3.putObject(req, body))
            .doOnError(e -> log.error(e.getMessage(), e))
            .map(res -> new UploadResponse(req.key(), res.versionId()))
            .doOnSuccess(res -> repo.save(info));

前回はUploadResponseを返して終わりだったのですが、doOnSuccessでFileRepositoryのsaveでファイル情報を保存しています。
引数は、FileInfoBuilderで作ったFileInfoにしていますが、これがDynamoDBに登録する項目(Item)になるものです。

@Value
public class FileInfo {
    private String fid;
    private String fname;
}

public class FileInfoBuilder {
    private final FilePart part;

    private FileInfoBuilder(FilePart part) {
        this.part = part;
    }

    public static FileInfoBuilder create(FilePart part) {
        return new FileInfoBuilder(part);
    }

    public FileInfo build() {
        return new FileInfo(fid(), part.filename());
    }

    private String fid() {
        return UUID.randomUUID().toString();
    }
}

fid(ファイルID)は雑ですがUUIDにしています。このまま保存すると拡張子もないのでS3バケットを見ただけではいったい何かすら分からないのですが人が見るわけじゃないという想定なのでまぁいいかなと思います。
S3は裏側の仕組みとしてキーのプリフィックス先頭7文字くらいのハッシュでパーティション化(シャード化?)するらしいので、均等になることを願ってランダムな文字列にしています。ただここも本当は認証されたユーザが使う前提であれば「ユーザID/一意なファイル名」というキーのほうがよいのだろうなとは思います。

@Repository
@Slf4j
public class DynamodbFileRepository implements FileRepository {
    private final DynamoDbAsyncClient db;
    private final FileInfoDynamodbMapper mapper;

    private static final String TABLE = "ap-cdk-dynamodb-1";

    @Autowired
    public DynamodbFileRepository() {
        this.db = DynamoDbAsyncClient.builder()
            .region(Region.AP_NORTHEAST_1)
            .build();
        this.mapper = new FileInfoDynamodbMapper();
    }

    @Override
    public Mono<Void> save(FileInfo info) {
        PutItemRequest req = PutItemRequest.builder()
            .tableName(TABLE)
            .item(mapper.to(info))
            .build();

        return Mono.fromFuture(db.putItem(req))
            .doOnError(e -> log.error(e.getMessage(), e))
            .log()
            .then();
    }
}

public class FileInfoDynamodbMapper {
    public Map<String, AttributeValue> to(FileInfo from) {
        Map<String, AttributeValue> o = new HashMap<>();
        o.put("fid", AttributeValue.builder().s(from.getFid()).build());
        o.put("fname", AttributeValue.builder().s(from.getFname()).build());
        return o;
    }

    public FileInfo to(GetItemResponse from) {
        return to(from.item());
    }

    public FileInfo to(Map<String, AttributeValue> from) {
        return new FileInfo(from.get("fid").s(), from.get("fname").s());
    }
}

DynamoDBへの登録もS3と同じく非同期クラインとでReactorを使って実装してみましたが、ますます「これでええんやろか・・」という思いは増すばかりです。S3の時もなのですが例外ハンドリングの書き方が全然分かっておりませんで、スループット上限に達した時にリトライしたいんだけどもどうすればよいんやろうか、、。

まとめ

とりあえずお作法的に正しいかや、例外ハンドリングの書き方などなど、まだまだよく分からないことだらけではあるものの、ひとまずちゃんと動くところまで確認できたのでよしとします。
あとはこれを洗練させれば、ある程度汎用的なS3アップローダとして使えるかなぁという感じです。

オリジナルコードはここですが、色々と書き換えているので記事執筆時点と違っているかもしれません。
garden/ap-flux-s3 at master · nobrooklyn/garden · GitHub