Drastic Elastic [Part 4]: Aggregations & Plugins

Datetime: 2016-08-23 02:07:06          Topic: Elastic Search

In an earlier post in this mini-series I mentioned that the aggregated data we persist in ElasticSearch has discrete retention times:

  • 5 minute aggregation => (retention time of) one day
  • hourly aggregations => 7 days
  • daily aggregations => 5 years

This means that we reach well over 50% of our total data volume after one week (the only additions after that point are daily aggregations, while data at other aggregation levels gets refreshed/updated) – after 4 or 5 weeks we had something like 8 billion documents in ElasticSearch, amounting to 13 TB of data.

In this last article of our four-part series we describe how ElasticSearch plugins help us to address appropriate aggregation levels without having to build in extra round trips (adding to latency), or to fetch more data than we need (which would require filtering in the client).

When querying the data from our client, we don’t always want to force the user to decide what level of granularity to choose, since this is partly implicit in the nature of the query: if the user requests data from over a week ago, then the only option is to retrieve it from the daily aggregation, whereas a request for data covering the past three hours could come from one of two aggregation levels (5-minute or hourly). It would be nice to be able to pass some parameters with the request and let ElasticSearch decide on its own which data source to address.

Plugins to the rescue!

We decided to implement this by writing our own custom ElasticSearch plugin. Plugins have long been a feature of ElasticSearch, and several of the supported add-ons and monitoring tools (such as Kopf or Bigdesk) run as plugins. We implemented a Java plugin – note that site and mixed plugins are deprecated as of version 2.3 and will be removed in future versions.

ElasticSearch offers a standard extensibility pattern for custom plugins. These can be developed separately, registered with the cluster, and addressed via a custom REST end-point.

Custom plugins offer several advantages:

  • Plugins have low level access to all phases of the search engine
  • Mapping or other index configuration information only needed by ElasticSearch does not have to be deployed to external components
  • Functionality common to external components only has to be deployed once

However, the following points should also be noted:

  • A plugin should normally be installed on all nodes in a cluster, especially if it implements actions that can be addressed by other nodes (broadcast operations): an exception to this might be a situation where it is desirable to maintain state without extending cluster state metadata, or where it is clear that the plugin will only be addressed on certain nodes (for instance, where reads and writes use dedicated node sets)
  • Plugins should report their metadata to the cluster state, to avoid parallel or overlapping callouts
  • Plugins are linked to a NodeClient, but can potentially address all nodes in the cluster, so that e.g. BulkImport operations can run in parallel: however, they are dependent on the node life cycle and may need to be restarted if forced to relocate to another node

How to write a plugin is out of the scope of this article, although online guides are not hard to come by. Note, though, that the implementation details for the latest version of ElasticSearch differ from those of earlier versions (we used ElasticSearch 1.7). Any code snippets below refer to plugins developed for version 1.7.
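To give a rough idea of the moving parts, the basic wiring of a REST plugin in ElasticSearch 1.7 looks something like the sketch below. The class and endpoint names here are illustrative only (not our production code), and the sketch assumes the 1.x plugin API (AbstractPlugin, RestModule, BaseRestHandler):

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestModule;
import org.elasticsearch.rest.RestRequest;

// Plugin entry point, referenced from es-plugin.properties (plugin=IndexPickerPlugin)
public class IndexPickerPlugin extends AbstractPlugin {

    @Override
    public String name() {
        return "index-picker";
    }

    @Override
    public String description() {
        return "Selects aggregation indices based on custom request parameters";
    }

    // Called by ElasticSearch at startup: register the custom REST action
    public void onModule(RestModule restModule) {
        restModule.addRestAction(IndexPickerRestAction.class);
    }
}

// The handler behind the custom endpoint
class IndexPickerRestAction extends BaseRestHandler {

    @Inject
    public IndexPickerRestAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(RestRequest.Method.GET,
                "/{index}/{type}/_indexpicker/_search", this);
    }

    @Override
    protected void handleRequest(RestRequest request, RestChannel channel, Client client)
            throws Exception {
        // parse custom params, pick indices, hand the request off to client.search(...)
    }
}
```

The plugin is then packaged as a jar and installed on each node with the `plugin` script; the cluster picks up the new endpoint after a node restart.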

Approach

Since we did not want to duplicate any business logic in the plugin code, we decided to use a simple algorithm to determine which indices should be addressed – based on one or more custom parameters – and then to "hand off" the original search request in its entirety, having only "injected" the index collection. To do this, we had a look at the RestSearchAction.java class, which implements handleRequest like this:

    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        SearchRequest searchRequest;
        searchRequest = RestSearchAction.parseSearchRequest(request);
        searchRequest.listenerThreaded(false);
        client.search(searchRequest, new RestStatusToXContentListener<SearchResponse>(channel));
    }

Our plugin code only needs to implement our own parseSearchRequest: we can then parse out our parameter values from the RestRequest object and construct a SearchRequest object using the indices we want, without having to change anything else.

    @Override
    protected void handleRequest(final RestRequest request, final RestChannel channel, final Client client)
            throws Exception {
        SearchRequest searchRequest = parseSearchRequest(request);
        searchRequest.listenerThreaded(false);
        client.search(searchRequest, new RestStatusToXContentListener<SearchResponse>(channel));
    }

    private SearchRequest parseSearchRequest(RestRequest request) {
        String maxpoints = request.param("maxpoints");
        String base_ts = request.param("bts");
        String min_pc_coverage = request.param("mincoverage");
        String indexRootName = request.param("indexroot");
        ...
        // code to select indices based on parameter values
        ...
    }

Example

An example callout looks like this:

    curl -XGET 'http://localhost:9200/myIndex/myType/_indexpicker/_search?
        indexroot=myRootIndex&
        maxpoints=100&
        mincoverage=75&
        bts=1468670400000&
        starttimestamp=1468324800000&
        endtimestamp=1468584000000
        &pretty' -d '{
        "size": 1000,
        "fields": ["myField1", "myField2"],
        "query": {
            "filtered": {
                "query": {
                    ...
                },
                "filter": {
                    ...
                }
            }
        },
        "sort": {
            ...
        }
    }'

Where the parameters are defined thus:

  • indexroot : the "base" index for the query (any time-based aggregation related to this root index will be considered for selection)
  • maxpoints : the maximum number of hits that can be displayed by the client
  • starttimestamp : range-start (parsed out of query if not supplied)
  • endtimestamp : range-end (parsed out of query if not supplied)
  • mincoverage : the minimum percentage of the defined time range (from starttimestamp to endtimestamp) to be covered by the selected aggregation level
  • bts : the base timestamp, defaults to "now"

The image below shows how this information fits together:

Algorithm: "give me the aggregation level where: a) mincoverage is fulfilled and b) #buckets is the highest but not greater than maxpoints."

The area in green is the range we are interested in, with each bar (5-minute, daily, hourly) representing an aggregation level that contains a number of aggregated values (or buckets). Our first check is to filter out any index (= aggregation level) that does not provide the required coverage: from the remaining index candidates we take the one that offers the most data points without exceeding the given limit.
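The selection rule itself is small enough to express in plain Java. The sketch below is illustrative rather than our production code: index names, the Level type and the millisecond constants are assumptions, but the two checks mirror the algorithm stated above (coverage first, then the highest bucket count not exceeding maxpoints):

```java
import java.util.List;

public class IndexPicker {

    /** One aggregation level: index name, bucket width, and oldest retained timestamp. */
    static final class Level {
        final String index;
        final long bucketMs;       // e.g. 5 min, 1 h, 1 day, in milliseconds
        final long availableFrom;  // "now" minus this level's retention time
        Level(String index, long bucketMs, long availableFrom) {
            this.index = index;
            this.bucketMs = bucketMs;
            this.availableFrom = availableFrom;
        }
    }

    /**
     * Pick the level where a) mincoverage is fulfilled and
     * b) #buckets is the highest but not greater than maxpoints.
     * Returns null if no level qualifies.
     */
    static Level pick(List<Level> levels, long startTs, long endTs,
                      int maxPoints, double minCoveragePct) {
        Level best = null;
        long bestBuckets = -1;
        long range = endTs - startTs;
        for (Level l : levels) {
            long coveredFrom = Math.max(startTs, l.availableFrom);
            double coverage = 100.0 * Math.max(0L, endTs - coveredFrom) / range;
            if (coverage < minCoveragePct) continue;             // a) coverage check
            long buckets = (endTs - coveredFrom) / l.bucketMs;   // points this level would yield
            if (buckets <= maxPoints && buckets > bestBuckets) { // b) most points under the limit
                best = l;
                bestBuckets = buckets;
            }
        }
        return best;
    }
}
```

For a three-hour range with maxpoints=100 this picks the 5-minute level (36 buckets beats 3 hourly buckets); for an eight-day range it falls through to the daily level, since the 5-minute level fails the coverage check and the hourly level would exceed 100 buckets.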

Refinements

Once we had this basic implementation working it quickly became clear that certain queries that overlapped all three aggregation levels were only returning data for a portion of the time range. If my query requests data for the past 8 days, then I have daily aggregate values for 8 days ago, hourly aggregate values from 7 days ago until one hour ago, and 5-minute samples from the past hour. If our simple algorithm selects the hourly aggregation as the best fit, then I forfeit the ability to display the most recent data. To solve this problem we extended the plugin functionality to "drop down" to finer-grained indices for more recent time periods. This added to the complexity, but was not any more invasive than the initial version.
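The "drop down" refinement amounts to splitting the requested range into windows, each served by the finest-grained index that still holds data for it. A minimal sketch of that splitting step (names and the takeover timestamps are illustrative assumptions, not the plugin's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSplitter {

    /** A sub-range of the query served by one index. */
    static final class Window {
        final String index;
        final long from, to;
        Window(String index, long from, long to) {
            this.index = index;
            this.from = from;
            this.to = to;
        }
    }

    /**
     * Split [startTs, endTs) so that each window is served by the finest-grained
     * level that still retains data for it. Levels are ordered coarsest to finest;
     * availableFrom[i] is the oldest timestamp level i still holds, so finer
     * levels have later availableFrom values.
     */
    static List<Window> split(String[] indices, long[] availableFrom,
                              long startTs, long endTs) {
        List<Window> windows = new ArrayList<>();
        long cursor = startTs;
        for (int i = 0; i < indices.length && cursor < endTs; i++) {
            // This level serves from 'cursor' until the next (finer) level takes over.
            long handOver = (i + 1 < indices.length)
                    ? Math.max(cursor, Math.min(endTs, availableFrom[i + 1]))
                    : endTs;
            if (handOver > cursor) {
                windows.add(new Window(indices[i], cursor, handOver));
                cursor = handOver;
            }
        }
        return windows;
    }
}
```

Applied to the eight-day example above, this yields three windows: daily for the oldest day, hourly for the middle stretch, and 5-minute for the most recent period.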

We now had a nice simple plugin that returned hits over one or more indices, but how could we verify that the results are as expected? Could we not somehow log from which index the information was coming?

Our third iteration was to implement our own RestResponseListener, so that we could parse the results:

    @Override
    protected void handleRequest(final RestRequest request, final RestChannel channel, final Client client)
            throws Exception {
        IndexPickerContentListener listener = new IndexPickerContentListener(channel);
        String[] indices = getIndices(request, listener, client);

        SearchRequest searchRequest = parseSearchRequest(request, indices);
        searchRequest.listenerThreaded(false);

        client.search(searchRequest, listener);
    }

where IndexPickerContentListener can be used in place of RestStatusToXContentListener:

class IndexPickerContentListener extends RestResponseListener<SearchResponse> {

We now have full access to the response object, which we use to deliver metadata with the query result:

    {
        "metadata": {
            "index_picker_source": "1.3.2",
            "referenced_indices": {
                "AGGREGATE_5MIN": ["my_5min_index"],
                "AGGREGATE_HOUR": ["my_hourly_index"],
                "AGGREGATE_DAY": ["my_daily_index"]
            },
            "hits_by_window": 5,
            "AGGREGATE_5MIN": 3,
            "AGGREGATE_HOUR": 2,
            "AGGREGATE_DAY": 0
        },
        "took": 387,
        "timed_out": false,
        "_shards": {
            "total": 4,
            "successful": 4,
            "failed": 0
        },
        "hits": {
            ...
        }
    }

The plugin code now has clear dependencies on internal ElasticSearch code, which obviously comes with a risk when upgrading. This was something we were well aware of, although our testing prior to upgrades only resulted in one breaking change (an altered constructor signature) that was easily fixed.

Conclusion

Using plugin extensibility helped us to address appropriate aggregation levels without having to

  • build in extra round trips (adding to latency), or
  • to fetch more data than we need (which would require filtering in the client).

Read on …

So you’re interested in search-based applications, text analytics and enterprise search solutions? Have a look at our website and read about the services we offer to our customers.

Join us!

Are you looking for a job in search or analytics? We’re currently hiring Search Engineers as well as Junior and Senior Big Data Scientists.
