Spring WebFlux + DynamoDB で非同期処理を書いてみる
はじめに
reactiveプログラミングって恥ずかしながらよく知らなかったのですが、去年AWS SDK for Java 2が登場したりとかSpring5.0からSpring WebFluxが追加されたりとかして認識し始めて、Javaでもそんなことができるのかと興味がわいたのでちょっと触ってみました。 慣れ親しんでいるDynamoDBと簡単に実装できそうな Spring WebFlux を使ってサンプルを作ってみました。
サンプルの内容
- 住所.jp から取得した都道府県の住所をDynamoDBへ登録(全都道府県突っ込むのは面倒だったので実際には東京都と周辺の県のみ)
- 使用した項目は以下の通り
- DynamoDBのテーブル定義は以下の通り
{ "AttributeDefinitions": [ { "AttributeName": "addressCode", "AttributeType": "N" }, { "AttributeName": "prefectureCode", "AttributeType": "N" } ], "TableName": "address-list", "KeySchema": [ { "AttributeName": "addressCode", "KeyType": "HASH" } ], "GlobalSecondaryIndexes": [ { "IndexName": "Index-prefectureCode", "KeySchema": [ { "AttributeName": "prefectureCode", "KeyType": "HASH" } ], "Projection": { "ProjectionType": "ALL" } } ], ・ ・ ・
- 住所コードをHASHに、都道府県コードをGSIに定義
- 登録した住所のうち都道府県コードが13(東京)を条件に住所の一覧を返却する
- 比較のためAWS SDK for Java 1,2 で同期、非同期処理を実装
- コードはこちらを参照いただければと思います。
構成
実装
pomにAWS SDK for Java 1,2 のを追加
- 今回使うのはDynamoDBだけなのでBOMで追加
<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.5.51</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.11.327</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> ・ ・ ・ <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>dynamodb</artifactId> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-dynamodb</artifactId> </dependency> ・ ・ ・ </dependencies>
AWS SDK for Java 1 を使った同期処理
@RestController public class DemoController { @GetMapping(path = "/address/tokyo", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public List<Address> getAddress() { ・ ・ ・ }
AmazonDynamoDB dbClient = AmazonDynamoDBClientBuilder.standard().build(); Map<String, AttributeValue> lastEvaluatedKey = null; List<Address> responseList = new ArrayList<Address>(); // loop to get all pages do { // create QueryResult Map<String, Condition> expressionAttributeValues = new HashMap<String, Condition>(); Condition condition = new Condition(); condition.withComparisonOperator(ComparisonOperator.EQ).withAttributeValueList(new AttributeValue().withN("13")); expressionAttributeValues.put("prefectureCode", condition); QueryRequest queryRequest = new QueryRequest().withTableName("address-list") .withExclusiveStartKey(lastEvaluatedKey).withIndexName("Index-prefectureCode") .withKeyConditions(expressionAttributeValues); QueryResult queryResult = dbClient.query(queryRequest); queryResult.getItems().stream().forEach(result -> { Address i = new Address(); i.setAddressCode(result.get("addressCode") != null ? result.get("addressCode").getN() : ""); i.setZipCode(result.get("zipCode") != null ? result.get("zipCode").getS() : ""); i.setPrefectureCode(result.get("prefectureCode") != null ? result.get("prefectureCode").getN() : ""); i.setPrefectureNama(result.get("prefectureNama") != null ? result.get("prefectureNama").getS() : ""); i.setCityName(result.get("cityName") != null ? result.get("cityName").getS() : ""); i.setDistrictName(result.get("districtName") != null ? result.get("districtName").getS() : ""); i.setBlockName(result.get("blockName") != null ? result.get("blockName").getS() : ""); responseList.add(i); }); // loop until lastEvaluatedKey is empty lastEvaluatedKey = queryResult.getLastEvaluatedKey(); } while (lastEvaluatedKey != null); return responseList;
AWS SDK for Java 2 を使った非同期実装
- こちらには
produces
にMIMEタイプapplication/stream+json
を指定 - レスポンスはAddressクラスのFlux
@RestController public class DemoController { @GetMapping(path = "/address/tokyo", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<Address> getAddressWithStream() { ・ ・ ・ }
Fluxは0個以上の値を返却するときに使用、0~1個の値を返却するときはMonoを使用する
DynamoDBAsyncClient
を生成してqueryPaginator
を呼び出してQueryPublisher
を取得する
// not need to set exclusiveStartKey because of getting the next page of results automatically QueryPublisher queryPublisher = DynamoDbAsyncClient .create() .queryPaginator(request -> { request.tableName("address-list") .select(Select.ALL_ATTRIBUTES) .indexName("Index-prefectureCode") .keyConditionExpression("prefectureCode = :tokyo") .expressionAttributeValues(Map.of(":tokyo", software.amazon.awssdk.services.dynamodb.model.AttributeValue.builder().n("13").build())); });
- 自動的に次のページのデータも取得してくれるので
exclusiveStartKey
の設定とかは不要(ちなみにAWS SDK for Java 2なら同期処理でも同様に自動で取得できるようです) - あとは取得される結果を少々整形して
Flux
を返却すれば終わり
return Flux.from( queryPublisher.items().map(map -> { Address i = new Address(); i.setAddressCode(map.get("addressCode") != null ? map.get("addressCode").n() : ""); i.setZipCode(map.get("zipCode") != null ? map.get("zipCode").s() : ""); i.setPrefectureCode(map.get("prefectureCode") != null ? map.get("prefectureCode").n() : ""); i.setPrefectureNama(map.get("prefectureNama") != null ? map.get("prefectureNama").s() : ""); i.setCityName(map.get("cityName") != null ? map.get("cityName").s() : ""); i.setDistrictName(map.get("districtName") != null ? map.get("districtName").s() : ""); i.setBlockName(map.get("blockName") != null ? map.get("blockName").s() : ""); return i; }) );
挙動
$ curl -H "Accept: application/json" http://localhost:8080/address/tokyo [{"addressCode":"100890900","zipCode":"100-8909","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"永田町","blockName":""},{"addressCode":"176851000","zipCode":"176-8510","prefectureCode":"13","prefectureNama":"東京都","cityName":"練馬区","districtName":"桜台","blockName":""},{"addressCode":"135002200","zipCode":"135-0022","prefectureCode":"13","prefectureNama":"東京都","cityName":"江東区","districtName":"三好","blockName":""},{"addressCode":"183001300","zipCode":"183-0013","prefectureCode":"13","prefectureNama":"東京都","cityName":"府中市","districtName":"小柳町","blockName":""}, ・ ・ ・ ,{"addressCode":"151867400","zipCode":"151-8674","prefectureCode":"13","prefectureNama":"東京都","cityName":"渋谷区","districtName":"代々木","blockName":""},{"addressCode":"100650700","zipCode":"100-6507","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"丸の内","blockName":"新丸の内ビルディング 7階"}]
$ curl -H "Accept: application/stream+json" http://localhost:8080/address/tokyo {"addressCode":"100890900","zipCode":"100-8909","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"永田町","blockName":""} {"addressCode":"176851000","zipCode":"176-8510","prefectureCode":"13","prefectureNama":"東京都","cityName":"練馬区","districtName":"桜台","blockName":""} {"addressCode":"135002200","zipCode":"135-0022","prefectureCode":"13","prefectureNama":"東京都","cityName":"江東区","districtName":"三好","blockName":""} {"addressCode":"183001300","zipCode":"183-0013","prefectureCode":"13","prefectureNama":"東京都","cityName":"府中市","districtName":"小柳町","blockName":""} {"addressCode":"101820400","zipCode":"101-8204","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"神田練塀町","blockName":""} {"addressCode":"100812700","zipCode":"100-8127","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"大手町","blockName":""} ・ ・ ・ {"addressCode":"151867400","zipCode":"151-8674","prefectureCode":"13","prefectureNama":"東京都","cityName":"渋谷区","districtName":"代々木","blockName":""} {"addressCode":"100650700","zipCode":"100-6507","prefectureCode":"13","prefectureNama":"東京都","cityName":"千代田区","districtName":"丸の内","blockName":"新丸の内ビルディング 7階"}
まとめ
ここにたどり着くまで結構時間がかかりました。。。そもそも非同期処理って何なの?とかWebFluxって何なの?とか、
AWS SDK for Java 1 だとページングされるので、AWS SDK for Java 2で非同期処理の時にはどうやって exclusiveStartKey
をどうやって指定するんだ?とかとか。。
でも色々試行錯誤して多少は非同期処理の実装にも慣れたので良かったです。