33x Faster Queries on Google Cloud's Dataproc

Datetime:2016-08-22 21:53:50          Topic: Hive           Share

I'd like to thank Felipe Hoffa , Developer Advocate at Google and Dennis Huo , Tech Lead and Manager of Google Cloud's Dataproc for their research and insights they shared with me on getting better performance out of Dataproc.

A few weeks ago I published ablog post outlining how to launch a 5-node Dataproc cluster on Google's Cloud service. In that post I hadn't taken the time to tune any table or storage backend settings and as a result didn't see query times that Dataproc and Presto are capable of.

In this blog post I walk through four performance-impacting settings for the 1.1 billion record taxi trip data set and see how fast I can get my queries to run.

Dataproc Up and Running

To start, I made a request to increase my maximum core count on Google Cloud to 50 cores in EUROPE-WEST-1. To my surprise the request was approved within a minute and I was good to go.

Below is the launch command I ran to create an 11-node cluster of n1-standard-4 instance types. This will give me a combined total of 44 virtual CPUs, 165 GB of RAM and around 5,500 GB of capacity across all drives.

$ gcloud dataproc clusters \
    create trips \
    --zone europe-west1-b \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-size 500 \
    --num-workers 10 \
    --worker-machine-type n1-standard-4 \
    --worker-boot-disk-size 500 \
    --scopes 'https://www.googleapis.com/auth/cloud-platform' \
    --project taxis-1273 \
    --initialization-actions 'gs://taxi-trips/presto.sh'

The cluster took around two minutes to launch. I was then able to SSH into the master node.

$ gcloud compute ssh \
    trips-m \
    --zone europe-west1-b

1.1 Billion Taxi Trips on Google Cloud Storage

The dataset I'm using is the 1.1 billion New York City taxi trips dataset I generated in my Billion Taxi Rides in Redshift blog post. The data sits in GZIP'ed CSV files and takes up around 500 GB of space when uncompressed.

$ gsutil ls -l gs://taxi-trips/csv/
2024092233  2016-04-11T09:53:27Z  gs://taxi-trips/csv/trips_xaa.csv_copy_1.gz
2023686578  2016-04-11T09:53:27Z  gs://taxi-trips/csv/trips_xab.csv_copy_1.gz
2022717460  2016-04-11T09:53:28Z  gs://taxi-trips/csv/trips_xac.csv_copy_1.gz
...

Comparing Zlib and Snappy

The first comparison I'll run will contrast query speeds between data compressed in zlib format versus Snappy. I'll first create a table representing the CSV data stored on Google Cloud Storage (GCS) and then I'll create two tables, one for a zlib-compressed version of the data and the other for a Snappy-compressed version. Both compressed tables will be in the columnar ORC format.

