Analytics with SQLAlchemy

2016-08-23

What to do if a conventional, line-of-business app starts morphing into an analytics one? The situation here may be that a bit of custom SQL, introduced to do one specific bit of summarising or pivoting, expands and forms a kind of creeping undergrowth of tightly-bound code, impervious to Agile norms of modularity and testability.

In the approach below, I’ve used Python and SQLAlchemy to build up “big SQL” from small, unit-testable expressions. This keeps the codebase manageable without sacrificing the – often quite impressive – analytics performance obtainable with ordinary databases.

“Big SQL” resists being teased apart into independent, manageable functions because defining their inputs and outputs means letting those inputs and outputs be materialized somewhere. Persisting any bulky intermediate results, however, is going to clobber performance.

The example (code here) shows one way of taming complex SQL-based logic:

  • SQLAlchemy Core is used to create modular, testable expression elements that are developed separately but composed and executed efficiently at runtime
  • Unit Testing is supported by turning literal values in fixtures into input tables on-the-fly
  • SQLAlchemy ORM is used to add an entity mapping for results so they can be rendered as JSON etc.

The demo code doesn’t do very impressive analytics – it performs a simple “categorising pivot” to summarise sales of items across different price bands. Extending this example for logistic regression or deep learning is left as an exercise…

Limits of SQLAlchemy ORM

For those who have encountered SQLAlchemy mostly as an ORM, this extract from the example will be familiar – constructing a query for some entities that already exist:

    january_art_book_sales_query = session.query(m.Genre) \
         .join(m.Book) \
         .join(m.BookSale) \
         .join(m.Transaction) \
         .filter(m.Genre.name == 'Art') \
         .filter(and_(m.Transaction.create_date >= start_date, m.Transaction.create_date < datetime(2016, 2, 1)))

The query returns a Genre entity which, thanks to Alchemy’s lazy loading of all the related entities, will magically appear with the contained books and sales nested within it when referenced.

This is fine for directly accessing the existing model and doing simple things like COUNT(*). However, it can’t be used to transform and aggregate in order to derive any new entities. To create the kind of pivot used in the example means creating new table-like entities for the categories and aggregates.
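
To illustrate the easy end of that scale, a COUNT over the query built above is a one-liner (a sketch):

    # Simple aggregation over the existing model works directly in the ORM –
    # this counts the rows matched by the joined query, no new entities needed.
    num_rows = january_art_book_sales_query.count()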

SQLAlchemy Core

To build up the summary query, we start with a similar query to the above, this time in SQLAlchemy Core.

    from sqlalchemy import bindparam, between, select

    def get_sales_for_period_te(booksales_te, books_te, genres_te, transactions_te):
         # Returns the select() expression itself; the period limits are left as
         # bind parameters to be supplied at execution time.
         return select([
             booksales_te.c.book_id,
             books_te.c.price,
             books_te.c.genre_id,
             transactions_te.c.id.label('transaction_id'),
             transactions_te.c.create_date]) \
         .select_from(
             booksales_te
             .join(books_te)
             .join(genres_te)
             .join(transactions_te)
         ) \
         .where(between(transactions_te.c.create_date, bindparam('start_date'), bindparam('end_date')))

The joins and where-clause filters are different from their ORM counterparts – just different enough to be confusing. The SQL-dressed-as-Python syntax can be off-putting at first – it’s more cluttered in some ways, so the promised maintainability benefit is mostly in the bigger picture.
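
One small consolation is that the SQL an expression will generate can always be inspected by stringifying it, which helps when the Python form gets hard to read (a sketch, using the model tables from the example):

    # str()/print compiles the expression with a default dialect, showing the
    # joins and the named bind parameters (:start_date, :end_date).
    print(get_sales_for_period_te(
        m.BookSale.__table__,
        m.Book.__table__,
        m.Genre.__table__,
        m.Transaction.__table__))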

Table expressions as parameters

The source entities (Book, BookSale etc.) are passed as “table expression” parameters rather than being mentioned explicitly. This is so any derived expression can be mocked by a literal value in a test.

Note that this function returns the expression for finding the result, not the result itself. This is what we want when we’re composing bigger expressions from small ones.

Bind variable parameters

The expression being built up can have real parameters of its own, of course. These are the bind variables, such as the start_date above, which are supplied when the expression is executed. We avoid passing these in along with the table expressions so that the final expressions can be cached and reused (both within Alchemy and in the database), as the cost of recreating them each time is substantial.

There doesn’t seem to be a neat way to compose bind parameters along with expressions, as they all share the same namespace, so crafting anything here would probably be elaborate enough to obscure the rest of the example.
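
For concreteness, supplying the bind variables at execution time looks something like this (a sketch – the session comes from the example setup):

    from datetime import datetime

    sales_te = get_sales_for_period_te(
        m.BookSale.__table__,
        m.Book.__table__,
        m.Genre.__table__,
        m.Transaction.__table__)

    # The values for the bindparam() placeholders are only supplied now, at
    # execution time, so the expression itself can be built once and reused.
    rows = session.execute(
        sales_te,
        {'start_date': datetime(2016, 1, 1), 'end_date': datetime(2016, 2, 1)}
    ).fetchall()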

Composing table expressions (TEs)

Following the pattern where all functions take TEs as input and return a new TE, it’s obviously pretty easy to apply a MAX function to the relevant sales, with the output of the above function being given to this one:

    def get_max_price_te(sales_for_period_te):
         return select([
             func.max(sales_for_period_te.c.price).label('max_price')
             ])

Unit testing

The example aggregates the sales figures by price band and by genre.

(The bands are calculated dynamically based on the number required, the max. price of the relevant sales and the rounding unit to be applied to the from- and to-price limits, e.g. rounding to 50 pence).

So, after max. price, the next ingredient to calculate is the increment or price difference to be used between bands:

    def get_increment_te(max_price_te, rounding_unit, max_num_bands):
         # Band width: max_price split into max_num_bands bands, rounded up to a
         # multiple of rounding_unit, with rounding_unit as the minimum
         # (greatest() is a Postgres function).
         return select([
             func.greatest(
                 (max_price_te.c.max_price + (rounding_unit * max_num_bands - 1)) / rounding_unit / max_num_bands
                 * rounding_unit,
                 rounding_unit).label('incr')
         ])

Here we have a function that’s worth unit testing. In this case, I’ve chosen to throw a range of inputs at it, including base and edge cases, and check the results, all in one test. This at least keeps things concise:

    def test_increment_is_correct_value(self):
         for fixture in [
             {'max_price_in': {'max_price': 1}, 'result': [{'incr': 50}]},
             {'max_price_in': {'max_price': 50}, 'result': [{'incr': 50}]},
             {'max_price_in': {'max_price': 99}, 'result': [{'incr': 50}]},
             {'max_price_in': {'max_price': 499}, 'result': [{'incr': 100}]},
             {'max_price_in': {'max_price': 500}, 'result': [{'incr': 100}]},
             {'max_price_in': {'max_price': 501}, 'result': [{'incr': 150}]},
             {'max_price_in': {'max_price': 1001}, 'result': [{'incr': 250}]},
             {'max_price_in': {'max_price': 10001}, 'result': [{'incr': 2050}]},
         ]:
             results = self.exec(get_increment_te(
                 TestTable('max_price', [fixture['max_price_in']]),
                 50,
                 5))
             self.assertEqual(results, fixture['result'])

Table expression inputs are provided as literal dicts which are turned into real – if transient – tables by the TestTable class. The column names and types are inferred from the dictionary keys and values.
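
The TestTable class itself is in the example code; a minimal sketch of the idea – assuming a test engine is available and only the column types the fixtures actually use – might look something like this:

    from datetime import datetime
    from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table

    def make_test_table(name, rows, engine=None, metadata=None):
        # Infer one column per key of the first row, mapping Python types to
        # SQL types; anything unrecognised falls back to String.
        type_map = {int: Integer, str: String, datetime: DateTime}
        metadata = metadata or MetaData()
        columns = [Column(key, type_map.get(type(value), String))
                   for key, value in rows[0].items()]
        table = Table(name, metadata, *columns)
        if engine is not None:
            table.create(engine)                  # transient table for the test
            engine.execute(table.insert(), rows)  # load the literal fixture rows
        return table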

The table expression output is executed and the results turned into another plain dict. (Remember, we’re just getting row-like results here, not the mapped entities retrieved in ORM).

The important thing of course is that we’re able to test any expression, including intermediate ones hiding in the middle of a big compound query.

The final expression

There is sometimes a bit of voodoo involved in coaxing Alchemy to compose expressions as you’d like, at least in version 1.0. Essentially we can choose whether to include a subexpression as a common table expression (CTE), so reusing both it and its results, or as a subquery, so reusing the same expression in a new context. Whether the new context ends up producing different results will depend on whether it’s correlated to the enclosing query – obviously if the results are going to be the same the subquery should be factored out into a CTE.
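
In Core terms the choice boils down to something like this (a sketch, reusing the sales expression from earlier):

    # Evaluated once and its result set shared – WITH sales AS (...):
    sales_cte = sales_te.cte('sales')

    # The same expression inlined as a subquery wherever it appears, so it can
    # be correlated to (and return different rows for) each enclosing context:
    sales_subquery = sales_te.alias('sales_subquery')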

In the following, the sales history is retrieved once as the sales CTE – this is definitely something that should only be done once:

    sales_for_period_cte = get_sales_for_period_te(
         m.BookSale.__table__,
         m.Book.__table__,
         m.Genre.__table__,
         m.Transaction.__table__).cte('sales')

    max_price_cte = get_max_price_te(
         sales_for_period_cte) \
         .cte('max_price')

    increment_cte = get_increment_te(
         max_price_cte,
         rounding_unit=50,
         max_num_bands=5) \
         .cte('incr')

Deriving the price bands is also non-trivial so this expression also has a unit test. (Although it looks like part of the expression, the max_num_bands value ends up being a bind variable, just a static one):

    price_bands_te = get_price_bands_te(
         max_price_cte,
         increment_cte,
         max_num_bands=5) \
         .cte('price_bands')

We now map the sales to the bands and, finally, perform the aggregation by band and genre to give our completed expression:

    sales_in_bands_cte = get_sales_in_bands_te(
         sales_for_period_cte,
         price_bands_te).cte('sales_in_bands')

    return get_total_sales_by_band_and_genre_te(
         sales_in_bands_cte)

Mapping results to classes

The above query will get our results, and quite efficiently too. So what’s the point of trying to retrofit an entity class on this result set? The reasons are more to do with graphs than OO:

  • Generating JSON, e.g. via Marshmallow, relies on having an object graph to serialize into nested dicts.
  • Results-as-classes means that foreign key values can be made to work as relationships again.

To map to a class we’re going to have to provide the information ORM can’t figure out for itself, such as keys and relationships. We don’t have to define columns though – it’s able to get these from the query expression.

    class SalesByPriceBandAndGenre(m.Base):
         __table__ = mapped_te().alias('total_sales')
 
         # Create a primary key for anything needing object identity, e.g. Marshmallow
         __mapper_args__ = {
             'primary_key': [__table__.c.price_band, __table__.c.genre_id]
         }
 
         genre = relationship(m.Genre, primaryjoin="SalesByPriceBandAndGenre.genre_id == Genre.id", viewonly=True)

The compound primary key might not be a great idea – in other situations generating a row number or other single surrogate key would probably be better.

Since the columns that the mapper digs out of the query won’t include foreign key attributes, we have to provide the join condition explicitly in the relationship. The viewonly flag stops Alchemy from getting into trouble by trying to track changes.
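
Querying the mapped class then looks like any other ORM query, with the bind variables supplied via params(). A sketch – the total_sales attribute is an assumed column name, while price_band, genre_id and genre come from the mapping above:

    from datetime import datetime

    results = session.query(SalesByPriceBandAndGenre) \
        .params(start_date=datetime(2016, 1, 1),
                end_date=datetime(2016, 2, 1)) \
        .all()

    for row in results:
        # genre behaves as a real relationship again, thanks to the primaryjoin
        print(row.price_band, row.genre.name, row.total_sales)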

Should this be allowed?

This “classical mapping” – mentioning the SELECTable directly in the __table__ definition – is discouraged in the Alchemy docs. The recommendation there is to use a column_property or similar small tweak to an existing mapping, but that approach can’t handle the kind of transformations needed for analytics. (I found that trying to shoehorn even quite a simple calculation – a percentile rank – into this model didn’t really work.)

Other uses

This pattern can obviously apply whenever a query threatens to become too complex to maintain as a single statement or too slow to run as separate statements. This applies particularly when exploiting the advanced grouping features (CUBE, GROUPING SETS etc.) now in Postgres.
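
As a sketch of how such a grouping would slot into the same TE-returning style (the column names here are assumptions based on the example above):

    from sqlalchemy import func, select

    def get_sales_cube_te(sales_in_bands_te):
        # GROUP BY CUBE(genre_id, price_band): totals per genre, per band,
        # per genre-and-band combination, plus a grand total, in a single pass.
        return select([
            sales_in_bands_te.c.genre_id,
            sales_in_bands_te.c.price_band,
            func.sum(sales_in_bands_te.c.price).label('total_sales')
        ]).group_by(func.cube(sales_in_bands_te.c.genre_id,
                              sales_in_bands_te.c.price_band))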

It can also start to influence how data models are designed. For example, returning a graph with nodes derived from different entities would usually need careful implementation in the model, deriving entities from some Node class and requiring the full ORM machinery to be thrown in. Here, where diverse source tables can be transformed into a common structure without too much trouble, and that structure can then be queried using WITH RECURSIVE … UNION ALL and mapped to a class, there’s much less need to decide up-front what analysis views of the data are going to be needed.
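
As a rough sketch of that recursive piece in the same style – assuming a nodes structure with id and parent_id columns, which is not part of the example:

    from sqlalchemy import literal, select

    def get_node_tree_te(nodes_te):
        # Anchor term: the root nodes, declared as a recursive CTE
        # (compiles to WITH RECURSIVE tree AS ...).
        tree = select([nodes_te.c.id,
                       nodes_te.c.parent_id,
                       literal(0).label('depth')]) \
            .where(nodes_te.c.parent_id.is_(None)) \
            .cte('tree', recursive=True)

        # Recursive term: children of rows already in the tree (UNION ALL).
        tree_alias = tree.alias()
        tree = tree.union_all(
            select([nodes_te.c.id,
                    nodes_te.c.parent_id,
                    (tree_alias.c.depth + 1).label('depth')])
            .where(nodes_te.c.parent_id == tree_alias.c.id)
        )

        return select([tree.c.id, tree.c.parent_id, tree.c.depth])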