Deep inside the tunnel

May 22, 2018

Okay, so in week 2, I’ve had some good challenges to overcome. I was given 3 tasks this week:

  1. To clean up the esquery.py file. This is the primary file for creating Elasticsearch DSL queries, which are sent to Elasticsearch to get the required results.
  2. To create, from scratch, a new notebook which queries the Elasticsearch enriched indices and calculates the metrics.
  3. To think about the structure of new classes and functions that can be created for the metrics, as the old classes are cumbersome to use.

Task-1

I’ll start with the first task: below is an analysis of the esquery.py file. This task is actually vital and closely related to the other two tasks, because while analysing this file I can point out what we might need in order to create the new notebook and the generalized functions for tasks 2 and 3 respectively.

esquery.py

import json
from datetime import timezone
from elasticsearch_dsl import A, Search, Q
USE_ELASTIC_DSL = True

REMEMBER THIS FLAG. The motivation for the first task is to eliminate the need for this flag.

class ElasticQuery():
    """ Helper class for building Elastic queries """
    AGGREGATION_ID = 1  # min aggregation identifier
    AGG_SIZE = 100  # Default max number of buckets
    ES_PRECISION = 3000

The snippet above imports the necessary modules and creates the base class. ES_PRECISION is the precision threshold used for cardinality aggregations, AGG_SIZE is the default maximum number of values (buckets) to get in a query, and AGGREGATION_ID is the name given to the aggregation bucket that is created.


The following are some of the functions in the ElasticQuery class.

  • get_query_filters() This function creates a string containing all the filters that are to be applied to the query. After reading this and this post, if we want to match certain phrases for a field, we should use match_phrase instead of match along with type, because the type field has been deprecated and will throw an error when you try to use it.

  • get_query_range() This function creates a range filter on date_field, which acts as the reference timeline for the search. With the greater-than-or-equal-to (from) and less-than-or-equal-to (to) parameters, it restricts the time period in which terms are searched for or filters are applied.

  • get_query_basic() This function combines the range and filter functions to create a query. If an inverse filter (i.e. a filter whose value should not be matched in the index) is present, it is added under must_not. must == AND, must_not == NOT. This function also allows the analysis of wildcard characters, which are ignored by default. Read about it here. A rough elasticsearch_dsl sketch of how these pieces fit together is shown right after this list.
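As a quick illustration (a sketch only, not the code in esquery.py; the filter names, values and dates below are made up), the same filter + inverse filter + date range logic can be expressed with elasticsearch_dsl like this:

from elasticsearch_dsl import Search, Q

# Sketch of what get_query_basic() builds, with placeholder filters and dates
s = Search()

filters = {"state": "open", "*author_name": "bot"}  # a leading '*' marks an inverse filter
for name, value in filters.items():
    if name.startswith("*"):
        # inverse filter: ends up under must_not (NOT)
        s = s.query(~Q("match_phrase", **{name[1:]: value}))
    else:
        # normal filter: ends up under must (AND)
        s = s.query(Q("match_phrase", **{name: value}))

# date range on the reference date field; gte is the "from" date, lte the "to" date
s = s.query(Q("range", grimoire_creation_date={"gte": "2018-01-01", "lte": "2018-05-01"}))

print(s.to_dict())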

While doing this task, I learned about the different aggregations that Elasticsearch supports and how we can combine and manipulate these aggregations to get useful results. The functions below generate them; a rough elasticsearch_dsl sketch of these aggregations follows the list.

  • get_query_agg_terms() Get aggregations based on terms.

  • get_query_agg_max() Get max of a field. Ex: lines_changed:[34, 2, 44, 32, 124, 678, 432, 32, 1], max: 678.

  • get_query_agg_percentiles() Get percentiles of a field. This gives us a percentile over a range of values in a particular field. Ex: if we look at the 95th percentile, the value at the 95th percentile is greater than 95% of all the values.

  • get_query_agg_avg() Get average of a field. Ex: lines_changed:[34, 2, 44, 32, 124, 678, 432, 32, 1], avg: 153.22

  • get_query_agg_cardinality() Get the approximate distinct count (cardinality) of a field.

  • get_query_agg_ts() This function gives us a range of values, i.e. a time series, based on a field. The field we are using should be numeric, or the aggregation we are applying should return a numeric value. An example of a query built by this function would be counting the number of commits made in the project per month. This function performs a date_histogram aggregation.

  • get_count() This just sets the size parameter to 0. When size is zero, hit data is not returned and only the count, i.e. the number of documents matching the filters/range, is returned.
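To make this more concrete, here is a rough sketch (not the actual esquery.py code; field names such as author_name, lines_changed and grimoire_creation_date are placeholders) of these aggregations expressed as elasticsearch_dsl A objects:

from elasticsearch_dsl import Search, A

# Sketch only: the aggregation types the functions above generate,
# written as elasticsearch_dsl A objects (field names are placeholders)
terms_agg = A("terms", field="author_name", size=100)                        # get_query_agg_terms
max_agg = A("max", field="lines_changed")                                    # get_query_agg_max
percentiles_agg = A("percentiles", field="time_to_close_days")               # get_query_agg_percentiles
avg_agg = A("avg", field="lines_changed")                                    # get_query_agg_avg
cardinality_agg = A("cardinality", field="hash", precision_threshold=3000)   # get_query_agg_cardinality

# get_query_agg_ts: a date_histogram bucket with a metric nested inside it,
# e.g. the distinct number of commits per month
s = Search()[0:0]  # size 0, as in get_count(): only aggregations, no hits
ts_agg = A("date_histogram", field="grimoire_creation_date",
           interval="month", time_zone="UTC", min_doc_count=0)
s.aggs.bucket("commits_per_month", ts_agg).metric("commits", cardinality_agg)
print(s.to_dict())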

get_agg function:

This is the only function being used in the metrics.py file, which is the only file that uses esquery.py. To modify esquery.py, I had to take special care of this function. Initially, this function wasn’t using any of the functions shown above.

@classmethod
def get_agg(cls, field=None, date_field=None, start=None, end=None,
            filters=None, agg_type="terms", offset=None, interval=None):
    """
    Compute the aggregated value for a field.
    If USE_ELASTIC_DSL is True it uses the elastic_dsl library. If not, esquery (this) module is
    used to build the query.
    :param field: field to get the time series values
    :param date_field: field with the date
    :param interval: interval to be used to generate the time series values
    :param start: date from for the time series
    :param end: date to for the time series
    :param agg_type: kind of aggregation for the field (cardinality, avg, percentiles)
    :param offset: offset to be added to the time_field in days
    :return: a string with the DSL query
    """

    query_basic = cls.__get_query_basic(date_field=date_field,
                                        start=start, end=end,
                                        filters=filters)

We start by getting the basic query string, which will be added to the final query.

if agg_type == "count":
    agg_type = 'cardinality'
elif agg_type == "median":
    agg_type = 'percentiles'
elif agg_type == "average":
    agg_type = 'avg'
# Get only the aggs not the hits
s = Search()[0:0]

Create a Search object with size and from equal to 0. This way, when we get the results, we only get the total counts and not the actual data. Setting size to 0 also makes Elasticsearch respond faster, as no hit data has to be returned, only the aggregations and counts.
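As a tiny sketch of what that looks like:

from elasticsearch_dsl import Search

# Search()[0:0] sets both from and size to 0, so no hit data is requested
print(Search()[0:0].to_dict())           # {'from': 0, 'size': 0}
print(Search().extra(size=0).to_dict())  # {'size': 0}, same effect on the hits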

for f in filters:
    param = {f: filters[f]}
    if f[0:1] == "*":
        param = {f[1:]: filters[f]}
        s = s.query(~Q("match", **param))
    else:
        s = s.query(Q("match", **param))
date_filter = cls.__get_query_range(date_field, start, end)
s = s.query(json.loads(date_filter))

Here we are adding the filters (normal and inverse) and the date range to the Search object created above.

if not interval:
    if agg_type == "terms":
        query_agg = ElasticQuery.__get_query_agg_terms(field)
    elif agg_type == "max":
        query_agg = ElasticQuery.__get_query_agg_max(field)
    elif agg_type == "cardinality":
        query_agg = ElasticQuery.__get_query_agg_cardinality(field)
    elif agg_type == "percentiles":
        query_agg = ElasticQuery.__get_query_agg_percentiles(field)
    elif agg_type == "avg":
        query_agg = ElasticQuery.__get_query_agg_avg(field)
    else:
        raise RuntimeError("Aggregation of %s not supported" % agg_type)
else:
    query_agg = ElasticQuery.__get_query_agg_ts(field, date_field,
                                                start=start, end=end,
                                                interval=interval,
                                                agg_type=agg_type,
                                                offset=offset)

This part gets the aggregation string according to the aggregation type selected by the user. Note: this does not have any relation to the Search object created above.

if agg_type not in ['percentiles', 'terms', 'avg']:
    field_agg = A(agg_type, field=field,
                  precision_threshold=cls.ES_PRECISION)
else:
    field_agg = A(agg_type, field=field)
agg_id = cls.AGGREGATION_ID
if interval:
    # Two aggs, date histogram and the field+agg_type
    bounds = ElasticQuery.__get_bounds(start, end)
    if offset:
        # With offset and quarter interval bogus buckets are added
        # to the start and to the end if extended_bounds is used
        # https://github.com/elastic/elasticsearch/issues/23776
        bounds = {"offset": offset}
    ts_agg = A('date_histogram', field=date_field, interval=interval,
               time_zone="UTC", min_doc_count=0, **bounds)
    s.aggs.bucket(agg_id, ts_agg).metric(agg_id + 1, field_agg)
else:
    s.aggs.bucket(agg_id, field_agg)

Here, we extend the Search object by adding to it the aggregation corresponding to the aggregation type specified by the user. Note that the aggregation string created in the previous section is not used here.

query = """
    {
      "size": 0,
      %s,
      %s
      }
""" % (query_agg, query_basic)

Create the final query string from the basic query string created initially (containing the filters and the range) and the aggregation string.

if USE_ELASTIC_DSL:
    return json.dumps(s.to_dict())
else:
    return query

This function basically returns a fully fleshed-out query to get the aggregated value for the selected aggregation type, the filters used, and the time range over which the aggregation has to be applied to the values of field.

As we can see, the USE_ELASTIC_DSL flag is set to True at the top of the file, which means that none of the query strings created above are actually used: the output of the get_agg function is just the Search object created in the function, together with the objects attached to it for the range, filters and aggregations.

What I did was convert each function to return elasticsearch_dsl objects and modify the get_agg function to use those functions and return a Search object.
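For example, a converted range builder might look roughly like this (a simplified sketch of the idea, not the exact code in the PR):

from elasticsearch_dsl import Q

# Simplified sketch: instead of building a JSON string, return an
# elasticsearch_dsl object that can be attached directly to a Search
def get_query_range(date_field, start=None, end=None):
    bounds = {}
    if start:
        bounds["gte"] = start.isoformat()
    if end:
        bounds["lte"] = end.isoformat()
    return Q("range", **{date_field: bounds})

# usage: s = s.query(get_query_range("grimoire_creation_date", start, end))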

You can look at the differences here.

Results:

Task-2 and Task-3:

These tasks are very much related to each other.

Currently, when we have to calculate a metric, we have to create an object from a class which looks like this:

class GithubOpen(GitHubIssuesMetrics):
    """ Tickets Open metric class for issue tracking systems """
    id = "open"
    name = "Open tickets"
    desc = "Number of tickets currently open"
    FIELD_COUNT = "id"
    FIELD_NAME = "url"
    FIELD_DATE = "created_at"
    filters = {"pull_request": "false", "state": "open"}

Here, FIELD_COUNT is the field used for the aggregation and FIELD_DATE is used to create the date range. The filters specify that the item is an issue (not a pull request) and that it is currently open.

# initialise the object
open_issues = GithubOpen(es_url, github_index, start=start_date, end=end_date)
# using the get_agg method, get the aggregation, which in this case is also the Open Issues metric
open_issues.get_agg()

For grimoirelab-perceval this gives an answer of 23.

So, for issues, the GitHubIssuesMetrics class is a subclass of the Metrics class, and GithubOpen is a subclass of GitHubIssuesMetrics. We then create an object from this class and use it; it has all the methods of the Metrics class that we want to use.

But what if we want to sort the pull requests by the organizations or users that created them? What about getting the PRs for individual users? How about sorting the commits/PRs by weeks or months?

We can use nested aggregations to get the commits per user or organization, and we can use methods in the Metrics class such as get_ts(), which uses the functions from the esquery.py file to create a date_histogram that gives us a time series of the PRs per week. However, this way of creating aggregations, filters and histograms seems a little too complicated; it may be taking the longer path to results that we could get more directly if we redesigned the classes and functions currently in use.
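For instance, bucketing the issues per author and per month with a nested aggregation looks roughly like this (a sketch only; author_name and grimoire_creation_date are assumptions about the enriched index, and es / github_index are the Elasticsearch client and index used in the notebook below):

from elasticsearch_dsl import Search, A, Q

# Sketch: issues bucketed per author and, inside each author, per month
s = Search(using=es, index=github_index)
s = s.query(Q("match", **{"item_type": "issue"}))
s = s.extra(size=0)

by_author = A("terms", field="author_name", size=100)
per_month = A("date_histogram", field="grimoire_creation_date",
              interval="month", time_zone="UTC", min_doc_count=0)
s.aggs.bucket("authors", by_author).bucket("per_month", per_month)

response = s.execute()
for author in response.aggregations.authors.buckets:
    print(author.key, [(b.key_as_string, b.doc_count) for b in author.per_month.buckets])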

To achieve this, we’ll take a look at an experimental notebook which calculates the metrics using elasticsearch_dsl queries only and does not use the functions in manuscripts.

An example to calculate the same Open issues:

from elasticsearch_dsl import Search, Q, A

# es is an Elasticsearch client and github_index is the enriched GitHub index
s = Search(using=es, index=github_index)
q1 = Q("match", **{"item_type": "issue"})
q2 = Q("match", **{"state": "open"})
q = q1 & q2
s = s.query(q)
agg = A("cardinality", field="id_in_repo")
s.aggs.bucket("num_open_issues", agg)
s = s.extra(size=0)  # only the aggregation is needed, not the hits
response = s.execute()
response.aggregations.num_open_issues.value

Rather than going through the complex process described above, we can use this simple code snippet to calculate the open issues. We will define classes and helper functions such as by_period, by_user and by_organization to segregate, for example, the number of commits by the time period in which they were created, by the author of the commit, and by the organization under which they were created, respectively.
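As a rough idea of what such chainable helpers could look like (a hypothetical sketch: the class, method and field names below are just one possible design, not a final API):

from elasticsearch_dsl import Search, A, Q

# Hypothetical sketch of the chainable helpers mentioned above
# (names and design are one possible option, not a final API)
class MetricQuery:
    def __init__(self, es, index):
        self.es = es
        self.index = index
        self.filters = []   # Q objects to apply
        self.buckets = []   # (name, A object) pairs, nested in call order

    def filter(self, **kwargs):
        """Add a match filter, e.g. filter(state="open")."""
        self.filters.append(Q("match", **kwargs))
        return self

    def by_period(self, field="grimoire_creation_date", interval="month"):
        """Bucket the results into a date histogram."""
        self.buckets.append(("period", A("date_histogram", field=field,
                                         interval=interval, min_doc_count=0)))
        return self

    def by_author(self, field="author_name"):
        """Bucket the results by the author of the item."""
        self.buckets.append(("author", A("terms", field=field, size=100)))
        return self

    def by_organization(self, field="author_org_name"):
        """Bucket the results by the author's organization."""
        self.buckets.append(("org", A("terms", field=field, size=100)))
        return self

    def fetch(self):
        """Build the Search object, run it and return the aggregations."""
        s = Search(using=self.es, index=self.index).extra(size=0)
        for q in self.filters:
            s = s.query(q)
        parent = s.aggs
        for name, agg in self.buckets:
            parent = parent.bucket(name, agg)
        return s.execute().aggregations

# usage (hypothetical field names): open issues per organization per month
# aggs = (MetricQuery(es, github_index)
#         .filter(state="open").by_organization().by_period().fetch())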

All processes will be documented in the Notebook.


Issues created:

  • What are abandoned issues? It is not specified what an abandoned issue is or how it can be calculated.
  • Open Issue Age: should open issue age be calculated by averaging all the values or should their values be used directly and visualised as a graph?
  • Closed issue duration: How exactly is closed issue duration supposed to be described? Similar to Open issue age?

The resolution of the above issues will help us better understand the Metrics and help me to calculate and describe them in an efficient way.

That was all for this week!


For the next week, I’ll have 4 tasks that I need to accomplish:

  • Cleanup esquery.py file a bit further

    • 1a. Finish the current PR by adding tests for all the functions and correcting & cleaning up the comments in the functions.
    • 1b. The get_agg function returns a JSON object to be used by the functions in the metrics.py file. Evaluate, and if it is not a lot of work, implement changes in the rest of manuscripts, including metrics.py, to use only elasticsearch_dsl.
  • Implement more GMD metrics:

    • 2a: Implement all GMD metrics that are implementable (are available from the enriched indexes) in the notebook using elasticsearch_dsl only.
    • 2b: For those that are missing in the enriched index: open an issue in gelk to implement them in the enriched index, and find ways to aggregate and bucket them.
  • Work on the new functions:

    • 3a: create chainable objects (by that we mean functions and classes which can be used together) for at least “by_period”, “by_organization” and “by_author”. These will be used to segregate the data by period, by organization and by author respectively.
  • Work on Visualizations!

    • 4a: test plotly and seaborn for static visualization
    • 4b: do the same with Altair and Plotly for interactive visualizations

Adios.