$ hive
CREATE EXTERNAL TABLE trips_csv (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 'gs://taxi-trips/csv/';

CREATE EXTERNAL TABLE trips_orc_snappy (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 'gs://taxi-trips/orc_snappy/'
  TBLPROPERTIES ("orc.compress"="SNAPPY",
                 "orc.stripe.size"="67108864",
                 "orc.row.index.stride"="50000");

CREATE EXTERNAL TABLE trips_orc_zlib (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 'gs://taxi-trips/orc_zlib/'
  TBLPROPERTIES ("orc.compress"="ZLIB",
                 "orc.stripe.size"="67108864",
                 "orc.row.index.stride"="50000");

With the tables setup I'll import the CSV data into both compressed tables.

$ screen

$ echo "INSERT INTO TABLE trips_orc_snappy
        SELECT * FROM trips_csv;

        INSERT INTO TABLE trips_orc_zlib
        SELECT * FROM trips_csv;" | hive

The import into the Snappy table took an hour and 24 minutes while the zlib table took an hour and 28 minutes.

The Snappy data takes up 124 GB of capacity across 57 files on GCS while the zlib data takes up ~85 GB of space across the same number of files. Note the total objects count includes both the files and the directory itself.

$ gsutil ls -l gs://taxi-trips/orc_snappy/
2410749849  2016-04-28T11:59:41Z  gs://taxi-trips/orc_snappy/000000_0
2431245233  2016-04-28T11:59:42Z  gs://taxi-trips/orc_snappy/000001_0
...
1639595050  2016-04-28T12:00:29Z  gs://taxi-trips/orc_snappy/000056_1
TOTAL: 58 objects, 133396472839 bytes (124.24 GiB)
$ gsutil ls -l gs://taxi-trips/orc_zlib/
1675029328  2016-04-28T13:28:30Z  gs://taxi-trips/orc_zlib/000000_0
1673806010  2016-04-28T13:28:31Z  gs://taxi-trips/orc_zlib/000001_0
...
1130370727  2016-04-28T13:29:09Z  gs://taxi-trips/orc_zlib/000056_0
TOTAL: 58 objects, 91819315282 bytes (85.51 GiB)

Benchmarking Zlib vs Snappy

Dataproc currently ships with Presto 0.144.1 which is a respectable recent release of the software as of this writing.

$ presto \
    --catalog hive \
    --schema default

Query 1 results: 7:09 for Snappy, 4:50 for zlib.

SELECT cab_type,
       count(*)
FROM trips_orc_snappy
GROUP BY cab_type;
Query 20160428_133028_00003_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
7:09 [1.13B rows, 536MB] [2.64M rows/s, 1.25MB/s]
SELECT cab_type,
       count(*)
FROM trips_orc_zlib
GROUP BY cab_type;
Query 20160428_133745_00004_mi69s, FINISHED, 10 nodes
Splits: 1,437 total, 1,437 done (100.00%)
4:50 [1.13B rows, 237MB] [3.91M rows/s, 837KB/s]

Query 2 results: 5:19 for Snappy, 5:34 for zlib.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy
GROUP BY passenger_count;
Query 20160428_134253_00005_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
5:19 [1.13B rows, 5.12GB] [3.55M rows/s, 16.4MB/s]
SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_zlib
GROUP BY passenger_count;
Query 20160428_134814_00006_mi69s, FINISHED, 10 nodes
Splits: 1,437 total, 1,437 done (100.00%)
5:34 [1.13B rows, 3.84GB] [3.4M rows/s, 11.8MB/s]

Query 3 results: 4:19 for Snappy, 4:45 for zlib.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160428_135354_00007_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:19 [1.13B rows, 8.79GB] [4.38M rows/s, 34.7MB/s]
SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_zlib
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160428_135823_00008_mi69s, FINISHED, 10 nodes
Splits: 1,437 total, 1,437 done (100.00%)
4:45 [1.13B rows, 6.97GB] [3.97M rows/s, 25MB/s]

Query 4 results: 4:38 for Snappy, 5:34 for zlib.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160428_140320_00009_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:38 [1.13B rows, 12.2GB] [4.08M rows/s, 45MB/s]
SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_zlib
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160428_140808_00010_mi69s, FINISHED, 10 nodes
Splits: 1,437 total, 1,437 done (100.00%)
5:34 [1.13B rows, 9.36GB] [3.39M rows/s, 28.7MB/s]

I was surprised that there wasn't a greater difference in query performances given Snappy is using almost 50% more space on GCS than zlib. The first query difference was substantial but the others weren't. I decided to run the first queries a few times and see if they could fall into a consistent time range.

Query 1 on Snappy again:

SELECT cab_type,
       count(*)
FROM trips_orc_snappy
GROUP BY cab_type;
Query 20160428_141352_00011_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:23 [1.13B rows, 536MB] [4.32M rows/s, 2.04MB/s]
Query 20160428_142306_00013_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:26 [1.13B rows, 536MB] [4.27M rows/s, 2.02MB/s]
Query 20160428_142743_00014_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:26 [1.13B rows, 536MB] [4.26M rows/s, 2.01MB/s]

Query 1 on zlib again:

SELECT cab_type,
       count(*)
FROM trips_orc_zlib
GROUP BY cab_type;
Query 20160428_141826_00012_mi69s, FINISHED, 10 nodes
Splits: 1,437 total, 1,437 done (100.00%)
4:25 [1.13B rows, 237MB] [4.28M rows/s, 914KB/s]
Query 20160428_143220_00015_mi69s, FINISHED, 10 nodes
Splits: 1,437 total, 1,437 done (100.00%)
5:07 [1.13B rows, 237MB] [3.69M rows/s, 789KB/s]
Query 20160428_143740_00016_mi69s, FINISHED, 10 nodes
Splits: 1,437 total, 1,437 done (100.00%)
4:51 [1.13B rows, 237MB] [3.89M rows/s, 832KB/s]

In this case it looks like Snappy is the more consistent compression format for performance so I'll continue to use it.

10K vs 50K Index Strides

The next setting I want to benchmark is the index stride size. The previous tables used a 10K setting. I'll contrast that with a 50K setting. Since I already have a Snappy-compressed table using the 10K index stride setting I'll only need to create a 50K version.

$ hive
CREATE EXTERNAL TABLE trips_orc_snappy_50k (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 'gs://taxi-trips/orc_snappy_50k/'
  TBLPROPERTIES ("orc.compress"="SNAPPY",
                 "orc.stripe.size"="67108864",
                 "orc.row.index.stride"="50000");

The following import took an hour and 26 minutes and resulted in 124 GB of GCS capacity being taken up, a completely byte count-identical amount (133,396,472,839 bytes) to the 10K version of this table.

$ echo "INSERT INTO TABLE trips_orc_snappy_50k
        SELECT * FROM trips_csv;" | hive
$ gsutil ls -l gs://taxi-trips/orc_snappy_50k/
2410749849  2016-04-28T17:33:50Z  gs://taxi-trips/orc_snappy_50k/000000_0
2431245233  2016-04-28T17:33:51Z  gs://taxi-trips/orc_snappy_50k/000001_0
...
1639595050  2016-04-28T17:34:36Z  gs://taxi-trips/orc_snappy_50k/000056_1
TOTAL: 58 objects, 133396472839 bytes (124.24 GiB)

Benchmarking 10K vs 50K Index Strides

$ presto \
    --catalog hive \
    --schema default

Since it had been three hours since I'd run the 10K benchmarks and the European work day was ending (meaning possibly fewer customers eating up resources on Dataproc) I'll be running them again during the same time period as the 50K benchmarks.

Query 1 results: 4:11 for 10K, 4:07 for 50K.

SELECT cab_type,
       count(*)
FROM trips_orc_snappy
GROUP BY cab_type;
Query 20160428_175257_00023_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:11 [1.13B rows, 536MB] [4.51M rows/s, 2.13MB/s]
SELECT cab_type,
       count(*)
FROM trips_orc_snappy_50k
GROUP BY cab_type;
Query 20160428_173500_00019_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:07 [1.13B rows, 536MB] [4.58M rows/s, 2.17MB/s]

Query 2 results: 4:45 for 10K, 4:36 for 50K.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy
GROUP BY passenger_count;
Query 20160428_175732_00024_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:45 [1.13B rows, 5.12GB] [3.98M rows/s, 18.4MB/s]
SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy_50k
GROUP BY passenger_count;
Query 20160428_173922_00020_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:36 [1.13B rows, 5.12GB] [4.1M rows/s, 19MB/s]

Query 3 results: 4:21 for 10K, 4:12 for 50K.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160428_180230_00025_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:21 [1.13B rows, 8.79GB] [4.34M rows/s, 34.4MB/s]
SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy_50k
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160428_174413_00021_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:12 [1.13B rows, 8.79GB] [4.5M rows/s, 35.7MB/s]

Query 4 results: 4:50 for 10K, 4:09 for 50K.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160428_180703_00026_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:50 [1.13B rows, 12.2GB] [3.91M rows/s, 43MB/s]
SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy_50k
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160428_174835_00022_mi69s, FINISHED, 10 nodes
Splits: 2,071 total, 2,071 done (100.00%)
4:09 [1.13B rows, 12.2GB] [4.56M rows/s, 50.2MB/s]

Though not by a wide margin, the 50K Index Stride is the winner in this case.

64 MB vs 256 MB Index Stripes

The next setting I'll look at is index stripe size. I've been using a 64 MB stripes in the benchmarks so far so I'll create a table with 256 MB stripes and see what performance differences there are.

$ hive
CREATE EXTERNAL TABLE trips_orc_snappy_50k_256mb (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 'gs://taxi-trips/orc_snappy_50k_256mb/'
  TBLPROPERTIES ("orc.compress"="SNAPPY",
                 "orc.stripe.size"="268435456",
                 "orc.row.index.stride"="50000");

The following import took an hour and 26 minutes which resulted in 122 GB of capacity being used up on GCS.

$ echo "INSERT INTO TABLE trips_orc_snappy_50k_256mb
        SELECT * FROM trips_csv;" | hive
$ gsutil ls -l gs://taxi-trips/orc_snappy_50k_256mb/
2377194720  2016-04-28T19:37:03Z  gs://taxi-trips/orc_snappy_50k_256mb/000000_0
2394636259  2016-04-28T19:37:03Z  gs://taxi-trips/orc_snappy_50k_256mb/000001_0
...
1613999811  2016-04-28T19:37:51Z  gs://taxi-trips/orc_snappy_50k_256mb/000056_1
TOTAL: 58 objects, 131358408145 bytes (122.34 GiB)

Benchmarking 64 MB vs 256 MB Index Stripes

$ presto \
    --catalog hive \
    --schema default

Query 1 results: 64 MB benchmarked at 4:07 previously, 256 MB finished in 39 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc_snappy_50k_256mb
GROUP BY cab_type;
Query 20160428_193843_00029_mi69s, FINISHED, 10 nodes
Splits: 2,046 total, 2,046 done (100.00%)
0:39 [1.13B rows, 24.1MB] [28.8M rows/s, 626KB/s]

Query 2 results: 64 MB benchmarked at 4:36 previously, 256 MB finished in 33 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy_50k_256mb
GROUP BY passenger_count;
Query 20160428_193949_00030_mi69s, FINISHED, 10 nodes
Splits: 2,046 total, 2,046 done (100.00%)
0:33 [1.13B rows, 3.49GB] [34.5M rows/s, 109MB/s]

Query 3 results: 64 MB benchmarked at 4:12 previously, 256 MB finished in 42 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy_50k_256mb
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160428_194034_00031_mi69s, FINISHED, 10 nodes
Splits: 2,046 total, 2,046 done (100.00%)
0:42 [1.13B rows, 4.72GB] [26.8M rows/s, 114MB/s]

Query 4 results: 64 MB benchmarked at 4:09 previously, 256 MB finished in 44 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy_50k_256mb
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160428_194733_00034_mi69s, FINISHED, 10 nodes
Splits: 2,046 total, 2,046 done (100.00%)
0:44 [1.13B rows, 8.11GB] [26.1M rows/s, 191MB/s]

The 256 MB stripe size was miles quicker than the 64 MB setting. I now want to see if bigger is better so I'll create a 256 MB vs 512 MB benchmark.

512 MB vs 256 MB Index Stripes

$ hive
CREATE EXTERNAL TABLE trips_orc_snappy_50k_512mb (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 'gs://taxi-trips/orc_snappy_50k_512mb/'
  TBLPROPERTIES ("orc.compress"="SNAPPY",
                 "orc.stripe.size"="536870912",
                 "orc.row.index.stride"="50000");

The following import took an hour and 25 minutes which resulted in 122 GB of capacity being used up on GCS.

$ echo "INSERT INTO TABLE trips_orc_snappy_50k_512mb
        SELECT * FROM trips_csv;" | hive
$ gsutil ls -l gs://taxi-trips/orc_snappy_50k_512mb/
2378631040  2016-04-28T21:14:41Z  gs://taxi-trips/orc_snappy_50k_512mb/000000_0
2395856741  2016-04-28T21:14:42Z  gs://taxi-trips/orc_snappy_50k_512mb/000001_0
...
1615134007  2016-04-28T21:15:29Z  gs://taxi-trips/orc_snappy_50k_512mb/000056_0
TOTAL: 58 objects, 131615705708 bytes (122.58 GiB)

Benchmarking 512 MB vs 256 MB Index Stripes

$ presto \
    --catalog hive \
    --schema default

Query 1 results: 256 MB benchmarked at 39 seconds previously, 512 MB finished in 21 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc_snappy_50k_512mb
GROUP BY cab_type;
Query 20160428_211857_00041_mi69s, FINISHED, 10 nodes
Splits: 2,048 total, 2,048 done (100.00%)
0:21 [1.13B rows, 19.1MB] [53.3M rows/s, 920KB/s]

Query 2 results: 256 MB benchmarked at 33 seconds previously, 512 MB finished in 24 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy_50k_512mb
GROUP BY passenger_count;
Query 20160428_212007_00043_mi69s, FINISHED, 10 nodes
Splits: 2,048 total, 2,048 done (100.00%)
0:24 [1.13B rows, 3.48GB] [47.2M rows/s, 148MB/s]

Query 3 results: 256 MB benchmarked at 42 seconds previously, 512 MB finished in 39 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy_50k_512mb
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160428_212043_00044_mi69s, FINISHED, 10 nodes
Splits: 2,048 total, 2,048 done (100.00%)
0:39 [1.13B rows, 4.73GB] [28.9M rows/s, 123MB/s]

Query 4 results: 256 MB benchmarked at 44 seconds previously, 512 MB also finished in 44 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy_50k_512mb
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160428_212317_00047_mi69s, FINISHED, 10 nodes
Splits: 2,048 total, 2,048 done (100.00%)
0:44 [1.13B rows, 8.12GB] [25.6M rows/s, 188MB/s]

The speed improvement isn't very consistent but nonetheless the 512 MB index size seems to come out ahead.

HDFS vs Google Cloud Storage (GCS)

Next I want to look at the speed differences between storing the data on HDFS, which should have better data locality versus storing the data on GCS, which is elastic in terms of storage space and doesn't require management at the expense of the data being further away from the compute nodes.

I'll copy the fastest dataset so far, the Snappy-compressed, 50K index stride, 512 MB stripe size files to HDFS. This operation completed in a few minutes.

$ hadoop distcp gs://taxi-trips/orc_snappy_50k_512mb/ /

I'll then create a table pointing to that data on HDFS.

$ hive
CREATE EXTERNAL TABLE trips_orc_snappy_50k_512mb_hdfs (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/orc_snappy_50k_512mb/';
$ presto \
    --catalog hive \
    --schema default

Query 1 results: GCS benchmarked at 21 seconds previously, HDFS finished in 11 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc_snappy_50k_512mb_hdfs
GROUP BY cab_type;
Query 20160428_213300_00054_mi69s, FINISHED, 10 nodes
Splits: 2,053 total, 2,053 done (100.00%)
0:11 [1.13B rows, 19MB] [105M rows/s, 1.76MB/s]

Query 2 results: GCS benchmarked at 24 seconds previously, HDFS finished in 10 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy_50k_512mb_hdfs
GROUP BY passenger_count;
Query 20160428_213337_00056_mi69s, FINISHED, 10 nodes
Splits: 2,053 total, 2,053 done (100.00%)
0:10 [1.13B rows, 3.48GB] [111M rows/s, 349MB/s]

Query 3 results: GCS benchmarked at 39 seconds previously, HDFS finished in 21 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy_50k_512mb_hdfs
GROUP BY passenger_count,
         year(pickup_datetime);
Query 20160428_213359_00057_mi69s, FINISHED, 10 nodes
Splits: 2,053 total, 2,053 done (100.00%)
0:21 [1.13B rows, 4.73GB] [54.1M rows/s, 231MB/s]

Query 4 results: GCS benchmarked at 44 seconds previously, HDFS finished in 31 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy_50k_512mb
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy_50k_512mb_hdfs
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
Query 20160428_213431_00058_mi69s, FINISHED, 10 nodes
Splits: 2,053 total, 2,053 done (100.00%)
0:31 [1.13B rows, 8.12GB] [36.6M rows/s, 269MB/s]

HDFS is the clear winner in terms of query performance.

There Should Be More To Come

I'm really happy that I was able to take queries lasting 5 minutes and 34 seconds down to 10 seconds, a 33x improvement. There are a number of other optimisations that I've been looking at over the past few weeks that I didn't have time to benchmark for this blog. To add to that, both Presto as a query engine and Dataproc as a platform continue to evolve so I suspect I might be able to squeeze out further performance improvements down the line.





About List