Elasticsearch and Kibana on AWS

Today I am going to show you how to design and implement a cloud-based, real-time log storage and analytics system. Why? Because a good, scalable log analytics system can identify bugs and latency problems on your platform, and it also provides a foundation for personalization features in your apps. The architecture is designed with multiple uses in mind, so you can run other search services on top of the same general system. For example, here are a few questions you will be able to answer with a real-time log analytics framework:

  • How many / which requests generated a 404 error?
  • How many / which requests generated a 500 error?
  • How many / which requests took more than 2 seconds to complete?
  • Which requests sent the most bytes back to the users?
  • What requests does the average user make when on the system? We can replicate those requests in a script and test with 10K users, 50K users, 4M users, etc. and see how the system responds.
  • Recommendations: users who made these requests also made these requests (this can be anything: viewing a book, favoriting a recipe, bookmarking an equation, etc.)
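
Most of these questions reduce to filtering and counting parsed log entries, which is exactly what Elasticsearch will do for us at scale. As a minimal sketch of the idea, here is the first two questions answered over a hypothetical `status durationMs path` log format (the format and class name are illustrative, not part of the system above):

```java
import java.util.Arrays;
import java.util.List;

// Sketch: the kind of filtering/counting Elasticsearch performs for us,
// run here over a hypothetical "status durationMs path" log format.
public class LogQuestions {

    // How many requests generated a 404 error?
    public static long count404s(List<String> lines) {
        return lines.stream().filter(l -> l.split(" ")[0].equals("404")).count();
    }

    // How many requests took longer than thresholdMs to complete?
    public static long countSlow(List<String> lines, long thresholdMs) {
        return lines.stream()
                .filter(l -> Long.parseLong(l.split(" ")[1]) > thresholdMs)
                .count();
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "200 120 /books/42",
                "404 15 /books/9999",
                "200 2600 /search?q=recipes",
                "500 310 /checkout");
        System.out.println(count404s(log));        // 1
        System.out.println(countSlow(log, 2000));  // 1
    }
}
```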

Below is a diagram of a production system that I built and have been running.

diagram

Elasticsearch

We use Elasticsearch as our distributed search service and Kibana as our user interface to Elasticsearch, both for ad hoc analysis of the logs and for saved analyses in the form of dashboards. The first thing you will want to do is set up your Elasticsearch service. We use Amazon's Elasticsearch Service to provision and manage our resources. Go ahead and create a cluster; this will also install the Kibana plugin on it. After your cluster is up and running you will have something called a search domain. Your search domain has 3 very important pieces of information:

  • Endpoint: where you make HTTP requests to the search cluster, e.g. search-yourSearchDomainName-somerandomhash.us-west-2.es.amazonaws.com
  • Kibana URL: where you point your browser for Kibana, e.g. search-yourSearchDomainName-somerandomhash.us-west-2.es.amazonaws.com/_plugin/kibana
  • ARN: the ID of your search domain, for use with other services like IAM, e.g. arn:aws:es:us-west-2:(YourAWSId):domain/yourSearchDomainName

When your search cluster is first set up, modify your access policy and make it wide open so you can test your configuration. You cannot just hit the Elasticsearch endpoint with curl from a machine in your VPC and have it work (the way you can with RDS). If the search cluster is not wide open, you will have to either sign your HTTP requests (which I will show you how to do) or proxy through an authorized IP address (which I will also show you how to do). But first, let's test the basics, so open it up. Once you have opened up access to everyone, use curl to test your basic Elasticsearch setup and click on the Kibana link to make sure it is working. Play around with adding some test entries to your cluster and querying them with Kibana.
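
The three pieces of domain information fit together mechanically; a small sketch of deriving the URLs you will hit while testing (the domain name and hash are placeholders, and `_cluster/health` is just one convenient endpoint to curl):

```java
// Sketch: deriving useful URLs from the Elasticsearch endpoint hostname.
public class SearchDomainInfo {

    // The Kibana plugin lives under /_plugin/kibana on the same endpoint.
    public static String kibanaUrl(String endpoint) {
        return "https://" + endpoint + "/_plugin/kibana";
    }

    // A simple cluster-status endpoint to verify the domain answers at all.
    public static String healthUrl(String endpoint) {
        return "https://" + endpoint + "/_cluster/health";
    }

    public static void main(String[] args) {
        String endpoint = "search-yourSearchDomainName-somerandomhash.us-west-2.es.amazonaws.com";
        System.out.println(kibanaUrl(endpoint));
        System.out.println(healthUrl(endpoint));
    }
}
```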

Nginx Reverse Proxy

The next step is to set up a reverse HTTP proxy so we can give secure web access to our protected Kibana resource via IAM. First we will create the reverse proxy and test it; then I'll show you how to secure it via IAM. We will use Nginx as the reverse proxy. You will need a running EC2 instance using any flavour of Linux you prefer; I use Amazon Linux. The details of the installation and the exact location of the config files will depend on the distribution you choose, but nginx installation and config file locations are straightforward on all of them.

Once you have nginx installed, open up port 443 on your EC2 instance using the AWS console, and make sure you have SSL certificates. You could do this over port 80 without SSL certs, but I highly discourage that approach for a production system. You need two nginx config files: nginx.conf and kibana.conf; mine are located in /etc/nginx.

nginx.conf
#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}


    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;

    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;

    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;

    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}
    include /etc/nginx/conf.d/*.conf;
}
kibana.conf
server {
    listen 443;

    ssl	on;
    ssl_certificate /etc/ssl/certs/[YOUR_CERT].crt;
    ssl_certificate_key	/etc/ssl/certs/[YOUR_CERT_KEY].key;

    server_name myproxy.example.com;

    location / {
        proxy_pass https://search-yourSearchDomainName-somerandomhash.us-west-2.es.amazonaws.com;
        proxy_http_version 1.1;

        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Authorization "";
        proxy_hide_header Authorization;
       	auth_basic "Username and Password are required";
       	auth_basic_user_file /etc/nginx/.htpasswd;
    }
}

Note that kibana.conf references your server name, "myproxy.example.com". That is the address you will use to access Kibana. Here you can use an Elastic IP address and Route 53 to give a meaningful name to your instance. You will need this IP address to secure the cluster via IAM in the step below. If your instance fails, you can just spin up another EC2 instance and re-assign the Elastic IP without much trouble. Also notice that the last two lines enable basic authentication, which protects your proxy with a username and password. To create a username and password, use the htpasswd tool.
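
htpasswd is the usual way to populate /etc/nginx/.htpasswd, but if you ever need to generate an entry programmatically, nginx's auth_basic_user_file also accepts entries in the "{SHA}" scheme (a Base64-encoded SHA-1 digest). A sketch, with the class and user names purely illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Sketch: building an nginx-compatible .htpasswd line using the "{SHA}"
// scheme. (The htpasswd tool produces crypt()-style entries by default,
// which nginx also accepts.)
public class HtpasswdEntry {

    public static String shaEntry(String user, String password) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(password.getBytes(StandardCharsets.UTF_8));
            return user + ":{SHA}" + Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e); // SHA-1 is always available on the JVM
        }
    }

    public static void main(String[] args) {
        // Append the printed line to /etc/nginx/.htpasswd
        System.out.println(shaEntry("kibana", "secret"));
    }
}
```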

You should now have a secure proxy! It is accessible only via HTTPS and is secured via Basic Auth. Next we need to restrict access to our search cluster and Kibana.

Identity and Access Management (IAM)

We use IAM to restrict access to our search cluster to signed requests and specific IP addresses, so we create an access policy with the two statements below. The IP restriction is in place so that only our reverse proxy can interface with Kibana and the cluster. The user statement lets us make HTTP calls to our cluster from code if we sign our requests appropriately (I will show you how to sign requests in Java). Attach this policy to your search cluster by using the “Modify access policy” action in the AWS console. It takes a while for the policy to update. Once it is updated, make sure Kibana is still accessible via your reverse proxy. Also, you will no longer be able to make a curl HTTP call to your search cluster from anywhere.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::[YOUR AWS ACCOUNT ID]:user/[YOUR IAM USER]"
      },
      "Action": "es:*",
      "Resource": "[YOUR SEARCH CLUSTER ARN]"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Resource": "[YOUR SEARCH CLUSTER ARN]",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": "[YOUR REVERSE PROXY IP ADDRESS]"
        }
      }
    }
  ]
}

To make HTTP calls to your search cluster now, you will need to sign your requests with the credentials of the IAM user you used above. Below is an example of how to do it in Java. There are five Java files: two are from the AWS SDK for Java, and the rest are convenience classes to sign requests and access the search cluster. Feel free to use them as you see fit; the code I wrote is under an Apache License for com.sakkaris (I will have a Maven repo one of these days). Just add all these files into a package and add the “package” statement to the top of each file. For example, I have all of them in a “utils/aws” package. The files are:

  1. AmazonOptions: a class I wrote to build HTTP options for the requests
  2. AmazonSignedRequestClient: a class I wrote to make signed HTTP calls to AWS
  3. AmazonElasticsearchRequestClient: a class I wrote to make AWS Elasticsearch calls
  4. HttpRequestFactory: copy this file from the AWS Java SDK into your package because it is not a public class in the SDK. Apache License.
  5. RepeatableInputStreamRequestEntity: copy this file from the AWS Java SDK into your package because it is not a public class in the SDK. Apache License.

There are two declared dependencies for this code, Guava and the AWS SDK for Java; the classes below also use Apache HttpClient, Commons IO, and Commons Lang3 directly, so add those if they are not pulled in transitively.
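
Before reading the client code, it helps to know what AWS4Signer is doing when it signs a request. The heart of Signature Version 4 is deriving a per-day, per-region, per-service signing key from your secret key via a chain of HMAC-SHA256 operations; that key then signs a canonical form of the request. A self-contained sketch of just the key-derivation step (the date and region values are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the Signature Version 4 signing-key derivation that AWS4Signer
// performs internally. Each step HMACs the next scope component with the
// key produced by the previous step.
public class SigV4KeyDerivation {

    static byte[] hmacSha256(byte[] key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new RuntimeException(e);
        }
    }

    public static byte[] signingKey(String secretKey, String date,
                                    String region, String service) {
        byte[] kSecret = ("AWS4" + secretKey).getBytes(StandardCharsets.UTF_8);
        byte[] kDate = hmacSha256(kSecret, date);       // e.g. "20160801"
        byte[] kRegion = hmacSha256(kDate, region);     // e.g. "us-west-2"
        byte[] kService = hmacSha256(kRegion, service); // "es" for Elasticsearch
        return hmacSha256(kService, "aws4_request");    // fixed terminator
    }
}
```

Because the key is scoped to a single day, region, and service, a leaked signature cannot be replayed against other services or on other days.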

AmazonOptions.java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.metrics.RequestMetricCollector;

/**
 * @author Perry Sakkaris
 */
public class AmazonOptions {

    private final String serviceName;
    private final String region;
    private final AWSCredentials credentials;
    private final ClientConfiguration clientConfiguration;
    private final RequestMetricCollector requestMetricCollector;

    private AmazonOptions(Builder builder) {
        this.serviceName = builder.serviceName;
        this.region = builder.region;
        this.credentials = builder.credentials;
        this.requestMetricCollector = builder.requestMetricCollector;
        // use a default for clientConfiguration
        this.clientConfiguration = (builder.clientConfiguration == null) ?
                new ClientConfiguration() : builder.clientConfiguration;
    }

    public static Builder builder() {
        return new Builder();
    }

    public Builder toBuilder() {
        Builder builder = new Builder();
        builder.serviceName = serviceName;
        builder.region = region;
        builder.credentials = credentials;
        builder.clientConfiguration = clientConfiguration;
        builder.requestMetricCollector = requestMetricCollector;

        return builder;
    }

    public String getServiceName() {
        return serviceName;
    }

    public String getRegion() {
        return region;
    }

    public AWSCredentials getCredentials() {
        return credentials;
    }

    public ClientConfiguration getClientConfiguration() {
        return clientConfiguration;
    }

    public RequestMetricCollector getRequestMetricCollector() {
        return requestMetricCollector;
    }

    public static class Builder {
        private String serviceName;
        private String region;
        private AWSCredentials credentials;
        private ClientConfiguration clientConfiguration;
        private RequestMetricCollector requestMetricCollector;

        private Builder() {}

        public Builder serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        public Builder region(String region) {
            this.region = region;
            return this;
        }

        public Builder credentials(AWSCredentials credentials) {
            this.credentials = credentials;
            return this;
        }

        public Builder clientConfiguration(ClientConfiguration clientConfiguration) {
            this.clientConfiguration = clientConfiguration;
            return this;
        }

        public Builder requestMetricCollector(RequestMetricCollector requestMetricCollector) {
            this.requestMetricCollector = requestMetricCollector;
            return this;
        }

        public AmazonOptions build() {
            return new AmazonOptions(this);
        }
    }
}
AmazonSignedRequestClient.java
import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.DefaultRequest;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.http.HttpMethodName;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.HttpClients;

import java.io.IOException;
import java.net.URI;

/**
 * @author Perry Sakkaris
 */
public class AmazonSignedRequestClient extends AmazonWebServiceClient {
    private final String region;
    private final String serviceName;
    private final AWSCredentials credentials;
    private final HttpClient httpClient;
    private final HttpRequestFactory httpRequestFactory;


    public AmazonSignedRequestClient(AmazonOptions amazonOptions) {
        super(amazonOptions.getClientConfiguration(), amazonOptions.getRequestMetricCollector());

        Preconditions.checkNotNull(amazonOptions.getRegion());
        Preconditions.checkNotNull(amazonOptions.getServiceName());
        Preconditions.checkNotNull(amazonOptions.getCredentials());

        this.region = amazonOptions.getRegion();
        this.serviceName = amazonOptions.getServiceName();
        this.credentials = amazonOptions.getCredentials();

        httpRequestFactory = new HttpRequestFactory();
        httpClient = HttpClients.createDefault();
    }

    public String get(URI endpoint, String relativePath, String content) {
        return makeRequest(endpoint, HttpMethodName.GET, relativePath, content);
    }

    public String post(URI endpoint, String relativePath, String content) {
        return makeRequest(endpoint, HttpMethodName.POST, relativePath, content);
    }

    public String put(URI endpoint, String relativePath, String content) {
        return makeRequest(endpoint, HttpMethodName.PUT, relativePath, content);
    }

    public String delete(URI endpoint, String relativePath, String content) {
        return makeRequest(endpoint, HttpMethodName.DELETE, relativePath, content);
    }

    public String patch(URI endpoint, String relativePath, String content) {
        return makeRequest(endpoint, HttpMethodName.PATCH, relativePath, content);
    }

    public String head(URI endpoint, String relativePath, String content) {
        return makeRequest(endpoint, HttpMethodName.HEAD, relativePath, content);
    }

    public String makeRequest(URI endpoint, HttpMethodName methodName, String relativePath, String content)  {
        Preconditions.checkNotNull(relativePath);
        Preconditions.checkNotNull(endpoint);

        DefaultRequest<AmazonWebServiceRequest> defaultRequest = new DefaultRequest<>(serviceName);
        defaultRequest.setEndpoint(endpoint);
        defaultRequest.setHttpMethod(methodName);
        defaultRequest.setResourcePath(relativePath);
        if(!StringUtils.isBlank(content)) {
            defaultRequest.setContent(IOUtils.toInputStream(content));
        }

        AWS4Signer signer = new AWS4Signer();
        signer.setServiceName(serviceName);
        signer.setRegionName(region);
        signer.sign(defaultRequest, credentials);

        try {
            HttpRequestBase request = httpRequestFactory.createHttpRequest(defaultRequest, clientConfiguration,
                    createExecutionContext(defaultRequest));

            return IOUtils.toString(httpClient.execute(request).getEntity().getContent());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
AmazonElasticsearchRequestClient.java
import com.google.common.base.Preconditions;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.logging.Logger;

/**
 * @author Perry Sakkaris
 */
public final class AmazonElasticsearchRequestClient {
    private static final Logger LOG = Logger.getLogger(AmazonElasticsearchRequestClient.class.getName());
    private static final String SERVICE_NAME = "es";

    private final AmazonSignedRequestClient signedRequestClient;
    private final URI searchDomainUri;

    public AmazonElasticsearchRequestClient(String searchDomain, AmazonOptions amazonOptions) {
        Preconditions.checkNotNull(searchDomain);
        Preconditions.checkNotNull(amazonOptions);

        // make sure to set the serviceName to the correct value
        AmazonOptions searchOptions = amazonOptions.toBuilder().serviceName(SERVICE_NAME).build();

        try {
            searchDomainUri = new URI(searchDomain);
        } catch (URISyntaxException se) {
            throw new RuntimeException(se);
        }
        signedRequestClient = new AmazonSignedRequestClient(searchOptions);
    }

    public String get(String relativePath, String content) {
        return signedRequestClient.get(searchDomainUri, relativePath, content);
    }

    public String get(String relativePath) {
        return signedRequestClient.get(searchDomainUri, relativePath, null);
    }

    public String post(String relativePath, String content) {
        return signedRequestClient.post(searchDomainUri, relativePath, content);
    }

    public String post(String relativePath) {
        return signedRequestClient.post(searchDomainUri, relativePath, null);
    }

    public String put(String relativePath, String content) {
        return signedRequestClient.put(searchDomainUri, relativePath, content);
    }

    public String put(String relativePath) {
        return signedRequestClient.put(searchDomainUri, relativePath, null);
    }

    public String delete(String relativePath, String content) {
        return signedRequestClient.delete(searchDomainUri, relativePath, content);
    }

    public String delete(String relativePath) {
        return signedRequestClient.delete(searchDomainUri, relativePath, null);
    }

    public String patch(String relativePath, String content) {
        return signedRequestClient.patch(searchDomainUri, relativePath, content);
    }

    public String patch(String relativePath) {
        return signedRequestClient.patch(searchDomainUri, relativePath, null);
    }

    public String head(String relativePath, String content) {
        return signedRequestClient.head(searchDomainUri, relativePath, content);
    }

    public String head(String relativePath) {
        return signedRequestClient.head(searchDomainUri, relativePath, null);
    }

}
HttpRequestFactory.java
/*
 * Copyright 2011-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Request;
import com.amazonaws.http.ExecutionContext;
import com.amazonaws.http.HttpMethodName;
import com.amazonaws.util.FakeIOException;
import com.amazonaws.util.HttpUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.*;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.params.CoreProtocolPNames;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.util.Map.Entry;

/** Responsible for creating Apache HttpClient 4 request objects. */
class HttpRequestFactory {

    private static final String DEFAULT_ENCODING = "UTF-8";

    /**
     * Creates an HttpClient method object based on the specified request and
     * populates any parameters, headers, etc. from the original request.
     *
     * @param request
     *            The request to convert to an HttpClient method object.
     * @param context
     *            The execution context of the HTTP method to be executed
     *
     * @return The converted HttpClient method object with any parameters,
     *         headers, etc. from the original request set.
     * @throws FakeIOException only for test simulation
     */
    HttpRequestBase createHttpRequest(Request request,
                                      ClientConfiguration clientConfiguration, ExecutionContext context)
            throws FakeIOException {
        URI endpoint = request.getEndpoint();

        /*
         * HttpClient cannot handle url in pattern of "http://host//path", so we
         * have to escape the double-slash between endpoint and resource-path
         * into "/%2F"
         */
        String uri = HttpUtils.appendUri(endpoint.toString(), request.getResourcePath(), true);
        String encodedParams = HttpUtils.encodeParameters(request);

        /*
         * For all non-POST requests, and any POST requests that already have a
         * payload, we put the encoded params directly in the URI, otherwise,
         * we'll put them in the POST request's payload.
         */
        boolean requestHasNoPayload = request.getContent() != null;
        boolean requestIsPost = request.getHttpMethod() == HttpMethodName.POST;
        boolean putParamsInUri = !requestIsPost || requestHasNoPayload;
        if (encodedParams != null && putParamsInUri) {
            uri += "?" + encodedParams;
        }

        HttpRequestBase httpRequest;
        if (request.getHttpMethod() == HttpMethodName.POST) {
            HttpPost postMethod = new HttpPost(uri);

            /*
             * If there isn't any payload content to include in this request,
             * then try to include the POST parameters in the query body,
             * otherwise, just use the query string. For all AWS Query services,
             * the best behavior is putting the params in the request body for
             * POST requests, but we can't do that for S3.
             */
            if (request.getContent() == null && encodedParams != null) {
                postMethod.setEntity(newStringEntity(encodedParams));
            } else {
                postMethod.setEntity(new RepeatableInputStreamRequestEntity(request));
            }
            httpRequest = postMethod;
        } else if (request.getHttpMethod() == HttpMethodName.PUT) {
            HttpPut putMethod = new HttpPut(uri);
            httpRequest = putMethod;

            /*
             * Enable 100-continue support for PUT operations, since this is
             * where we're potentially uploading large amounts of data and want
             * to find out as early as possible if an operation will fail. We
             * don't want to do this for all operations since it will cause
             * extra latency in the network interaction.
             */
            putMethod.getParams().setParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true);

            /*
             * We should never reuse the entity of the previous request, since
             * reading from the buffered entity will bypass reading from the
             * original request content. And if the content contains InputStream
             * wrappers that were added for validation-purpose (e.g.
             * Md5DigestCalculationInputStream), these wrappers would never be
             * read and updated again after AmazonHttpClient resets it in
             * preparation for the retry. Eventually, these wrappers would
             * return incorrect validation result.
             */
            if (request.getContent() != null) {
                HttpEntity entity = new RepeatableInputStreamRequestEntity(request);
                if (request.getHeaders().get("Content-Length") == null) {
                    entity = newBufferedHttpEntity(entity);
                }
                putMethod.setEntity(entity);
            }
        } else if (request.getHttpMethod() == HttpMethodName.PATCH) {
            HttpPatch patchMethod = new HttpPatch(uri);
            httpRequest = patchMethod;

            /*
             * We should never reuse the entity of the previous request, since
             * reading from the buffered entity will bypass reading from the
             * original request content. And if the content contains InputStream
             * wrappers that were added for validation-purpose (e.g.
             * Md5DigestCalculationInputStream), these wrappers would never be
             * read and updated again after AmazonHttpClient resets it in
             * preparation for the retry. Eventually, these wrappers would
             * return incorrect validation result.
             */
            if (request.getContent() != null) {
                HttpEntity entity = new RepeatableInputStreamRequestEntity(request);
                if (request.getHeaders().get("Content-Length") == null) {
                    entity = newBufferedHttpEntity(entity);
                }
                patchMethod.setEntity(entity);
            }
        } else if (request.getHttpMethod() == HttpMethodName.GET) {
            httpRequest = new HttpGet(uri);
        } else if (request.getHttpMethod() == HttpMethodName.DELETE) {
            httpRequest = new HttpDelete(uri);
        } else if (request.getHttpMethod() == HttpMethodName.HEAD) {
            httpRequest = new HttpHead(uri);
        } else {
            throw new AmazonClientException("Unknown HTTP method name: " + request.getHttpMethod());
        }

        configureHeaders(httpRequest, request, context, clientConfiguration);

        return httpRequest;
    }

    /** Configures the headers in the specified Apache HTTP request. */
    private void configureHeaders(HttpRequestBase httpRequest, Request request, ExecutionContext context, ClientConfiguration clientConfiguration) {
        /*
         * Apache HttpClient omits the port number in the Host header (even if
         * we explicitly specify it) if it's the default port for the protocol
         * in use. To ensure that we use the same Host header in the request and
         * in the calculated string to sign (even if Apache HttpClient changed
         * and started honoring our explicit host with endpoint), we follow this
         * same behavior here and in the QueryString signer.
         */
        URI endpoint = request.getEndpoint();
        String hostHeader = endpoint.getHost();
        if (HttpUtils.isUsingNonDefaultPort(endpoint)) {
            hostHeader += ":" + endpoint.getPort();
        }
        httpRequest.addHeader("Host", hostHeader);

        // Copy over any other headers already in our request
        for (Entry<String, String> entry : request.getHeaders().entrySet()) {
            /*
             * HttpClient4 fills in the Content-Length header and complains if
             * it's already present, so we skip it here. We also skip the Host
             * header to avoid sending it twice, which will interfere with some
             * signing schemes.
             */
            if (entry.getKey().equalsIgnoreCase("Content-Length") || entry.getKey().equalsIgnoreCase("Host")) continue;

            httpRequest.addHeader(entry.getKey(), entry.getValue());
        }

        /* Set content type and encoding */
        if (httpRequest.getHeaders("Content-Type") == null || httpRequest.getHeaders("Content-Type").length == 0) {
            httpRequest.addHeader("Content-Type",
                    "application/x-www-form-urlencoded; " +
                            "charset=" + DEFAULT_ENCODING.toLowerCase());
        }

        // Override the user agent string specified in the client params if the context requires it
        if (context != null && context.getContextUserAgent() != null) {
            httpRequest.addHeader("User-Agent", createUserAgentString(clientConfiguration, context.getContextUserAgent()));
        }
    }

    /** Appends the given user-agent string to the client's existing one and returns it. */
    private String createUserAgentString(ClientConfiguration clientConfiguration, String contextUserAgent) {
        if (clientConfiguration.getUserAgent().contains(contextUserAgent)) {
            return clientConfiguration.getUserAgent();
        } else {
            return clientConfiguration.getUserAgent() + " " + contextUserAgent;
        }
    }

    /**
     * Utility function for creating a new StringEntity and wrapping any errors
     * as an AmazonClientException.
     *
     * @param s
     *            The string contents of the returned HTTP entity.
     *
     * @return A new StringEntity with the specified contents.
     */
    private HttpEntity newStringEntity(String s) {
        try {
            return new StringEntity(s);
        } catch (UnsupportedEncodingException e) {
            throw new AmazonClientException("Unable to create HTTP entity: " + e.getMessage(), e);
        }
    }

    /**
     * Utility function for creating a new BufferedEntity and wrapping any errors
     * as an AmazonClientException.
     *
     * @param entity
     *            The HTTP entity to wrap with a buffered HTTP entity.
     *
     * @return A new BufferedHttpEntity wrapping the specified entity.
     * @throws FakeIOException only for test simulation
     */
    private HttpEntity newBufferedHttpEntity(HttpEntity entity) throws FakeIOException {
        try {
            return new BufferedHttpEntity(entity);
        } catch(FakeIOException e) {
            throw e;
        } catch (IOException e) {
            throw new AmazonClientException("Unable to create HTTP entity: " + e.getMessage(), e);
        }
    }
}
RepeatableInputStreamRequestEntity.java
import com.amazonaws.Request;
import com.amazonaws.http.AmazonHttpClient;
import com.amazonaws.metrics.MetricInputStreamEntity;
import com.amazonaws.metrics.ServiceMetricType;
import com.amazonaws.metrics.ThroughputMetricType;
import com.amazonaws.metrics.internal.ServiceMetricTypeGuesser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.InputStreamEntity;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Custom implementation of {RequestEntity} that delegates to an
 * {InputStreamRequestEntity}, with the one notable difference, that if
 * the underlying InputStream supports being reset, this RequestEntity will
 * report that it is repeatable and will reset the stream on all subsequent
 * attempts to write out the request.
 */
class RepeatableInputStreamRequestEntity extends BasicHttpEntity {

    /** True if the request entity hasn't been written out yet */
    private boolean firstAttempt = true;

    /** The underlying InputStreamEntity being delegated to */
    private InputStreamEntity inputStreamRequestEntity;

    /** The InputStream containing the content to write out */
    private InputStream content;

    /** Shared logger for more debugging information */
    private static final Log log = LogFactory.getLog(AmazonHttpClient.class);

    /**
     * Record the original exception if we do attempt a retry, so that if the
     * retry fails, we can report the original exception. Otherwise, we're most
     * likely masking the real exception with an error about not being able to
     * reset far enough back in the input stream.
     */
    private IOException originalException;


    /**
     * Creates a new RepeatableInputStreamRequestEntity using the information
     * from the specified request. If the input stream containing the request's
     * contents is repeatable, then this RequestEntity will report as being
     * repeatable.
     *
     * @param request
     *            The details of the request being written out (content type,
     *            content length, and content).
     */
    RepeatableInputStreamRequestEntity(final Request request) {
        setChunked(false);

        /*
         * If we don't specify a content length when we instantiate our
         * InputStreamRequestEntity, then HttpClient will attempt to
         * buffer the entire stream contents into memory to determine
         * the content length.
         *
         * TODO: It'd be nice to have easier access to content length and
         *       content type from the request, instead of having to look
         *       directly into the headers.
         */
        long contentLength = -1;
        try {
            String contentLengthString = request.getHeaders().get("Content-Length");
            if (contentLengthString != null) {
                contentLength = Long.parseLong(contentLengthString);
            }
        } catch (NumberFormatException nfe) {
            log.warn("Unable to parse content length from request.  " +
                    "Buffering contents in memory.");
        }

        String contentType = request.getHeaders().get("Content-Type");
        ThroughputMetricType type = ServiceMetricTypeGuesser
                .guessThroughputMetricType(request,
                        ServiceMetricType.UPLOAD_THROUGHPUT_NAME_SUFFIX,
                        ServiceMetricType.UPLOAD_BYTE_COUNT_NAME_SUFFIX);
        if (type == null) {
            inputStreamRequestEntity =
                    new InputStreamEntity(request.getContent(), contentLength);
        } else {
            inputStreamRequestEntity =
                    new MetricInputStreamEntity(type, request.getContent(), contentLength);
        }
        inputStreamRequestEntity.setContentType(contentType);
        content = request.getContent();

        setContent(content);
        setContentType(contentType);
        setContentLength(contentLength);
    }

    @Override
    public boolean isChunked() {
        return false;
    }

    /**
     * Returns true if the underlying InputStream supports marking/reseting or
     * if the underlying InputStreamRequestEntity is repeatable (i.e. its
     * content length has been set to
     * {@link InputStreamRequestEntity#CONTENT_LENGTH_AUTO} and therefore its
     * entire contents will be buffered in memory and can be repeated).
     *
     * @see org.apache.commons.httpclient.methods.RequestEntity#isRepeatable()
     */
    @Override
    public boolean isRepeatable() {
        return content.markSupported() || inputStreamRequestEntity.isRepeatable();
    }

    /**
     * Resets the underlying InputStream if this isn't the first attempt to
     * write out the request, otherwise simply delegates to
     * InputStreamRequestEntity to write out the data.
     * <p>
     * If an error is encountered the first time we try to write the request
     * entity, we remember the original exception, and report that as the root
     * cause if we continue to encounter errors, rather than masking the
     * original error.
     *
     * @see org.apache.commons.httpclient.methods.RequestEntity#writeRequest(OutputStream)
     */
    @Override
    public void writeTo(OutputStream output) throws IOException {
        try {
            if (!firstAttempt && isRepeatable()) content.reset();

            firstAttempt = false;
            inputStreamRequestEntity.writeTo(output);
        } catch (IOException ioe) {
            if (originalException == null) originalException = ioe;
            throw originalException;
        }
    }

}
                        
                    

Below is an example Java main method for making requests to your search cluster. Just replace searchDomain below with your cluster's endpoint. You must have your AWS credentials configured on your computer for this to work. It is pretty straightforward; the details can be found here: AWS Java Credentials

                        
public static void main(String ... args) throws Exception {
        AWSCredentials credentials = new DefaultAWSCredentialsProviderChain().getCredentials();
        String searchDomain = "https://YOUR_SEARCH_ENDPOINT";

        AmazonOptions options = AmazonOptions.builder()
                .credentials(credentials)
                .region(Regions.US_WEST_2.getName())
                .build();

        AmazonElasticsearchRequestClient elasticsearchClient = new AmazonElasticsearchRequestClient(searchDomain, options);

        System.out.println(elasticsearchClient.get("/access-logs"));
}
                        
                    

Load Balancer / S3 / Glacier

OK, so the hardest part is over! You also have a couple of handy little Java tools for interacting with your cloud platform. In this next part we will deal with publishing the access logs from your app server fleet to S3 / Glacier. Luckily this is easily done through the Amazon Console.

We use a load balancer to distribute web requests made to our platform across a cluster of app servers running our code. The load balancer records the exact HTTP request and which server the request was routed to, and saves that as a one-line entry in an access log file. Multiple access log files are generated across the app servers, and all of them get pushed to S3. We use S3 to temporarily store the access log files and to generate events when new access log files are created (how temporary is configurable; it could be 1 day or 1 year, whatever you like). After we are done processing our logs, we archive them to Glacier for cheaper long-term storage.

  1. Follow these instructions to push logs from your load balancer to S3
  2. Follow these instructions to push old logs from S3 to Glacier
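
As a sketch of step 2, the transition to Glacier can be expressed as an S3 lifecycle rule like the following (the `access-logs/` prefix and the 30-day window are assumptions here; adjust them to your bucket layout and retention needs):

```xml
<LifecycleConfiguration>
  <Rule>
    <ID>archive-access-logs</ID>
    <Prefix>access-logs/</Prefix>
    <Status>Enabled</Status>
    <Transition>
      <!-- Move objects to Glacier 30 days after creation -->
      <Days>30</Days>
      <StorageClass>GLACIER</StorageClass>
    </Transition>
  </Rule>
</LifecycleConfiguration>
```

The same rule can be created by hand under the bucket's Lifecycle settings in the S3 console.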

Lambda

Amazon Lambda (Lambda) is the last piece of the puzzle. We use Lambda to automatically launch computational resources to handle our S3 events and process the access log files. I will give you example code that you are free to use to process S3 events and index them into Elasticsearch. Go ahead and set up your Lambdas through the AWS Console. Notice that you need to upload a zip or jar file. The best way to do it is to package your lambdas with their dependencies as a jar, upload it to S3, and then point to the S3 url through the console. One important caveat is that any dependencies have to be flattened out in your jar. Luckily there is a Maven plugin that can accomplish this.

Dependencies: in addition to the code we covered above, you need the following Maven dependencies.

                        
                            
      <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>aws-lambda-java-core</artifactId>
          <version>1.1.0</version>
      </dependency>


      <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>aws-lambda-java-events</artifactId>
          <version>1.1.0</version>
      </dependency>

      <dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>18.0</version>
      </dependency>

      <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.3.2</version>
          <scope>compile</scope>
      </dependency>

      <dependency>
          <groupId>commons-io</groupId>
          <artifactId>commons-io</artifactId>
          <version>2.4</version>
      </dependency>
                        
                    
S3EventAccessLogHandler.java

Notice that we log using LambdaLogger. When the Lambda code runs, these log lines will be sent to Amazon CloudWatch, which enables you to debug your lambdas.

                        
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.yumavore.utils.aws.AmazonElasticsearchRequestClient;
import com.yumavore.utils.aws.AmazonOptions;
import com.yumavore.utils.logs.ElbLogEntryMarshaller;
import com.yumavore.utils.logs.LogEntryMarshaller;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URLDecoder;

/**
 * @author Perry Sakkaris
 */
public class S3EventAccessLogHandler implements RequestHandler<S3Event, String> {

    private final String accessLogBucketName = "YOUR_BUCKET_NAME";
    private final String searchDomain = "YOUR_SEARCH_ENDPOINT";

    @Override
    public String handleRequest(S3Event s3event, Context context) {
        try {
            LambdaLogger lambdaLogger = context.getLogger();

            S3EventNotification.S3EventNotificationRecord record = s3event.getRecords().get(0);
            String srcBucket = record.getS3().getBucket().getName();
            lambdaLogger.log(String.format("received s3 event for bucket: %s", srcBucket));

            if (accessLogBucketName.equals(srcBucket)) {
                String srcKey = record.getS3().getObject().getKey();
                srcKey = URLDecoder.decode(srcKey, "UTF-8");
                lambdaLogger.log(String.format("handling event for srcKey: %s", srcKey));

                AWSCredentials credentials = getCredentials();
                AmazonOptions options = AmazonOptions.builder()
                        .credentials(credentials)
                        .region(Regions.US_WEST_2.getName())
                        .build();

                AmazonElasticsearchRequestClient elasticsearchClient = new AmazonElasticsearchRequestClient(searchDomain, options);
                AmazonS3 s3Client = new AmazonS3Client(credentials);

                LogEntryMarshaller marshaller = new ElbLogEntryMarshaller();
                S3Object logObject = s3Client.getObject(new GetObjectRequest(srcBucket, srcKey));
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(logObject.getObjectContent()))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String logJson = marshaller.marshallToJson(line);
                        String response = elasticsearchClient.post("/access-logs/entry", logJson);
                        lambdaLogger.log(response);
                    }
                }
                return "handled";
            } else {
                lambdaLogger.log("dont know how to handle event for this bucket");
                return "unhandled";
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private AWSCredentials getCredentials() {
        return new DefaultAWSCredentialsProviderChain().getCredentials();
    }
}
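
The handler above posts one document per log line, which is simple but makes one HTTP round trip per entry. Elasticsearch also exposes a `_bulk` endpoint that accepts newline-delimited action/document pairs, so large log files can be indexed in far fewer requests. Below is a minimal, self-contained sketch of building such a bulk body; the `buildBulkBody` helper and sample documents are hypothetical, and the index/type names mirror the `/access-logs/entry` path used above. You would POST the resulting string to `/_bulk` with the same request client.

```java
import java.util.Arrays;
import java.util.List;

public class BulkBodyDemo {

    /**
     * Builds an Elasticsearch _bulk request body: one "index" action line
     * followed by the document JSON, each terminated by a newline.
     */
    static String buildBulkBody(String index, String type, List<String> docs) {
        StringBuilder body = new StringBuilder();
        for (String doc : docs) {
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type).append("\"}}\n");
            body.append(doc).append("\n");
        }
        return body.toString();
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList(
                "{\"clientIp\":\"10.0.0.1\"}",
                "{\"clientIp\":\"10.0.0.2\"}");
        System.out.print(buildBulkBody("access-logs", "entry", docs));
    }
}
```

Batching like this also keeps you under Lambda's execution time limit when a single access log file contains thousands of lines.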
                        
                    

I wrote some helper code to parse a log line into a Java object and then marshall it to JSON for indexing. Feel free to use it; you can modify it to parse your own request parameters, but it will work out of the box for Elastic Load Balancer access logs.

LogEntry.java
                        
/**
 * @author Perry Sakkaris
 */
public class LogEntry {
    private String dateTime;
    private String loadBlancer;
    private String clientIp;
    private String instanceIp;
    private Double requestProcessingTime;
    private Double backendProcessingTime;
    private Double responseProcessingTime;
    private Integer loadBalancerStatusCode;
    private Integer backendStatusCode;
    private Long recievedBytes;
    private Long sentBytes;
    private String httpMethod;
    private String request;
    private String httpProtocol;
    private String userAgent;
    private String sslCipher;
    private String sslProtocol;
    private String accessToken;

    public LogEntry() {}

    public String getDateTime() {
        return dateTime;
    }

    public void setDateTime(String dateTime) {
        this.dateTime = dateTime;
    }

    public String getLoadBlancer() {
        return loadBlancer;
    }

    public void setLoadBlancer(String loadBlancer) {
        this.loadBlancer = loadBlancer;
    }

    public String getClientIp() {
        return clientIp;
    }

    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }

    public String getInstanceIp() {
        return instanceIp;
    }

    public void setInstanceIp(String instanceIp) {
        this.instanceIp = instanceIp;
    }

    public Double getRequestProcessingTime() {
        return requestProcessingTime;
    }

    public void setRequestProcessingTime(Double requestProcessingTime) {
        this.requestProcessingTime = requestProcessingTime;
    }

    public Double getBackendProcessingTime() {
        return backendProcessingTime;
    }

    public void setBackendProcessingTime(Double backendProcessingTime) {
        this.backendProcessingTime = backendProcessingTime;
    }

    public Double getResponseProcessingTime() {
        return responseProcessingTime;
    }

    public void setResponseProcessingTime(Double responseProcessingTime) {
        this.responseProcessingTime = responseProcessingTime;
    }

    public Integer getLoadBalancerStatusCode() {
        return loadBalancerStatusCode;
    }

    public void setLoadBalancerStatusCode(Integer loadBalancerStatusCode) {
        this.loadBalancerStatusCode = loadBalancerStatusCode;
    }

    public Integer getBackendStatusCode() {
        return backendStatusCode;
    }

    public void setBackendStatusCode(Integer backendStatusCode) {
        this.backendStatusCode = backendStatusCode;
    }

    public Long getRecievedBytes() {
        return recievedBytes;
    }

    public void setRecievedBytes(Long recievedBytes) {
        this.recievedBytes = recievedBytes;
    }

    public Long getSentBytes() {
        return sentBytes;
    }

    public void setSentBytes(Long sentBytes) {
        this.sentBytes = sentBytes;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getUserAgent() {
        return userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    public String getSslCipher() {
        return sslCipher;
    }

    public void setSslCipher(String sslCipher) {
        this.sslCipher = sslCipher;
    }

    public String getSslProtocol() {
        return sslProtocol;
    }

    public void setSslProtocol(String sslProtocol) {
        this.sslProtocol = sslProtocol;
    }

    public String getHttpMethod() {
        return httpMethod;
    }

    public void setHttpMethod(String httpMethod) {
        this.httpMethod = httpMethod;
    }

    public String getHttpProtocol() {
        return httpProtocol;
    }

    public void setHttpProtocol(String httpProtocol) {
        this.httpProtocol = httpProtocol;
    }

    public String getAccessToken() {
        return accessToken;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }
}
                        
                    
LogEntryMarshaller.java
                        
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @author Perry Sakkaris
 */
public abstract class LogEntryMarshaller {

    private ObjectMapper objectMapper = new ObjectMapper();

    public String marshallToJson(String logLine) {
        LogEntry logEntry = marshall(logLine);
        try {
            return objectMapper.writeValueAsString(logEntry);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public abstract LogEntry marshall(String logLine);
}
                        
                    
ElbLogEntryMarshaller.java
                        
import org.apache.commons.lang3.StringUtils;

import java.util.logging.Logger;

/**
 * @author Perry Sakkaris
 * I realize you can do this all with regular expressions, but in my experience if you have a problem and try to use
 * regular expressions to solve it, you soon have two problems.
 */
public class ElbLogEntryMarshaller extends LogEntryMarshaller {
    private static final Logger LOG = Logger.getLogger(ElbLogEntryMarshaller.class.getName());

    @Override
    public LogEntry marshall(String logLine) {
        if(StringUtils.isBlank(logLine)) throw new IllegalArgumentException("logLine cannot be blank");
        LOG.fine(String.format("parsing line: %s", logLine));

        int firstQuote = logLine.indexOf("\"");
        int secondQuote = logLine.indexOf("\"", firstQuote+1);
        int thirdQuote = logLine.indexOf("\"", secondQuote+1);
        int fourthQuote = logLine.lastIndexOf("\"");
        String values = logLine.substring(0, firstQuote).trim();
        String request = logLine.substring(firstQuote+1, secondQuote).trim();
        String userAgent = logLine.substring(thirdQuote + 1, fourthQuote).trim();
        String sslInfo = logLine.substring(fourthQuote+1, logLine.length()).trim();

        LogEntry logEntry = new LogEntry();
        String[] logValues = values.split("\\s+");

        logEntry.setDateTime(logValues[0]);
        logEntry.setLoadBlancer(logValues[1]);
        logEntry.setClientIp(logValues[2]);
        logEntry.setInstanceIp(logValues[3]);
        logEntry.setRequestProcessingTime(Double.valueOf(logValues[4]));
        logEntry.setBackendProcessingTime(Double.valueOf(logValues[5]));
        logEntry.setResponseProcessingTime(Double.valueOf(logValues[6]));
        logEntry.setLoadBalancerStatusCode(Integer.valueOf(logValues[7]));
        logEntry.setBackendStatusCode(Integer.valueOf(logValues[8]));
        logEntry.setRecievedBytes(Long.valueOf(logValues[9]));
        logEntry.setSentBytes(Long.valueOf(logValues[10]));

        String[] requestValues = request.split("\\s+");
        logEntry.setHttpMethod(requestValues[0]);
        logEntry.setHttpProtocol(requestValues[2]);
        String[] endpointValues = requestValues[1].split("accessToken=");
        logEntry.setRequest(endpointValues[0]);
        if(endpointValues.length > 1) {
            logEntry.setAccessToken(endpointValues[1]);
        }

        String[] sslValues = sslInfo.split("\\s+");
        logEntry.setSslCipher(sslValues[0]);
        logEntry.setSslProtocol(sslValues[1]);
        logEntry.setUserAgent(userAgent);

        return logEntry;
    }

}
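
To see the quote-index approach above on a concrete line, here is a small self-contained sketch. The sample line follows the classic ELB access-log field order (timestamp, load balancer name, client and backend addresses, three timings, two status codes, byte counts, then the quoted request and user agent); it is illustrative, not taken from a real system.

```java
public class ElbLineParseDemo {
    public static void main(String[] args) {
        // A sample line in the classic ELB access-log format.
        String line = "2015-05-13T23:39:43.945958Z my-loadbalancer 192.168.131.39:2817 "
                + "10.0.0.1:80 0.000073 0.001048 0.000057 200 200 0 29 "
                + "\"GET http://www.example.com:80/ HTTP/1.1\" \"curl/7.38.0\" - -";

        // Locate the two quoted fields: the request and the user agent.
        int firstQuote = line.indexOf('"');
        int secondQuote = line.indexOf('"', firstQuote + 1);
        int thirdQuote = line.indexOf('"', secondQuote + 1);
        int fourthQuote = line.lastIndexOf('"');

        // Everything before the first quote is whitespace-separated values.
        String[] values = line.substring(0, firstQuote).trim().split("\\s+");
        String request = line.substring(firstQuote + 1, secondQuote);
        String userAgent = line.substring(thirdQuote + 1, fourthQuote);

        System.out.println(values[1]);   // my-loadbalancer
        System.out.println(request);     // GET http://www.example.com:80/ HTTP/1.1
        System.out.println(userAgent);   // curl/7.38.0
    }
}
```

Because the user agent is quoted, splitting the whole line on whitespace would scatter it across several tokens; anchoring on the quote positions first is what makes the simple split on the leading values safe.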
                        
                    

Before you package up your code and upload it to Lambda, make sure you flatten out the dependencies with the shade plugin. Below is the Maven snippet to do it.

                        
                            
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                    <!-- Show warnings for unchecked casts, etc. -->
                    <compilerArgument>-Xlint:all</compilerArgument>
                    <showWarnings>true</showWarnings>
                    <showDeprecation>true</showDeprecation>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.2</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
                    
                        
                    

You are ready to go! Now run "mvn package", upload your jar to S3, and configure your lambda to use the jar. You still have a couple of small details left, such as configuring your lambda to listen to the correct S3 bucket, but you can do that through the console and the Lambda documentation walks you through it.

Below is an example of what you will see on Kibana. This shows us all requests that took more than 2.1 seconds to complete this week. Have fun!
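
If you want the same answer straight from Elasticsearch rather than through the Kibana UI, a range query over the backendProcessingTime field from LogEntry is a reasonable sketch (the 2.1-second threshold matches the dashboard; POST a body like this to /access-logs/_search with the request client we built above):

```json
{
  "query": {
    "range": {
      "backendProcessingTime": { "gt": 2.1 }
    }
  }
}
```

Every question in the list at the top of this post reduces to a similar small query or aggregation over the indexed log fields.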