How to Speed Up Ad-hoc Analytics with SparkSQL, Parquet, and Alluxio

Datetime: 2016-08-23 04:18:04

In the big data enterprise ecosystem, there are always new choices when it comes to analytics and data science. Apache incubates so many projects that choosing the appropriate one is often confusing. In the data science pipeline, ad-hoc querying is an important step: it lets users run different queries and compute exploratory statistics that help them understand their data. In practice, Hive is still the workhorse for many companies. Old as Hive is, and however differently groups have hacked it to make it easier to use, I still hear many complaints about queries that never finish. Time spent waiting on query execution and adjusting query results slows down the pace of data science discovery.

Personally, I prefer running ad-hoc queries with Spark rather than with Hive on MapReduce, mostly because I can do other things in Spark at the same time without switching back and forth between tools. Recently, I also looked into Alluxio, a distributed in-memory file system. In this article, I will demonstrate examples that use SparkSQL, Parquet, and Alluxio to speed up ad-hoc query analytics. When using Spark to accelerate queries, data locality is the key.

Install Alluxio with MapR

We begin with an existing MapR 5.1 cluster running on three AWS instances (m4.2xlarge). We download Alluxio from GitHub and compile it against the MapR 5.1 artifacts:

git clone https://github.com/alluxio/alluxio.git
cd alluxio
git checkout v1.2.0
mvn clean package -Dhadoop.version=2.7.0-mapr-1602 -Pspark -DskipTests

We compile Alluxio with Oracle Java 8, the same Java version the MapR cluster runs on. However, launching the Alluxio web UI required temporarily switching back to Java 7. We also make a few configuration changes by adding the following to alluxio-env.sh:

ALLUXIO_MASTER_HOSTNAME=${ALLUXIO_MASTER_HOSTNAME:-"node1 host name"}
ALLUXIO_WORKER_MEMORY_SIZE=${ALLUXIO_WORKER_MEMORY_SIZE:-"5120MB"}
ALLUXIO_RAM_FOLDER=${ALLUXIO_RAM_FOLDER:-"/mnt/ramdisk"}
ALLUXIO_UNDERFS_ADDRESS=${ALLUXIO_UNDERFS_ADDRESS:-"/mapr/clustername/tmp/underFSStorage"}
ALLUXIO_JAVA_OPTS+=" -Dalluxio.master.journal.folder=/mapr/clustername/tmp/journal"

These settings put Alluxio's under file system storage and the master journal on the MapR File System, and give each Alluxio worker 5GB of memory for its working set. We could even create a dedicated MapR-FS volume to serve as Alluxio's under file system. We also add a workers file listing the hostnames of the three nodes that will run Alluxio workers:

node1
node2
node3

As a result, on top of our 3-node MapR cluster, Alluxio runs its master on node1 and workers on node1, node2, and node3. Getting Alluxio running takes just a few commands, which copy the configuration to every node, format Alluxio, and start all services; afterwards the web UI is reachable at node1:19999.

clush -ac /opt/mapr/alluxio/conf
cd /opt/mapr/alluxio/ 
bin/alluxio format
bin/alluxio-start.sh all

Prepare the Data

For comparison, we also build a 4-node Cloudera cluster (m4.2xlarge) running CDH 5.8.0 and deploy Alluxio on its 3 data nodes with the same layout. On both clusters we run Spark in standalone mode, with the Spark master on node1 and three workers, each with 10GB of memory, on node1 through node3. As sample data we use the Kaggle click-through rate prediction dataset: 5.9GB of CSV containing over 40 million rows. We launch the Spark shell with:

spark-shell --master spark://node1:7077 --executor-memory 2G --packages com.databricks:spark-csv_2.10:1.4.0

In the Spark shell, we define the schema and load the CSV from its path on MapR-FS (or on HDFS for the Cloudera cluster):

import org.apache.spark.sql.types._

val trainSchema = StructType(Array(
    StructField("id", StringType, false),
    StructField("click", IntegerType, true),
    StructField("hour", IntegerType, true),
    StructField("C1", IntegerType, true),
    StructField("banner_pos", IntegerType, true),
    StructField("site_id", StringType, true),
    StructField("site_domain", StringType, true),
    StructField("site_category", StringType, true),
    StructField("app_id", StringType, true),
    StructField("app_domain", StringType, true),
    StructField("app_category", StringType, true),
    StructField("device_id", StringType, true),
    StructField("device_ip", StringType, true),
    StructField("device_model", StringType, true),
    StructField("device_type", IntegerType, true),
    StructField("device_conn_type", IntegerType, true),
    StructField("C14", IntegerType, true),
    StructField("C15", IntegerType, true),
    StructField("C16", IntegerType, true),
    StructField("C17", IntegerType, true),
    StructField("C18", IntegerType, true),
    StructField("C19", IntegerType, true),
    StructField("C20", IntegerType, true),
    StructField("C21", IntegerType, true)
))

// trainPath: location of the training CSV on MapR-FS or HDFS (set it before running)
val train = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .schema(trainSchema)
    .load(trainPath)

Next, we write the data out three times to produce the copies we need: 1) Parquet on HDFS/MapR-FS (the CSV copy is already there), 2) Parquet on Alluxio, and 3) CSV on Alluxio.

train.write.parquet("maprfs:///tmp/train_parquet")
train.write.parquet("alluxio://node1:19998/train_parquet")
train.write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("alluxio://node1:19998/train_crt")

Looking at the file sizes, the Parquet copy is much more compact: 5.9GB of CSV data is compressed to less than 1GB.
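To compare the sizes from inside the same Spark shell instead of with filesystem tools, the Hadoop FileSystem API can total the bytes under each path. This is a sketch that assumes the shell's SparkContext `sc` and the `trainPath` variable from the loading step; the helper name `dirSizeBytes` is hypothetical, and on the Cloudera cluster the `maprfs://` path would become an `hdfs://` one.

```scala
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: total bytes stored under a file or directory. It works
// for any scheme whose Hadoop client is on the classpath (maprfs://, hdfs://,
// and alluxio:// once the Alluxio client jar is available to Spark).
// A path without a scheme resolves against the default file system.
def dirSizeBytes(dir: String): Long = {
  val fs = FileSystem.get(new URI(dir), sc.hadoopConfiguration)
  fs.getContentSummary(new Path(dir)).getLength
}

val csvBytes     = dirSizeBytes(trainPath)                      // original CSV
val parquetBytes = dirSizeBytes("maprfs:///tmp/train_parquet")  // Parquet copy

println(f"CSV: ${csvBytes / 1e9}%.2f GB, Parquet: ${parquetBytes / 1e9}%.2f GB")
println(f"Size ratio: ${csvBytes.toDouble / parquetBytes}%.1fx")
```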

Run SparkSQL on Hot Data

Now it’s time to read the data and compare performance. I will show how Parquet improves query performance and when Alluxio is useful. Before reading any files, we drop the OS page cache on every node to get a more accurate measurement:

clush -a "sudo sh -c 'free && sync && echo 3 > /proc/sys/vm/drop_caches && free'"

Readers compared on each cluster:

            CSV files                 Parquet files
Cloudera    textFile / CSV reader     Parquet reader
MapR        textFile / CSV reader     Parquet reader

We can capture the time of execution through the Spark UI, but we can also write a small Scala snippet to do that:

val start_time=System.nanoTime()
train.count()  // or some other operation
val end_time = System.nanoTime()
println("Time elapsed: " + (end_time-start_time)/1000000 + " milliseconds")
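Because the same start/stop pattern repeats for every measurement, it can be wrapped in a small reusable helper. This is a minimal sketch in plain Scala; the names `Timing` and `timed` are hypothetical, and like the snippet above it measures wall-clock time in the driver, so the result includes job scheduling overhead as well as task time.

```scala
object Timing {
  // Hypothetical helper: evaluates `block` exactly once, prints the elapsed
  // wall-clock time in milliseconds, and returns the result with that time.
  def timed[T](label: String)(block: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = block // the by-name argument is evaluated here, after the clock starts
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"$label: $elapsedMs milliseconds")
    (result, elapsedMs)
  }
}
```

For example, `val (n, ms) = Timing.timed("DataFrame count")(train.count())` times a single count. The by-name parameter `block: => T` is what lets the helper start the clock before the Spark action actually runs.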

First, we read the CSV data into an RDD with textFile() and run a simple count. One surprising result is that the cached RDD turns out to be slower. The reason is that an RDD is stored with little compression when Spark caches it, whereas a DataFrame/Dataset is cached much more compactly. With the limited memory we assigned, only about 15% of the RDD fit in the cache, and the remaining partitions had to be recomputed from the source file on every query. So when running queries on a cached RDD, make sure the executors have enough memory to hold it.

[Table: Cloudera vs. MapR results for TextFile, TextFile reading Alluxio, and TextFile cached]
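The RDD measurement can be reproduced roughly as follows. This is a sketch that assumes the spark-shell's `sc` and the `trainPath` used earlier. Note that it persists the RDD with StorageLevel.MEMORY_ONLY_SER rather than the default cache() (MEMORY_ONLY); serialized storage is a swapped-in alternative, not what the measurements above used. It trades some CPU for a smaller memory footprint, so a larger fraction of the data may stay cached.

```scala
import org.apache.spark.storage.StorageLevel

// Assumes spark-shell: `sc` is the shell's SparkContext, `trainPath` the CSV path.
val lines = sc.textFile(trainPath)

// Uncached: every action re-reads the file from MapR-FS/HDFS (or Alluxio).
val t0 = System.nanoTime()
val uncachedCount = lines.count()
println("uncached: " + (System.nanoTime() - t0) / 1000000 + " ms")

// MEMORY_ONLY_SER keeps each partition as serialized bytes instead of Java
// String objects. Starting the shell with --conf spark.rdd.compress=true
// additionally compresses those serialized partitions.
lines.persist(StorageLevel.MEMORY_ONLY_SER)
lines.count() // the first action after persist() fills the cache

val t1 = System.nanoTime()
val cachedCount = lines.count() // cached partitions are served from memory
println("cached: " + (System.nanoTime() - t1) / 1000000 + " ms")
```

The Storage tab of the Spark UI shows the "Fraction Cached" for the RDD, which is a quick way to confirm whether the executors had enough memory to hold all of it.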

Second, we use the Databricks spark-csv package to read the CSV into a DataFrame and run a simple count. Here we see much better compression and a huge speedup when the DataFrame is cached in memory.

[Table: Cloudera vs. MapR results for CSV File, CSV File reading Alluxio, and CSV File cached]
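A sketch of the DataFrame variant, reading the CSV copy from Alluxio and then caching it. It assumes the spark-shell's `sqlContext` (Spark 1.6), the spark-csv package loaded with --packages, the `trainSchema` defined earlier, and the Alluxio master at node1:19998. A cached DataFrame is stored in Spark SQL's in-memory columnar format, which is compressed by default (spark.sql.inMemoryColumnarStorage.compressed), consistent with the better compression observed above.

```scala
// Read the CSV copy that was written to Alluxio earlier.
val csvAlluxio = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(trainSchema)
  .load("alluxio://node1:19998/train_crt")

// Uncached count: parses the CSV text served by the Alluxio workers.
val t0 = System.nanoTime()
val uncachedCount = csvAlluxio.count()
println("uncached: " + (System.nanoTime() - t0) / 1000000 + " ms")

// cache() is lazy; the first action after it fills the in-memory columnar cache.
csvAlluxio.cache()
csvAlluxio.count()

val t1 = System.nanoTime()
val cachedCount = csvAlluxio.count() // now answered from the cached columns
println("cached: " + (System.nanoTime() - t1) / 1000000 + " ms")
```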

Finally, we read the Parquet files into a DataFrame and run a simple count. Parquet is very efficient for column-oriented queries thanks to its columnar layout, which lets a query read only the columns it references. It also works very well with Apache Drill.

[Table: Cloudera vs. MapR results for Parquet, Parquet reading Alluxio, and Parquet cached]
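Since the point of this setup is ad-hoc SparkSQL, here is a sketch of querying the Parquet copies with SQL. It assumes the spark-shell's `sqlContext` on Spark 1.6 (registerTempTable was superseded by createOrReplaceTempView in Spark 2.x) and the paths written earlier; the table name `train` and the click-through-rate-by-hour query are illustrative, not part of the benchmark above.

```scala
// Parquet copies written earlier (use hdfs:// instead of maprfs:// on the Cloudera cluster).
val pqMapr    = sqlContext.read.parquet("maprfs:///tmp/train_parquet")
val pqAlluxio = sqlContext.read.parquet("alluxio://node1:19998/train_parquet")

// Expose the Alluxio copy to SQL under an illustrative table name.
pqAlluxio.registerTempTable("train")

// Ad-hoc query touching two columns; on uncached Parquet, Spark reads only
// the `hour` and `click` column chunks instead of whole rows.
val ctrByHour = sqlContext.sql(
  """SELECT hour, AVG(click) AS ctr, COUNT(*) AS impressions
    |FROM train
    |GROUP BY hour
    |ORDER BY hour""".stripMargin)
ctrByHour.show(24)

// Keep the table in Spark's in-memory columnar cache for follow-up queries.
// cacheTable is lazy: the next query that scans `train` materializes it.
sqlContext.cacheTable("train")
```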

Using a cached DataFrame, or a cached RDD that fits in memory, greatly speeds up the query. Looking at how the tasks are executed, all cached tasks show the locality level “PROCESS_LOCAL”, while non-cached tasks show “NODE_LOCAL”. This is why I say data locality is key to query speed here, and why Alluxio can be very effective when you have many remote data centers. You can achieve a similar effect with MapR: create a dedicated mirror volume of the volume holding the hot data, and place it on the local cluster.
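The locality levels can be read off the stage pages of the Spark UI, but they can also be tallied from the shell with a SparkListener. This is a sketch assuming the shell's `sc` and the `train` DataFrame from earlier; SparkListener is a developer API, and because listener events are delivered asynchronously, the sketch waits briefly before printing.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import scala.collection.concurrent.TrieMap

// Finished-task count per locality level (PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY, ...).
val localityCounts = TrieMap.empty[String, Int]

sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val level = taskEnd.taskInfo.taskLocality.toString
    // Events arrive on a single listener-bus thread, so this read-then-write
    // does not race with itself; TrieMap makes reads from the shell thread safe.
    localityCounts.put(level, localityCounts.getOrElse(level, 0) + 1)
  }
})

train.count()      // run the query whose tasks we want to inspect
Thread.sleep(2000) // crude wait for the listener bus to deliver the task-end events
println(localityCounts)
```

The listener stays registered for every later job, so calling `localityCounts.clear()` between runs keeps the cached and uncached tallies separate.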

Summary

To summarize, to accelerate queries on Hadoop, we should use SparkSQL with caching and use the Parquet format where it fits the use case. Alluxio is great if you have remote data centers or a heterogeneous storage layer, because it provides the data locality that Spark execution relies on. Its further benefits are resilience against job failures, since the data lives in Alluxio rather than inside the Spark executors, and sharing data among multiple Spark sessions. These numbers are only a rough picture of performance; to monitor the system properly, we should also track file system throughput statistics. We also observe that the larger the underlying data, the more we gain from Alluxio or from caching it in memory.

If you want to query Alluxio with Drill, copy the compiled Alluxio client jar, alluxio-core-client-1.2.0-jar-with-dependencies.jar, into Drill's jars/classb directory. You also need to add the following property to conf/core-site.xml:

<property>
    <name>fs.alluxio.impl</name>
    <value>alluxio.hadoop.FileSystem</value>
</property>

Have fun querying the data! If you have any questions, please ask them in the comments section below.