Vert.x DynamoDb Client

Introduction

This post explores writing a Vert.x DynamoDb client that takes advantage of asynchronous, non-blocking I/O and the programming style that goes with it. The Java language and core libraries have traditionally offered limited support for asynchronous programming models. Although non-blocking I/O was introduced in Java 1.4 (NIO) and most recently updated in Java 7 (NIO.2), it has taken several generations for the improvements to gain widespread adoption. Java 8 recently introduced other constructs to aid this style of programming, such as the promise-esque CompletionStage, but there are still gaps in support for asynchronous models. The software engineering community has stepped up to fill these gaps and expand functionality with several open source offerings, including the frameworks RxJava, Play, Akka and Vert.x, to name a few.

Leveraging these frameworks often requires building more than you would anticipate. Existing service clients and SDKs are often built with blocking operations or their own thread pools. One prominent example is the Amazon AWS SDK. For DynamoDB, it provides both a synchronous and an asynchronous interface, but the asynchronous one is backed by an internal thread pool executor (see: Asynchronous Programming with the AWS SDK for Java).

The problem with using additional thread pools is that it partly defeats the efficiency gains from building around an asynchronous framework. In some cases, such as with SQL databases, it is a trade-off against the complexity of building an asynchronous client. In others, such as the AWS SDK, the lack of a true asynchronous implementation is more likely due to the lack of a single way to implement it than anything else; put another way, it is due to the lack of language and core library support for the necessary concepts.

Consequently, this situation frequently necessitates writing your own libraries to integrate the service or resource with your framework of choice. In this post I will describe how we tackled this problem to integrate Amazon’s DynamoDb with Vert.x.

Interface

The Amazon Web Services SDK provides interfaces for both blocking (see: AmazonDynamoDB.java) and non-blocking (see: AmazonDynamoDBAsync.java) interactions with DynamoDb. The non-blocking interface defines each operation in two ways:

java.util.concurrent.Future<GetItemResult> getItemAsync(
           GetItemRequest getItemRequest);

java.util.concurrent.Future<GetItemResult> getItemAsync(
           GetItemRequest getItemRequest,
           AsyncHandler<GetItemRequest, GetItemResult> asyncHandler);

To provide a Vert.x-compatible client, the return type needs to change from java.util.concurrent.Future to io.vertx.core.Future. The second variant can then be removed, since the Vert.x Future itself supports handlers.
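
To illustrate why a single variant is enough once the returned future accepts handlers, here is a minimal sketch. It uses the JDK's CompletableFuture as a stand-in for io.vertx.core.Future so that it runs without Vert.x on the classpath; ItemRequest, ItemResult and this getItemAsync are hypothetical placeholders, not the SDK types or our client.

```java
import java.util.concurrent.CompletableFuture;

final class SingleVariantSketch {

    /** Hypothetical stand-in for GetItemRequest. */
    static final class ItemRequest {
        final String key;
        ItemRequest(final String key) { this.key = key; }
    }

    /** Hypothetical stand-in for GetItemResult. */
    static final class ItemResult {
        final String value;
        ItemResult(final String value) { this.value = value; }
    }

    /** The only variant: callers attach their handler to the returned future. */
    static CompletableFuture<ItemResult> getItemAsync(final ItemRequest request) {
        final CompletableFuture<ItemResult> future = new CompletableFuture<>();
        if (request.key.isEmpty()) {
            // Failures travel through the same future as results
            future.completeExceptionally(new IllegalArgumentException("empty key"));
        } else {
            // A real client would complete this from the I/O callback instead
            future.complete(new ItemResult("value-for-" + request.key));
        }
        return future;
    }

    public static void main(final String[] args) {
        // The handler replaces the AsyncHandler argument of the second SDK variant
        getItemAsync(new ItemRequest("user-1")).whenComplete((result, error) -> {
            if (error != null) {
                System.out.println("failed: " + error.getMessage());
            } else {
                System.out.println("got: " + result.value);
            }
        });
    }
}
```

With io.vertx.core.Future the same role is played by the handler set on the future, which is why the AsyncHandler overload adds nothing.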

Implementation

The existing asynchronous implementation (see: AmazonDynamoDBAsyncClient.java) uses an ExecutorService for execution and Apache Http Client for dispatch (see: AmazonHttpClient.java). Our implementation instead uses the Vert.x framework for execution and the Vert.x Http Client for dispatch. We continue to use the same request and result types, as well as the marshalling and unmarshalling code included in the AWS SDK. For example:

@Override
public Future<ListTablesResult> listTablesAsync(final ListTablesRequest listTablesRequest) {
    final Request<ListTablesRequest> request = new ListTablesRequestMarshaller(PROTOCOL_FACTORY)
            .marshall(listTablesRequest);

    final HttpResponseHandler<AmazonWebServiceResponse<ListTablesResult>> responseHandler = PROTOCOL_FACTORY.createResponseHandler(
            new JsonOperationMetadata()
                    .withPayloadJson(true)
                    .withHasStreamingSuccessResponse(false),
            new ListTablesResultJsonUnmarshaller());
    
    return invoke(request, responseHandler);
}

Invocation

Up to this point the changes are mechanical and the code is standard boilerplate, very similar to what is already in the AWS SDK. The implementation of invoke does all of the heavy lifting: request signing, header generation, path and parameter generation and finally request dispatch and response handling. In the AWS SDK the equivalent code is independent of the AWS service. Similarly, this code should also work with most AWS services without major modifications.

Signing

To leverage as much code from the AWS SDK as possible we wait until just before dispatch to map the AWS SDK request object to Vert.x. This simplifies several operations such as signing:

final AWSCredentials awsCredentials = awsCredentialsProvider.getCredentials();
signer.sign(request, awsCredentials);

Header Mapping

However, some operations are buried deeper in the SDK and must be reproduced. Headers are obtained from three sources: 1) Request; 2) AmazonWebServiceRequest; 3) ClientConfiguration. Additional headers, such as amz-sdk-invocation-id and User-Agent, are explicitly added to all requests.

/* package private */ static void addHeadersToRequest(
        final Request<?> request,
        final AmazonWebServiceRequest originalRequest,
        final ClientConfiguration clientConfiguration) {

    // Set the request id
    request.addHeader(
            AmazonHttpClient.HEADER_SDK_TRANSACTION_ID,
            UUID.randomUUID().toString());

    // Set user agent
    final RequestClientOptions customClientOptions = originalRequest.getRequestClientOptions();
    if (customClientOptions != null) {
        request.addHeader(
                AmazonHttpClient.HEADER_USER_AGENT,
                RuntimeHttpUtils.getUserAgent(
                        clientConfiguration,
                        customClientOptions.getClientMarker(
                                RequestClientOptions.Marker.USER_AGENT)));
    } else {
        request.addHeader(
                AmazonHttpClient.HEADER_USER_AGENT,
                RuntimeHttpUtils.getUserAgent(
                        clientConfiguration,
                        null));
    }

    // Add headers
    request.getHeaders().putAll(clientConfiguration.getHeaders());
    final Map<String, String> customHeaders = originalRequest.getCustomRequestHeaders();
    if (customHeaders != null) {
        request.getHeaders().putAll(customHeaders);
    }
}
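
The order of the putAll calls above determines which source wins when the same header is set more than once. The following JDK-only sketch mirrors that ordering with plain maps; the class and method names are illustrative and not part of the SDK or of our client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class HeaderMergeSketch {

    /**
     * Merges headers in the same order as the listing above: request headers
     * first, then client configuration headers, then per-request custom
     * headers. Later sources overwrite earlier ones for an identical key.
     */
    static Map<String, String> merge(
            final Map<String, String> requestHeaders,
            final Map<String, String> clientHeaders,
            final Map<String, String> customHeaders) {
        final Map<String, String> merged = new LinkedHashMap<>(requestHeaders);
        merged.putAll(clientHeaders);
        if (customHeaders != null) {
            merged.putAll(customHeaders);
        }
        return merged;
    }

    public static void main(final String[] args) {
        final Map<String, String> request = new LinkedHashMap<>();
        request.put("User-Agent", "sdk-default");
        final Map<String, String> client = new LinkedHashMap<>();
        client.put("X-Team", "backend");
        final Map<String, String> custom = new LinkedHashMap<>();
        custom.put("User-Agent", "per-request");
        System.out.println(merge(request, client, custom));
    }
}
```

Note that the keys of a plain map are case-sensitive, so "User-Agent" and "user-agent" would be kept as two separate entries.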

URI Construction

One difference in the request is that Apache defines the endpoint and path separately from the query parameters, whereas in Vert.x the endpoint is separate while the path and parameters are combined. Our strategy is to construct a java.net.URI instance from the AWS SDK request and then map that to the Vert.x request. This code again leverages existing AWS SDK utilities to help construct the path, merge parameters, and encode parameters.

/* package private */ static URI createRequestUri(
        final Request<?> request,
        final AmazonWebServiceRequest originalRequest) {

    // Build the request uri escaping any double slashes from the resource path
    final StringBuilder uriString = new StringBuilder(
            SdkHttpUtils.appendUri(
                    request.getEndpoint().toString(),
                    request.getResourcePath(),
                    true));

    // Add query parameters
    final Map<String, List<String>> customQueryParameters = originalRequest.getCustomQueryParameters();
    if (customQueryParameters != null) {
        for (final Map.Entry<String, List<String>> entry : customQueryParameters.entrySet()) {
            request.getParameters().put(
                    entry.getKey(),
                    CollectionUtils.mergeLists(
                            request.getParameters().get(entry.getKey()),
                            entry.getValue()
                    ));
        }
    }
    final String encodedQueryParameters = SdkHttpUtils.encodeParameters(request);
    if (encodedQueryParameters != null) {
        uriString.append("?").append(encodedQueryParameters);
    }

    return URI.create(uriString.toString());
}
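
Once the URI exists, it has to be split into the host, the port and a combined path-plus-query string. A minimal JDK-only sketch of that split follows; the helper names are hypothetical, and the default ports are an assumption for endpoints that do not specify one.

```java
import java.net.URI;

final class RequestTargetSketch {

    /** Returns the encoded path and query joined with "?", or just the path. */
    static String requestTarget(final URI uri) {
        String path = uri.getRawPath();
        if (path == null || path.isEmpty()) {
            path = "/";
        }
        // getRawQuery() returns null when the URI has no query component
        final String query = uri.getRawQuery();
        return query == null ? path : path + "?" + query;
    }

    /** Falls back to the scheme's default port when the URI omits one. */
    static int effectivePort(final URI uri) {
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        return "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
    }

    public static void main(final String[] args) {
        final URI uri = URI.create("http://localhost:8000/a%20b?x=1%262");
        System.out.println(uri.getHost() + " " + effectivePort(uri) + " " + requestTarget(uri));
    }
}
```

The raw accessors matter here: getPath() and getQuery() decode percent-escapes, which would undo the encoding applied when the URI was built.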

Request Dispatch

Overall, building the dispatch process was surprisingly simple. The only caveat was the mapping of the payload. Vert.x HTTP requests accept a byte[], String or io.vertx.core.buffer.Buffer while the AWS SDK provides a java.io.InputStream. Ideally, the stream would be wrapped in a Buffer implementation; however, for expediency we consume the stream into a String and wrap that in a Buffer. In our next release, we plan to avoid this unnecessary materialization.

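// Build the Vert.x http client request
final HttpMethod httpMethod = translateToVertxHttpMethod(request.getHttpMethod());
// Join path and query with "?" and keep both in their encoded (raw) form;
// getRawQuery() is null when there are no query parameters
final String requestTarget = uri.getRawQuery() == null
        ? uri.getRawPath()
        : uri.getRawPath() + "?" + uri.getRawQuery();
final HttpClientRequest httpClientRequest = httpClient.request(
        httpMethod,
        uri.getPort(),
        uri.getHost(),
        requestTarget);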
for (final Map.Entry<String, String> header : request.getHeaders().entrySet()) {
    httpClientRequest.putHeader(header.getKey(), header.getValue());
}

// Setup the handler and response future
final Future<U> futureResponse = Future.future();
httpClientRequest.handler(
        httpClientResponse -> httpClientResponse.bodyHandler(
                new ResponseBodyHandler<T, U>(
                        request,
                        httpClientResponse,
                        futureResponse,
                        responseHandler,
                        httpClientSettings)));

// End the request
if (request.getContent() != null) {
    try {
        // TODO: Avoid materializing the InputStream
        httpClientRequest.end(
                CharStreams.toString(
                        new InputStreamReader(
                                request.getContent(),
                                Charsets.UTF_8)));
    } catch (final IOException e) {
        futureResponse.fail(e);
    }
} else {
    httpClientRequest.end();
}

// Return the future response
return futureResponse;
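
One intermediate step toward removing the String conversion is to read the payload as raw bytes. The sketch below is JDK-only and uses a hypothetical helper name. It still materializes the stream in memory, but it skips the character decoding, and the resulting array could presumably be handed to Vert.x through Buffer.buffer(byte[]).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class StreamBytesSketch {

    /** Drains the stream into a byte array without any charset decoding. */
    static byte[] readFully(final InputStream content) {
        try (InputStream in = content) {
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final byte[] chunk = new byte[8192];
            int read;
            while ((read = in.read(chunk)) != -1) {
                out.write(chunk, 0, read);
            }
            return out.toByteArray();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        // 0xFF is not valid UTF-8, so a String round trip would alter it
        final byte[] payload = {(byte) 0xFF, 0x00, 0x7B, 0x7D};
        final byte[] viaBytes = readFully(new ByteArrayInputStream(payload));
        final byte[] viaString = new String(payload, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);
        System.out.println("bytes preserved:  " + Arrays.equals(payload, viaBytes));
        System.out.println("string preserved: " + Arrays.equals(payload, viaString));
    }
}
```

For DynamoDB the payload is JSON text, so the String approach works in practice; the byte-oriented version simply avoids depending on that.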

Response Handling

The response handling relies on existing functionality in the AWS SDK for several features: 1) Checksum validation; 2) Response unmarshalling; 3) Error unmarshalling. One notable part of this code is that the checksum is computed by a stream wrapped around the body and is retrieved via an attribute on the context. If this attribute is not present on the context but the response contains a checksum, the request will fail. The result is an unfortunate coupling to the HttpClientContext class in the Apache Http Client. This is part of the price we pay for reusing parts of the AWS SDK in ways its authors likely did not intend.

@Override
public void handle(@Nullable final Buffer bodyBuffer) {
    try {
        final Map<String, Object> attributes;
        final InputStream bodyStream;
        if (bodyBuffer != null && bodyBuffer.length() > 0) {
            bodyStream = new CRC32ChecksumCalculatingInputStream(
                    new ByteArrayInputStream(
                            bodyBuffer.getBytes()));
            attributes = ImmutableMap.of(CRC32ChecksumCalculatingInputStream.class.getName(), bodyStream);
        } else {
            bodyStream = null;
            attributes = ImmutableMap.of();
        }

        final HttpClientContext context = ApacheUtils.newClientContext(
                httpClientSettings,
                attributes);

        final HttpResponse httpResponse = new HttpResponse(request, null, context);
        httpResponse.setContent(bodyStream);
        httpResponse.setStatusCode(httpClientResponse.statusCode());
        httpResponse.setStatusText(httpClientResponse.statusMessage());
        for (final Map.Entry<String, String> entry : httpClientResponse.headers().entries()) {
            httpResponse.addHeader(entry.getKey(), entry.getValue());
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    String.format(
                            "Received response with status %d for request %s",
                            httpClientResponse.statusCode(),
                            request.getHeaders().get(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID)));
        }

        if (httpClientResponse.statusCode() / 100 == 2) {
            futureResponse.complete(responseHandler.handle(httpResponse).getResult());
        } else {
            final HttpResponseHandler<AmazonServiceException> errorResponseHandler = PROTOCOL_FACTORY
                    .createErrorResponseHandler(new JsonErrorResponseMetadata());
            futureResponse.fail(errorResponseHandler.handle(httpResponse));
        }
    } catch (final Exception e) {
        futureResponse.fail(e);
    }
}
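
The idea of computing the checksum as a stream around the body can be shown with the JDK alone. This sketch uses java.util.zip.CheckedInputStream rather than the SDK's CRC32ChecksumCalculatingInputStream. The helper names are hypothetical, and the comparison against a decimal header value (DynamoDB sends one in x-amz-crc32) is an assumption about how the check works, not a copy of the SDK's logic.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

final class Crc32Sketch {

    /** Reads the body through a checksum-calculating stream and returns the CRC32. */
    static long drainAndChecksum(final byte[] body) {
        try (CheckedInputStream in = new CheckedInputStream(
                new ByteArrayInputStream(body), new CRC32())) {
            final byte[] chunk = new byte[4096];
            while (in.read(chunk) != -1) {
                // The checksum is updated as a side effect of reading
            }
            return in.getChecksum().getValue();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Treats a missing header as "nothing to verify". */
    static boolean matches(final byte[] body, final String crcHeader) {
        return crcHeader == null
                || drainAndChecksum(body) == Long.parseLong(crcHeader.trim());
    }

    public static void main(final String[] args) {
        final byte[] body = "{\"TableNames\":[]}".getBytes(StandardCharsets.UTF_8);
        final String header = Long.toString(drainAndChecksum(body));
        System.out.println("checksum matches: " + matches(body, header));
    }
}
```

Because the checksum only becomes available after the body has been consumed, the unmarshaller and the validator need to share the same stream instance, which is what the context attribute in the handler above provides.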

Conclusion

The parallel programming model in the Java language and core libraries will continue to evolve. Hopefully this work will eventually provide the scaffolding necessary to unify the existing frameworks and allow them to seamlessly co-exist in one application. Following such a unification, the work described in this post would be portable to any application using any asynchronous framework. Removing this cost multiplier would accelerate the development of asynchronous libraries and SDKs. But in the meantime, I hope this post helps you to create your own asynchronous service clients for Amazon Web Services.
