Vert.x DynamoDb Client
Introduction
This post explores writing a Vert.x DynamoDb client to take advantage of asynchronous, non-blocking I/O and the programming style that goes with it. Support for asynchronous programming models in the Java language and core libraries has traditionally been limited. Non-blocking I/O was introduced in Java 1.4 (NIO) and extended with asynchronous channels in Java 7 (NIO.2), but it has taken several generations for these improvements to gain widespread adoption. Java 8 introduced further constructs to aid this style of programming, such as the promise-like CompletionStage, but gaps in support for asynchronous models remained. The software engineering community has stepped up to fill these gaps and expand functionality with several open source offerings, including the frameworks RxJava, Play, Akka and Vert.x, to name a few.
Leveraging these frameworks often requires building more than you would anticipate. Existing service clients and SDKs are frequently built on blocking operations or on their own thread pools. One prominent example is the Amazon AWS SDK. For DynamoDB it provides both a synchronous and an asynchronous interface, and the asynchronous one is backed by an internal thread pool executor (see: Asynchronous Programming with the AWS SDK for Java).
The problem with additional thread pools is that they partly defeat the efficiency gains of building around an asynchronous framework. In some cases, such as SQL databases, this is a reasonable trade-off against the complexity of building an asynchronous client. In others, such as the AWS SDK, the missing asynchronous implementation is more likely caused by the absence of a single standard way to build one than by anything else. Put another way, the language and core libraries lack support for the necessary concepts.
Consequently, this situation frequently necessitates writing your own libraries to integrate the service or resource with your framework of choice. In this post I will describe how we tackled this problem to integrate Amazon’s DynamoDb with Vert.x.
Interface
The Amazon Web Services SDK provides interfaces for both blocking (see: AmazonDynamoDB.java) and non-blocking (see: AmazonDynamoDBAsync.java) interactions with DynamoDb. The non-blocking interface defines each operation in two ways:
```java
java.util.concurrent.Future<GetItemResult> getItemAsync(
        GetItemRequest getItemRequest);

java.util.concurrent.Future<GetItemResult> getItemAsync(
        GetItemRequest getItemRequest,
        AsyncHandler<GetItemRequest, GetItemResult> asyncHandler);
```
To provide a Vert.x-compatible client, the return type needs to change from java.util.concurrent.Future to io.vertx.core.Future. We then drop the second variant, because a Vert.x Future already supports attaching handlers.
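The following dependency-free sketch shows why the handler variant becomes redundant. java.util.concurrent.CompletableFuture stands in for io.vertx.core.Future. ItemRequest, ItemResult, AsyncItemClient and InMemoryItemClient are hypothetical simplified types, not the AWS SDK classes. Callers attach their callbacks to the returned future, so a single method per operation is enough.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the AWS SDK request/result types.
final class ItemRequest {
    final String key;
    ItemRequest(String key) { this.key = key; }
}

final class ItemResult {
    final String item;
    ItemResult(String item) { this.item = item; }
}

// One method per operation: callers attach callbacks to the returned future,
// so no separate AsyncHandler overload is needed.
// CompletableFuture stands in for io.vertx.core.Future in this sketch.
interface AsyncItemClient {
    CompletableFuture<ItemResult> getItemAsync(ItemRequest request);
}

// Hypothetical in-memory implementation, used only to exercise the interface.
final class InMemoryItemClient implements AsyncItemClient {
    private final Map<String, String> table = new ConcurrentHashMap<>();

    InMemoryItemClient put(String key, String value) {
        table.put(key, value);
        return this;
    }

    @Override
    public CompletableFuture<ItemResult> getItemAsync(ItemRequest request) {
        String value = table.get(request.key);
        if (value == null) {
            // Failures travel through the same future as successes
            CompletableFuture<ItemResult> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalArgumentException("No item for key " + request.key));
            return failed;
        }
        return CompletableFuture.completedFuture(new ItemResult(value));
    }
}
```

A caller would write `client.getItemAsync(request).whenComplete((result, error) -> ...)`. This plays the same role as attaching a handler to a Vert.x Future.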
Implementation
The existing asynchronous implementation (see: AmazonDynamoDBAsyncClient.java) uses an ExecutorService for execution and the Apache HTTP Client for dispatch (see: AmazonHttpClient.java). Our implementation instead uses the Vert.x framework for execution and the Vert.x HTTP client for dispatch. We still reuse the request and result types from the AWS SDK, along with its marshalling and unmarshalling code. For example:
```java
@Override
public Future<ListTablesResult> listTablesAsync(final ListTablesRequest listTablesRequest) {
    // Reuse the AWS SDK marshaller to turn the request object into an SDK Request
    final Request<ListTablesRequest> request =
            new ListTablesRequestMarshaller(PROTOCOL_FACTORY).marshall(listTablesRequest);
    // Reuse the AWS SDK JSON unmarshaller to turn the response into a result object
    final HttpResponseHandler<AmazonWebServiceResponse<ListTablesResult>> responseHandler =
            PROTOCOL_FACTORY.createResponseHandler(
                    new JsonOperationMetadata()
                            .withPayloadJson(true)
                            .withHasStreamingSuccessResponse(false),
                    new ListTablesResultJsonUnmarshaller());
    // Sign, dispatch and handle the response via Vert.x
    return invoke(request, responseHandler);
}
```
Invocation
Up to this point the changes are mechanical and the code is standard boilerplate, very similar to what is already in the AWS SDK. The implementation of invoke does all of the heavy lifting:

- request signing
- header generation
- path and parameter generation
- request dispatch and response handling

In the AWS SDK the equivalent code is independent of the AWS service. Similarly, this code should also work with most AWS services without major modifications.
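The split between per-operation boilerplate and a shared invoke can be sketched without any dependencies. Everything in the sketch is hypothetical. WireRequest, WireResponse and Invoker are illustrative names, CompletableFuture stands in for the Vert.x Future, and the transport function stands in for the Vert.x HTTP client. Each operation supplies only a marshaller and an unmarshaller. The invoke method owns the service-independent stages.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical wire-level request produced by an operation's marshaller.
final class WireRequest {
    final String operation;
    final String body;
    final Map<String, String> headers = new LinkedHashMap<>();

    WireRequest(String operation, String body) {
        this.operation = operation;
        this.body = body;
    }
}

// Hypothetical raw response returned by the transport.
final class WireResponse {
    final int status;
    final String body;

    WireResponse(int status, String body) {
        this.status = status;
        this.body = body;
    }
}

final class Invoker {
    // Stands in for the Vert.x HTTP client: sends a request, completes later.
    private final Function<WireRequest, CompletableFuture<WireResponse>> transport;

    Invoker(Function<WireRequest, CompletableFuture<WireResponse>> transport) {
        this.transport = transport;
    }

    // Service-independent pipeline shared by every operation.
    <Q, R> CompletableFuture<R> invoke(Q request,
                                       Function<Q, WireRequest> marshaller,
                                       Function<String, R> unmarshaller) {
        WireRequest wire = marshaller.apply(request);
        // Header generation (signing would also happen here, just before dispatch)
        wire.headers.put("amz-sdk-invocation-id", UUID.randomUUID().toString());
        return transport.apply(wire).thenApply(response -> {
            if (response.status / 100 != 2) {
                // Error unmarshalling is reduced to an exception in this sketch
                throw new IllegalStateException(
                        "Request failed with status " + response.status + ": " + response.body);
            }
            return unmarshaller.apply(response.body);
        });
    }
}
```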
Signing
To reuse as much code from the AWS SDK as possible, we wait until just before dispatch to map the AWS SDK request object to Vert.x. This simplifies several operations such as signing:
```java
final AWSCredentials awsCredentials = awsCredentialsProvider.getCredentials();
signer.sign(request, awsCredentials);
```
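DynamoDB requests are signed with AWS Signature Version 4, and the signer call above hides the details. The sketch below illustrates one part of SigV4: the signing-key derivation, written with javax.crypto. The canonical request and string-to-sign steps are omitted. SigV4Keys is a hypothetical helper, not part of the AWS SDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper illustrating the SigV4 signing-key derivation only.
final class SigV4Keys {
    private SigV4Keys() {}

    // HMAC-SHA256 of data (UTF-8) under key.
    static byte[] hmacSha256(byte[] key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
    }

    // The secret key is scoped by date (yyyyMMdd), region and service before it is
    // used to sign the string-to-sign, so a leaked signing key has limited reach.
    static byte[] signingKey(String secretKey, String date, String region, String service) {
        byte[] kDate = hmacSha256(("AWS4" + secretKey).getBytes(StandardCharsets.UTF_8), date);
        byte[] kRegion = hmacSha256(kDate, region);
        byte[] kService = hmacSha256(kRegion, service);
        return hmacSha256(kService, "aws4_request");
    }

    // Lowercase hex encoding, as used for the final signature.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }
}
```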
Header Mapping
However, some operations are buried deeper in the SDK and must be reproduced. Headers are obtained from three sources: 1) the Request; 2) the AmazonWebServiceRequest; 3) the ClientConfiguration. Additional headers, such as amz-sdk-invocation-id and User-Agent, are explicitly added to all requests.
```java
/* package private */ static void addHeadersToRequest(
        final Request<?> request,
        final AmazonWebServiceRequest originalRequest,
        final ClientConfiguration clientConfiguration) {
    // Set the request id
    request.addHeader(
            AmazonHttpClient.HEADER_SDK_TRANSACTION_ID,
            UUID.randomUUID().toString());

    // Set user agent
    final RequestClientOptions customClientOptions = originalRequest.getRequestClientOptions();
    if (customClientOptions != null) {
        request.addHeader(
                AmazonHttpClient.HEADER_USER_AGENT,
                RuntimeHttpUtils.getUserAgent(
                        clientConfiguration,
                        customClientOptions.getClientMarker(RequestClientOptions.Marker.USER_AGENT)));
    } else {
        request.addHeader(
                AmazonHttpClient.HEADER_USER_AGENT,
                RuntimeHttpUtils.getUserAgent(clientConfiguration, null));
    }

    // Add headers
    request.getHeaders().putAll(clientConfiguration.getHeaders());
    final Map<String, String> customHeaders = originalRequest.getCustomRequestHeaders();
    if (customHeaders != null) {
        request.getHeaders().putAll(customHeaders);
    }
}
```
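The order of the putAll calls above implies a precedence, which plain maps can demonstrate. Client-configuration headers override the always-added ones, and per-request custom headers override both. HeaderMerge is a hypothetical helper, not SDK code. It uses a case-insensitive TreeMap because HTTP header names are case-insensitive. That choice belongs to this sketch and is not a claim about the SDK's Request implementation.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical helper mirroring the header precedence of addHeadersToRequest.
final class HeaderMerge {
    private HeaderMerge() {}

    static Map<String, String> merge(Map<String, String> requestHeaders,
                                     String userAgent,
                                     Map<String, String> configHeaders,
                                     Map<String, String> customHeaders) {
        // Case-insensitive keys: "user-agent" and "User-Agent" are the same header
        Map<String, String> merged = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        // Headers already present on the marshalled request
        merged.putAll(requestHeaders);
        // Always-added headers: a fresh invocation id and the user agent
        merged.put("amz-sdk-invocation-id", UUID.randomUUID().toString());
        merged.put("User-Agent", userAgent);
        // Client-configuration headers, then the original request's custom headers;
        // later sources win on collisions, matching the putAll order above
        merged.putAll(configHeaders);
        if (customHeaders != null) {
            merged.putAll(customHeaders);
        }
        return merged;
    }
}
```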
URI Construction
The request models differ in one respect. Apache defines the endpoint and path separately from the query parameters. Vert.x takes the endpoint separately and combines the path and parameters into a single request URI. Our strategy is to construct a java.net.URI instance from the AWS SDK request and then map that to the Vert.x request. This code again leverages existing AWS SDK utilities to help construct the path, merge parameters, and encode parameters.
```java
/* package private */ static URI createRequestUri(
        final Request<?> request,
        final AmazonWebServiceRequest originalRequest) {
    // Build the request uri escaping any double slashes from the resource path
    final StringBuilder uriString = new StringBuilder(
            SdkHttpUtils.appendUri(
                    request.getEndpoint().toString(),
                    request.getResourcePath(),
                    true));

    // Add query parameters
    final Map<String, List<String>> customQueryParameters =
            originalRequest.getCustomQueryParameters();
    if (customQueryParameters != null) {
        for (final Map.Entry<String, List<String>> entry : customQueryParameters.entrySet()) {
            request.getParameters().put(
                    entry.getKey(),
                    CollectionUtils.mergeLists(
                            request.getParameters().get(entry.getKey()),
                            entry.getValue()));
        }
    }
    final String encodedQueryParameters = SdkHttpUtils.encodeParameters(request);
    if (encodedQueryParameters != null) {
        uriString.append("?").append(encodedQueryParameters);
    }

    return URI.create(uriString.toString());
}
```
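The same approach can be sketched with java.net alone: build a full URI, then split it into the host, port and request URI that the Vert.x client takes. RequestTarget is a hypothetical helper. It assumes the parameters are already merged and sorts them only to make the output deterministic. It approximates the SDK's encoding with URLEncoder (spaces as %20, '~' left unescaped) and skips the double-slash escaping that SdkHttpUtils.appendUri performs. It also defaults the port, because java.net.URI reports -1 when the endpoint has none.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper: the pieces a Vert.x HttpClient.request(...) call needs.
final class RequestTarget {
    final String host;
    final int port;
    final String requestUri; // encoded path plus optional "?query"

    private RequestTarget(String host, int port, String requestUri) {
        this.host = host;
        this.port = port;
        this.requestUri = requestUri;
    }

    // Approximates AWS-style percent-encoding: spaces as %20, '*' escaped, '~' kept.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A")
                    .replace("%7E", "~");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    static RequestTarget of(String endpoint, String path, Map<String, List<String>> params) {
        // Endpoint + resource path, avoiding a doubled or missing slash at the join
        StringBuilder uri = new StringBuilder(endpoint.replaceAll("/+$", ""));
        uri.append(path.startsWith("/") ? path : "/" + path);

        // Encoded query string; sorted by key only to make the output deterministic
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, List<String>> entry : new TreeMap<>(params).entrySet()) {
            for (String value : entry.getValue()) {
                query.add(encode(entry.getKey()) + "=" + encode(value));
            }
        }
        if (query.length() > 0) {
            uri.append('?').append(query);
        }

        // Split the full URI into the host, port and request URI Vert.x expects
        URI parsed = URI.create(uri.toString());
        int port = parsed.getPort() != -1
                ? parsed.getPort()
                : "https".equalsIgnoreCase(parsed.getScheme()) ? 443 : 80;
        String requestUri = parsed.getRawQuery() == null
                ? parsed.getRawPath()
                : parsed.getRawPath() + "?" + parsed.getRawQuery();
        return new RequestTarget(parsed.getHost(), port, requestUri);
    }
}
```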
Request Dispatch
Overall, building the dispatch process was surprisingly simple. The only caveat was mapping the payload. A Vert.x HTTP request body is written as a String or an io.vertx.core.buffer.Buffer (which can wrap a byte[]), while the AWS SDK provides a java.io.InputStream. Ideally, the stream would be wrapped in a Buffer implementation. For expediency, however, we consume the stream into a String and pass that to the request. In our next release, we plan to avoid this unnecessary materialization.
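```java
// Build the Vert.x http client request
final HttpMethod httpMethod = translateToVertxHttpMethod(request.getHttpMethod());
// java.net.URI reports -1 when the endpoint has no explicit port
final int port = uri.getPort() != -1
        ? uri.getPort()
        : "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
// Vert.x expects the (still encoded) path and query as one request URI
final String path = uri.getRawPath().isEmpty() ? "/" : uri.getRawPath();
final String requestUri = uri.getRawQuery() == null ? path : path + "?" + uri.getRawQuery();
final HttpClientRequest httpClientRequest = httpClient.request(
        httpMethod, port, uri.getHost(), requestUri);
for (final Map.Entry<String, String> header : request.getHeaders().entrySet()) {
    httpClientRequest.putHeader(header.getKey(), header.getValue());
}

// Setup the handler and response future
final Future<U> futureResponse = Future.future();
// Fail the future on connection or protocol errors instead of leaving it pending
httpClientRequest.exceptionHandler(futureResponse::fail);
httpClientRequest.handler(
        httpClientResponse -> httpClientResponse.bodyHandler(
                new ResponseBodyHandler<T, U>(
                        request,
                        httpClientResponse,
                        futureResponse,
                        responseHandler,
                        httpClientSettings)));

// End the request
if (request.getContent() != null) {
    try {
        // TODO: Avoid materializing the InputStream
        httpClientRequest.end(
                CharStreams.toString(
                        new InputStreamReader(request.getContent(), Charsets.UTF_8)));
    } catch (final IOException e) {
        futureResponse.fail(e);
    }
} else {
    httpClientRequest.end();
}

// Return the future response
return futureResponse;
```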
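One step toward removing the String round trip is to read the payload as raw bytes, which Vert.x can wrap with Buffer.buffer(byte[]), or to forward it in chunks. The sketch below uses only java.io. Payloads and its methods are hypothetical helpers. Writing chunks to an HttpClientRequest would additionally require setting Content-Length or chunked mode before the first write.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical helpers for handing an InputStream payload to a byte-oriented sink.
final class Payloads {
    private Payloads() {}

    // Drains the stream into a byte array without decoding it as text.
    // Java 8 compatible; on Java 9+ InputStream.readAllBytes() does the same job.
    static byte[] readFully(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    // Streams the payload in chunks of at most chunkSize bytes instead of
    // materializing it; each chunk is a fresh copy safe to hand to the sink.
    static void forEachChunk(InputStream in, int chunkSize, Consumer<byte[]> sink) throws IOException {
        byte[] chunk = new byte[chunkSize];
        int n;
        while ((n = in.read(chunk)) != -1) {
            if (n > 0) {
                sink.accept(Arrays.copyOf(chunk, n));
            }
        }
    }
}
```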
Response Handling
The response handling relies on existing functionality in the AWS SDK for three features: 1) checksum validation; 2) response unmarshalling; 3) error unmarshalling. One notable detail is how the checksum works. A stream wrapped around the body computes the checksum, and the SDK retrieves that stream through an attribute on the context. If this attribute is not present on the context but the response contains a checksum, the request will fail. The result is an unfortunate coupling to the HttpClientContext class in the Apache HTTP Client. This is part of the price we pay for reusing parts of the AWS SDK in ways its authors likely did not intend.
```java
@Override
public void handle(@Nullable final Buffer bodyBuffer) {
    try {
        final Map<String, Object> attributes;
        final InputStream bodyStream;
        if (bodyBuffer != null && bodyBuffer.length() > 0) {
            // Wrap the body so the CRC32 checksum is computed as it is consumed
            bodyStream = new CRC32ChecksumCalculatingInputStream(
                    new ByteArrayInputStream(bodyBuffer.getBytes()));
            attributes = ImmutableMap.of(
                    CRC32ChecksumCalculatingInputStream.class.getName(), bodyStream);
        } else {
            bodyStream = null;
            attributes = ImmutableMap.of();
        }
        final HttpClientContext context =
                ApacheUtils.newClientContext(httpClientSettings, attributes);

        // Map the Vert.x response onto the AWS SDK response type
        final HttpResponse httpResponse = new HttpResponse(request, null, context);
        httpResponse.setContent(bodyStream);
        httpResponse.setStatusCode(httpClientResponse.statusCode());
        httpResponse.setStatusText(httpClientResponse.statusMessage());
        for (final Map.Entry<String, String> entry : httpClientResponse.headers().entries()) {
            httpResponse.addHeader(entry.getKey(), entry.getValue());
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    String.format(
                            "Received response with status %d for request %s",
                            httpClientResponse.statusCode(),
                            request.getHeaders().get(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID)));
        }

        if (httpClientResponse.statusCode() / 100 == 2) {
            futureResponse.complete(responseHandler.handle(httpResponse).getResult());
        } else {
            final HttpResponseHandler<AmazonServiceException> errorResponseHandler =
                    PROTOCOL_FACTORY.createErrorResponseHandler(new JsonErrorResponseMetadata());
            futureResponse.fail(errorResponseHandler.handle(httpResponse));
        }
    } catch (final Exception e) {
        futureResponse.fail(e);
    }
}
```
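The checksum validation that the SDK performs through the context attribute can be approximated with java.util.zip. This sketch assumes the x-amz-crc32 response header carries the unsigned CRC32 of the body in decimal. Crc32Check is a hypothetical helper that computes the checksum while the body is read, in the same spirit as CRC32ChecksumCalculatingInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

// Hypothetical helper: validates a body against a decimal CRC32 header value.
final class Crc32Check {
    private Crc32Check() {}

    static boolean matches(byte[] body, String expectedHeader) throws IOException {
        // The checksum accumulates as bytes flow through the wrapping stream
        try (CheckedInputStream in =
                     new CheckedInputStream(new ByteArrayInputStream(body), new CRC32())) {
            byte[] buffer = new byte[4096];
            while (in.read(buffer) != -1) {
                // Drain only; a real unmarshaller would consume the stream instead
            }
            // CRC32.getValue() is already the unsigned 32-bit value as a long
            return Long.parseLong(expectedHeader) == in.getChecksum().getValue();
        }
    }
}
```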
Conclusion
The asynchronous programming model in the Java language and core libraries will continue to evolve. Hopefully this evolution will eventually provide the scaffolding necessary to unify the existing frameworks and allow them to coexist seamlessly in one application. Following such a unification, the work described in this post would be portable to any application using any asynchronous framework. Removing this cost multiplier would accelerate the development of asynchronous libraries and SDKs. In the meantime, I hope this post helps you create your own asynchronous service clients for Amazon Web Services.