Spring WebFlux + DynamoDB で非同期処理を書いてみる

はじめに

reactiveプログラミングって恥ずかしながらよく知らなかったのですが、去年AWS SDK for Java 2が登場したりとかSpring5.0からSpring WebFluxが追加されたりとかして認識し始めて、Javaでもそんなことができるのかと興味がわいたのでちょっと触ってみました。 慣れ親しんでいるDynamoDBと簡単に実装できそうな Spring WebFlux を使ってサンプルを作ってみました。

サンプルの内容

  • 住所.jp から取得した都道府県の住所をDynamoDBへ登録(全都道府県突っ込むのは面倒だったので実際には東京都と周辺の県のみ)
  • 使用した項目は以下の通り
    • 住所コード(プライマリコード。9桁の数字)
    • 郵便番号(ハイフン有り)
    • 都道府県コード(2桁の数字)
    • 都道府県(漢字)
    • 市区町村(漢字)
    • 町域(漢字)
    • 字丁目(町域の後に記載するやつ)
  • DynamoDBのテーブル定義は以下の通り
{
  "AttributeDefinitions": [
      {
          "AttributeName": "addressCode", 
          "AttributeType": "N"
      },
      {
          "AttributeName": "prefectureCode", 
          "AttributeType": "N"
      }
  ], 
  "TableName": "address-list", 
  "KeySchema": [
      {
          "AttributeName": "addressCode", 
          "KeyType": "HASH"
      }
  ], 
  "GlobalSecondaryIndexes": [
      {
          "IndexName": "Index-prefectureCode",
          "KeySchema": [
              {
                  "AttributeName": "prefectureCode",
                  "KeyType": "HASH"
              }
          ],
          "Projection": {
              "ProjectionType": "ALL"
          }
      }
  ],
 ・
 ・
 ・
  • 住所コードをHASHに、都道府県コードをGSIに定義
  • 登録した住所のうち都道府県コードが13(東京)を条件に住所の一覧を返却する
  • 比較のためAWS SDK for Java 1,2 で同期、非同期処理を実装
  • コードはこちらを参照いただければと思います。

構成

実装

pomにAWS SDK for Java 1,2 のを追加

  • 今回使うのはDynamoDBだけなのでBOMで追加
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>bom</artifactId>
      <version>2.5.51</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-bom</artifactId>
      <version>1.11.327</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
  ・
  ・
  ・
<dependencies>
  <dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>dynamodb</artifactId>
  </dependency>

  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-dynamodb</artifactId>
  </dependency>
  ・
  ・
  ・
</dependencies>

AWS SDK for Java 1 を使った同期処理

  • Accept Header でリクエストをマッピングするために、 producesMIMEタイプ application/json を指定
  • レスポンスはAddressクラスのリスト
@RestController
public class DemoController {

  @GetMapping(path = "/address/tokyo", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  public List<Address> getAddress() {
  ・
  ・
  ・
  }
  • AWS SDK for Java 1の場合、1回で返却されるレスポンスに上限があるので lastEvaluatedKey が返却されなくなるまでリクエストを投げる必要がある
    AmazonDynamoDB dbClient = AmazonDynamoDBClientBuilder.standard().build();
    Map<String, AttributeValue> lastEvaluatedKey = null;
    List<Address> responseList = new ArrayList<Address>();

    // loop to get all pages 
    do {

      // create QueryResult
      Map<String, Condition> expressionAttributeValues = new HashMap<String, Condition>();
      Condition condition = new Condition();
      condition.withComparisonOperator(ComparisonOperator.EQ).withAttributeValueList(new AttributeValue().withN("13"));
      expressionAttributeValues.put("prefectureCode", condition);
      QueryRequest queryRequest = new QueryRequest().withTableName("address-list")
          .withExclusiveStartKey(lastEvaluatedKey).withIndexName("Index-prefectureCode")
          .withKeyConditions(expressionAttributeValues);
      QueryResult queryResult = dbClient.query(queryRequest);

      queryResult.getItems().stream().forEach(result -> {
        Address i = new Address();
        i.setAddressCode(result.get("addressCode") != null ? result.get("addressCode").getN() : "");
        i.setZipCode(result.get("zipCode") != null ? result.get("zipCode").getS() : "");
        i.setPrefectureCode(result.get("prefectureCode") != null ? result.get("prefectureCode").getN() : "");
        i.setPrefectureNama(result.get("prefectureNama") != null ? result.get("prefectureNama").getS() : "");
        i.setCityName(result.get("cityName") != null ? result.get("cityName").getS() : "");
        i.setDistrictName(result.get("districtName") != null ? result.get("districtName").getS() : "");
        i.setBlockName(result.get("blockName") != null ? result.get("blockName").getS() : "");
        responseList.add(i);
      });

      // loop until lastEvaluatedKey is empty
      lastEvaluatedKey = queryResult.getLastEvaluatedKey();
    } while (lastEvaluatedKey != null);

    return responseList;

AWS SDK for Java 2 を使った非同期実装

  • こちらには producesMIMEタイプ application/stream+json を指定
  • レスポンスはAddressクラスのFlux
@RestController
public class DemoController {

  @GetMapping(path = "/address/tokyo", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
  public Flux<Address> getAddressWithStream() {
  ・
  ・
  ・
  }
  • Fluxは0個以上の値を返却するときに使用、0~1個の値を返却するときはMonoを使用する

  • DynamoDBAsyncClient を生成して queryPaginator を呼び出して QueryPublisher を取得する

    // not need to set exclusiveStartKey because of getting the next page of results automatically
    QueryPublisher queryPublisher = DynamoDbAsyncClient
                                      .create()
                                      .queryPaginator(request -> {
                                        request.tableName("address-list")
                                        .select(Select.ALL_ATTRIBUTES)
                                        .indexName("Index-prefectureCode")
                                        .keyConditionExpression("prefectureCode = :tokyo")
                                        .expressionAttributeValues(Map.of(":tokyo",
                                          software.amazon.awssdk.services.dynamodb.model.AttributeValue.builder().n("13").build()));
                                      });
  • 自動的に次のページのデータも取得してくれるので exclusiveStartKey の設定とかは不要(ちなみにAWS SDK for Java 2なら同期処理でも同様に自動で取得できるようです)
  • あとは取得される結果を少々整形して Flux を返却すれば終わり
    return Flux.from(
      queryPublisher.items().map(map -> {
        Address i = new Address();
        i.setAddressCode(map.get("addressCode") != null ? map.get("addressCode").n() : "");
        i.setZipCode(map.get("zipCode") != null ? map.get("zipCode").s() : "");
        i.setPrefectureCode(map.get("prefectureCode") != null ? map.get("prefectureCode").n() : "");
        i.setPrefectureNama(map.get("prefectureNama") != null ? map.get("prefectureNama").s() : "");
        i.setCityName(map.get("cityName") != null ? map.get("cityName").s() : "");
        i.setDistrictName(map.get("districtName") != null ? map.get("districtName").s() : "");
        i.setBlockName(map.get("blockName") != null ? map.get("blockName").s() : "");
        return i;
      })
    );

挙動

  • AWS SDK for Java 1 を使って同期処理で作った方はレスポンスがjsonのリスト形式になっています
$ curl -H "Accept: application/json" http://localhost:8080/address/tokyo
[{"addressCode":"100890900","zipCode":"100-8909","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"永田町","blockName":""},{"addressCode":"176851000","zipCode":"176-8510","prefectureCode":"13","prefectureNama":"東京都","cityName":"練馬区","districtName":"桜台","blockName":""},{"addressCode":"135002200","zipCode":"135-0022","prefectureCode":"13","prefectureNama":"東京都","cityName":"江東区","districtName":"三好","blockName":""},{"addressCode":"183001300","zipCode":"183-0013","prefectureCode":"13","prefectureNama":"東京都","cityName":"府中市","districtName":"小柳町","blockName":""},
  ・
  ・
  ・
,{"addressCode":"151867400","zipCode":"151-8674","prefectureCode":"13","prefectureNama":"東京都","cityName":"渋谷区","districtName":"代々木","blockName":""},{"addressCode":"100650700","zipCode":"100-6507","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"丸の内","blockName":"新丸の内ビルディング 7階"}]
  • AWS SDK for Java 2 使って非同期で作った方はstreamで返却されるので、jsonのListとは見た目にもレスポンス形式が違うのがわかります
$ curl -H "Accept: application/stream+json" http://localhost:8080/address/tokyo
{"addressCode":"100890900","zipCode":"100-8909","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"永田町","blockName":""}
{"addressCode":"176851000","zipCode":"176-8510","prefectureCode":"13","prefectureNama":"東京都","cityName":"練馬区","districtName":"桜台","blockName":""}
{"addressCode":"135002200","zipCode":"135-0022","prefectureCode":"13","prefectureNama":"東京都","cityName":"江東区","districtName":"三好","blockName":""}
{"addressCode":"183001300","zipCode":"183-0013","prefectureCode":"13","prefectureNama":"東京都","cityName":"府中市","districtName":"小柳町","blockName":""}
{"addressCode":"101820400","zipCode":"101-8204","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"神田練塀町","blockName":""}
{"addressCode":"100812700","zipCode":"100-8127","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"大手町","blockName":""}
  ・
  ・
  ・
{"addressCode":"151867400","zipCode":"151-8674","prefectureCode":"13","prefectureNama":"東京都","cityName":"渋谷区","districtName":"代々木","blockName":""}
{"addressCode":"100650700","zipCode":"100-6507","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"丸の内","blockName":"新丸の内ビルディング 7階"}

まとめ

ここにたどり着くまで結構時間がかかりました。。。そもそも非同期処理って何なの?とかWebFluxって何なの?とか、 AWS SDK for Java 1 だとページングされるので、AWS SDK for Java 2で非同期処理の時にはどうやって exclusiveStartKey をどうやって指定するんだ?とかとか。。 でも色々試行錯誤して多少は非同期処理の実装にも慣れたので良かったです。