Zeppelin/Spark/Cassandra integration tutorial

Datetime: 2016-08-23 00:40:35   Topic: Spark, Cassandra

In this post, I’ll cover in detail all the steps necessary to integrate Apache Zeppelin, Apache Spark and Apache Cassandra.

If you are not familiar with Zeppelin, I recommend reading my introduction slides first.

The integration between Spark and Cassandra is achieved using the Spark-Cassandra connector.

Zeppelin supports Spark natively, out of the box. Making Zeppelin support the Spark-Cassandra integration, however, requires some extra work.

Zeppelin – Spark workflow

In Zeppelin, every interpreter runs in a separate JVM, and the Spark interpreter is no exception.

The interpreter process is first launched by the class RemoteInterpreterProcess:

public int reference(InterpreterGroup interpreterGroup) {
  ...
  if (!isInterpreterAlreadyExecuting) {
    try {
      // Pick a free local port for the new interpreter process to listen on
      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
    } catch (IOException e1) {
      throw new InterpreterException(e1);
    }
    // interpreterRunner is bin/interpreter.sh, invoked as:
    //   interpreter.sh -d <interpreterDir> -p <port> -l <localRepoDir>
    // (the "false" argument disables Commons Exec quote handling)
    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
    cmdLine.addArgument("-d", false);
    cmdLine.addArgument(interpreterDir, false);
    cmdLine.addArgument("-p", false);
    cmdLine.addArgument(Integer.toString(port), false);
    cmdLine.addArgument("-l", false);
    cmdLine.addArgument(localRepoDir, false);

    executor = new DefaultExecutor();

    // No timeout: the process keeps running until Zeppelin stops it
    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
    executor.setWatchdog(watchdog);

    running = true;
    ...

Each interpreter is bootstrapped by interpreterRunner, which is the shell script $ZEPPELIN_HOME/bin/interpreter.sh.

Depending on the interpreter type and run mode, the process is launched with a different set of environment variables. Here is an extract of the $ZEPPELIN_HOME/bin/interpreter.sh script:

if [[ -n "${SPARK_SUBMIT}" ]]; then
    ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
else
    ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
fi

One small detail here is critical for the Spark-Cassandra connector integration: the classpath used to launch the interpreter process. The idea is to put the Spark-Cassandra connector dependencies on this classpath so that Zeppelin can access Cassandra through Spark.
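To check which classpath a running interpreter actually received, you can read it back from its command line. The sketch below is a hypothetical helper (extract_classpath is my own name, not a Zeppelin function) that prints the word following -cp or --driver-class-path in a command-line string. It assumes no classpath entry contains whitespace, and the usage comment assumes the process can be found with pgrep -f RemoteInterpreterServer, which as far as I know is the main class interpreter.sh passes as ${ZEPPELIN_SERVER}.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Zeppelin): print the classpath given to a
# java or spark-submit command line, i.e. the word after -cp or --driver-class-path.
# Assumption: no classpath entry contains whitespace.
extract_classpath() {
  local -a words
  local prev="" word
  # read -a splits on whitespace without glob-expanding entries such as /path/*
  read -r -a words <<< "$1"
  for word in "${words[@]}"; do
    if [[ "$prev" == "-cp" || "$prev" == "--driver-class-path" ]]; then
      printf '%s\n' "$word"
      return 0
    fi
    prev="$word"
  done
  return 1  # no classpath option found
}

# Example usage (assumes the interpreter JVM can be found by its main class name):
#   pid=$(pgrep -f RemoteInterpreterServer | head -n 1)
#   extract_classpath "$(ps -ww -o args= -p "$pid")" | tr ':' '\n' | grep -i cassandra
```

If the last pipeline prints nothing, the connector is not on the interpreter classpath under a recognizable name (it may still be embedded in a fat jar, as in the custom build described later).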

Configuration matrix

Running Zeppelin with Spark and Cassandra involves several parameters and configuration choices:

  1. Standard Zeppelin binaries
  2. Custom Zeppelin build with the Spark-Cassandra connector
  3. Zeppelin connecting to the local Spark runner
  4. Zeppelin connecting to a stand-alone Spark cluster
  5. Using Zeppelin with OSS Spark
  6. Using Zeppelin with DSE (DataStax Enterprise)

Standard Zeppelin build with local Spark

If you use the default Zeppelin binaries (downloaded from the official repository), you need to do the following to make the Spark-Cassandra integration work:

  1. In the Interpreter menu, add the property spark.cassandra.connection.host to the Spark interpreter. The value should be a single IP address or a comma-separated list of IP addresses of your Cassandra nodes.

    [Screenshot: spark.cassandra.connection.host property on the Spark interpreter]

  2. Last but not least, add the Spark-Cassandra connector as a dependency of the interpreter.

    [Screenshot: Spark-Cassandra connector declared as a Spark interpreter dependency]

    When adding the dependency and the property, do not forget to click the + icon so that Zeppelin registers your change; otherwise it will be lost.
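The dependency is entered as a Maven coordinate of the form groupId:artifactId:version, and the connector's artifact ID carries the Scala binary version as a suffix (_2.10 throughout this post). The hypothetical helper below only formats that string; it assumes a Scala 2.10 build by default, and the connector version itself still has to match your Spark version (the table of custom builds at the end of this post lists the pairs used there).

```shell
#!/usr/bin/env bash
# Hypothetical helper: format the Maven coordinate of the Spark-Cassandra connector,
# as typed into the Zeppelin interpreter dependency field.
# Usage: connector_coordinate <connector_version> [scala_binary_version]
connector_coordinate() {
  local connector_version="${1:-}"
  local scala_version="${2:-2.10}"   # assumption: Scala 2.10 build, as used in this post
  if [[ -z "$connector_version" ]]; then
    echo "usage: connector_coordinate <connector_version> [scala_version]" >&2
    return 1
  fi
  printf 'com.datastax.spark:spark-cassandra-connector_%s:%s\n' "$scala_version" "$connector_version"
}

# Example: connector_coordinate 1.6.0
#   prints com.datastax.spark:spark-cassandra-connector_2.10:1.6.0
```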

At runtime, Zeppelin downloads the declared dependencies and all their transitive dependencies from Maven Central and/or from your local Maven repository (if any).

These dependencies are then stored in the local repository folder defined by the property zeppelin.dep.localrepo.

Also, if you go back to the interpreter configuration menu (after a successful run), you’ll see a new property added by Zeppelin: zeppelin.interpreter.localRepo.

[Screenshot: zeppelin.interpreter.localRepo property added to the interpreter settings]

The last segment of the folder path (2BTPVTBVH in the example) is the ID of the interpreter instance. All transitive dependencies are downloaded and stored as jar files in $ZEPPELIN_HOME/local-repo/<INTERPRETER_ID>, and their content (.class files) is extracted into $ZEPPELIN_HOME/local-repo.
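To see what Zeppelin actually downloaded for a given interpreter instance, list that folder. A minimal sketch, assuming the $ZEPPELIN_HOME/local-repo/<INTERPRETER_ID> layout described above (list_interpreter_jars is a hypothetical helper name):

```shell
#!/usr/bin/env bash
# Hypothetical helper: list the jar files Zeppelin downloaded for one interpreter
# instance, i.e. the content of <zeppelin_home>/local-repo/<interpreter_id>.
# Usage: list_interpreter_jars <zeppelin_home> <interpreter_id>
list_interpreter_jars() {
  local dir="$1/local-repo/$2" jar
  if [[ ! -d "$dir" ]]; then
    echo "no local repository folder: $dir" >&2
    return 1
  fi
  for jar in "$dir"/*.jar; do
    [[ -e "$jar" ]] || continue   # folder exists but holds no jar yet
    basename "$jar"
  done
}

# Example: is the connector among the downloaded jars?
#   list_interpreter_jars "$ZEPPELIN_HOME" 2BTPVTBVH | grep spark-cassandra-connector
```

An empty listing after a run usually means the download failed, which is the situation described next.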

If your Zeppelin server is behind a corporate firewall, the download will fail, so Spark won’t be able to connect to Cassandra (you’ll get a ClassNotFoundException in the Spark interpreter logs).

In this case, you can either:

  1. download all the dependencies manually and put them into the folder defined by zeppelin.dep.localrepo, or
  2. build Zeppelin with the Spark-Cassandra connector embedded (see the next section).

Custom Zeppelin build with local Spark

You’ll need to build Zeppelin yourself, using the cassandra-spark-1.x Maven profile that matches your Spark version.

Those profiles are defined in the $ZEPPELIN_HOME/spark-dependencies/pom.xml file.

For each cassandra-spark-1.x profile, you can override the Spark version it defines with the -Dspark.version=x.y.z build flag. To change the Spark-Cassandra connector version, you’ll need to edit the $ZEPPELIN_HOME/spark-dependencies/pom.xml file yourself. Similarly, if you want to use the latest version of the Spark-Cassandra connector and no profile exists for it, just edit the file and add your own profile.

In a nutshell, the build command is:

>mvn clean package -Pbuild-distr -Pcassandra-spark-1.x -DskipTests

or with the Spark version manually forced:

>mvn clean package -Pbuild-distr -Pcassandra-spark-1.x -Dspark.version=x.y.z -DskipTests

This makes the build include all transitive dependencies of the Spark-Cassandra connector in the fat jar located at $ZEPPELIN_HOME/interpreter/spark/dep/zeppelin-spark-dependencies-<ZEPPELIN_VERSION>.jar.
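If you build several variants, a small wrapper keeps the two command forms above in one place. This is only a sketch: build_zeppelin is a hypothetical name, it must be run from the root of a Zeppelin source checkout, the profile name in the usage comment is a placeholder for one of the cassandra-spark-1.x profiles actually defined in spark-dependencies/pom.xml, and DRY_RUN=1 prints the command instead of running it.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the Maven build commands shown above.
# Run from the root of a Zeppelin source checkout; DRY_RUN=1 only prints the command.
# Usage: build_zeppelin <cassandra-spark profile> [spark_version]
build_zeppelin() {
  local profile="${1:-}" spark_version="${2:-}"
  if [[ -z "$profile" ]]; then
    echo "usage: build_zeppelin <profile> [spark_version]" >&2
    return 1
  fi
  local -a cmd=(mvn clean package -Pbuild-distr "-P${profile}")
  if [[ -n "$spark_version" ]]; then
    cmd+=("-Dspark.version=${spark_version}")   # override the profile's Spark version
  fi
  cmd+=(-DskipTests)
  if [[ "${DRY_RUN:-0}" == "1" ]]; then
    printf '%s\n' "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}

# Example (placeholder profile name and Spark version):
#   DRY_RUN=1 build_zeppelin cassandra-spark-1.x x.y.z
```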

One easy way to verify that the Spark-Cassandra connector has been correctly embedded in this file is to copy it somewhere, extract its content with jar -xvf zeppelin-spark-dependencies-<ZEPPELIN_VERSION>.jar and check for the connector classes.
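Listing the archive with jar tf is enough for this check and avoids extracting thousands of files. A minimal sketch, assuming the connector classes live under the com/datastax/spark/connector/ package path (has_connector_classes is a hypothetical helper name):

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a jar listing (e.g. the output of `jar tf <file>`) on
# stdin and succeed if it contains classes of the com.datastax.spark.connector package.
has_connector_classes() {
  grep -q '^com/datastax/spark/connector/.*\.class$'
}

# Example usage:
#   jar tf zeppelin-spark-dependencies-<ZEPPELIN_VERSION>.jar | has_connector_classes \
#     && echo "connector embedded" || echo "connector missing"
```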

Once built, you can use this special version of Zeppelin without declaring any dependency on the Spark-Cassandra connector. You still have to set the spark.cassandra.connection.host property on the Spark interpreter, though.

Zeppelin connecting to a stand-alone OSS Spark cluster

So far, we have assumed that you are using Zeppelin’s local Spark mode (master = local[*]). In this section, we want Zeppelin to connect to an existing stand-alone Spark cluster. (Spark on YARN or Mesos is not covered here, because running Spark in stand-alone mode alongside Cassandra is recommended to benefit from data locality.)

First, you’ll need to set the Spark master property of the Spark interpreter: instead of local[*], put a real address such as spark://x.y.z:7077.

[Screenshot: Spark interpreter master property set to a spark:// URL]

The extract of the shell script in the first section showed that Zeppelin invokes the spark-submit command, passing its own Spark jar and all the transitive dependencies through the --driver-class-path parameter.

But where does Zeppelin fetch all these dependency jars? From the local repository seen earlier!

As a consequence, if you declare the Spark-Cassandra connector as a dependency (standard Zeppelin build) and run against a stand-alone Spark cluster, it will fail because the local repository is still empty! First run a simple Spark job in local Spark mode to give Zeppelin a chance to download the dependencies, then switch to the stand-alone Spark cluster.

This is not sufficient, though: on your stand-alone Spark cluster, you must also add the Spark-Cassandra connector dependencies to the Spark classpath so that the workers can connect to Cassandra.

How do you do that? There are two options:

  1. Edit the $SPARK_HOME/conf/spark-env.sh file and add the Spark-Cassandra connector dependencies to the SPARK_CLASSPATH variable.

    [Screenshot: SPARK_CLASSPATH set in spark-env.sh]

    As you can see, we need not just the plain spark-cassandra-connector jar but the assembly jar, i.e. the fat jar that includes all transitive dependencies.

    To get this jar, you’ll have to build it yourself:

    • git clone https://github.com/datastax/spark-cassandra-connector/
    • cd spark-cassandra-connector && sbt assembly
  2. Alternatively, execute the spark-submit command with the --packages com.datastax.spark:spark-cassandra-connector_2.10:<connector_version> flag. In this case, Spark is clever enough to fetch all the transitive dependencies for you from a remote repository.

    The same warning about corporate firewalls applies here.

    How do you pass this extra --packages flag to Zeppelin’s spark-submit call? By exporting the SPARK_SUBMIT_OPTIONS environment variable in $ZEPPELIN_HOME/conf/zeppelin-env.sh.

    [Screenshot: SPARK_SUBMIT_OPTIONS exported in zeppelin-env.sh]
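The two options above each boil down to one line of configuration. A sketch, in which the assembly jar path and <connector_version> are placeholders to fill in; the ${SPARK_CLASSPATH:+...} form simply preserves any value already set. Spark 1.x logs a deprecation warning for SPARK_CLASSPATH and suggests spark.driver.extraClassPath and spark.executor.extraClassPath instead, which you may prefer if that warning appears.

```shell
# --- Option 1: $SPARK_HOME/conf/spark-env.sh on the Spark cluster nodes ---
# Placeholder path: point it at the assembly jar produced by `sbt assembly`.
export SPARK_CLASSPATH="${SPARK_CLASSPATH:+$SPARK_CLASSPATH:}/path/to/spark-cassandra-connector-assembly-<connector_version>.jar"

# --- Option 2: $ZEPPELIN_HOME/conf/zeppelin-env.sh on the Zeppelin server ---
# interpreter.sh passes ${SPARK_SUBMIT_OPTIONS} to its spark-submit call.
export SPARK_SUBMIT_OPTIONS="--packages com.datastax.spark:spark-cassandra-connector_2.10:<connector_version>"
```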

Using the --packages flag looks easy, but it is not suitable for a recurring Spark job because it forces Spark to download all the dependencies.

If your Spark job is not a one-shot job, I recommend building the assembly jar for the Spark-Cassandra connector and setting it in the SPARK_CLASSPATH variable so that it is available to all of your Spark jobs.

I have pre-built some assembly jars (using Scala 2.10) that are available for download.

Zeppelin connecting to a stand-alone DataStax Enterprise cluster

Using DataStax Enterprise (DSE) instead of open-source Spark makes your life easier, because the DSE build of Spark includes all the Spark-Cassandra connector dependencies by default. There is no SPARK_CLASSPATH variable to set and no --packages flag to manage on the Zeppelin side.

However, you’ll still need to either declare the Spark-Cassandra connector dependency on the Zeppelin side or build Zeppelin with the connector embedded.

Be careful when building Zeppelin for DSE: each DSE 4.8.x version uses a custom Spark version and Hadoop 1 dependencies. From DSE 5.0.x onward, you should start the server in Hadoop 2 mode (`dse -x 2 cassandra -k`) to make it work.

Zeppelin custom builds

To make your life easier, I have created custom Zeppelin builds for each of the OSS Spark/DSE versions listed below. All the custom Zeppelin builds are located in the shared Google Drive folder.

The custom Maven pom file spark-dependencies-pom.xml used to build these versions is provided as a reference.

| Zeppelin version | Spark / DSE version | Spark-Cassandra connector version | Tarball |
|---|---|---|---|
| 0.6.0 | Spark 1.4.0 | 1.4.4 | zeppelin-0.6.0-cassandra-spark-1.4.0.tar.gz |
| 0.6.0 | Spark 1.4.1 | 1.4.4 | zeppelin-0.6.0-cassandra-spark-1.4.1.tar.gz |
| 0.6.0 | Spark 1.5.0 | 1.5.1 | zeppelin-0.6.0-cassandra-spark-1.5.0.tar.gz |
| 0.6.0 | Spark 1.5.1 | 1.5.1 | zeppelin-0.6.0-cassandra-spark-1.5.1.tar.gz |
| 0.6.0 | Spark 1.5.2 | 1.5.1 | zeppelin-0.6.0-cassandra-spark-1.5.2.tar.gz |
| 0.6.0 | Spark 1.6.0 | 1.6.0 | zeppelin-0.6.0-cassandra-spark-1.6.0.tar.gz |
| 0.6.0 | Spark 1.6.1 | 1.6.0 | zeppelin-0.6.0-cassandra-spark-1.6.1.tar.gz |
| 0.6.0 | Spark 1.6.2 | 1.6.0 | zeppelin-0.6.0-cassandra-spark-1.6.2.tar.gz |
| 0.6.0 | DSE 4.8.3, DSE 4.8.4 (Spark 1.4.1) | 1.4.1 | zeppelin-0.6.0-dse-4.8.3-4.8.4.tar.gz |
| 0.6.0 | DSE 4.8.5, DSE 4.8.6 (Spark 1.4.1) | 1.4.2 | zeppelin-0.6.0-dse-4.8.5-4.8.6.tar.gz |
| 0.6.0 | DSE 4.8.7 (Spark 1.4.1) | 1.4.3 | zeppelin-0.6.0-dse-4.8.7.tar.gz |
| 0.6.0 | DSE 4.8.8, DSE 4.8.9 (Spark 1.4.1) | 1.4.4 | zeppelin-0.6.0-dse-4.8.8-4.8.9.tar.gz |
| 0.6.0 | DSE 5.0.0, DSE 5.0.1 (Spark 1.6.1) | 1.6.0 | zeppelin-0.6.0-dse-5.0.0-5.0.1.tar.gz |
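If you script your deployments, the OSS Spark rows of this table translate directly into a lookup. The hypothetical helper below mirrors only those rows (DSE rows omitted) and reports the connector version used for each custom build; it says nothing about combinations outside the table.

```shell
#!/usr/bin/env bash
# Hypothetical helper mirroring the OSS Spark rows of the custom-build table:
# print the Spark-Cassandra connector version used for a given Spark version.
connector_version_for_spark() {
  case "${1:-}" in
    1.4.0|1.4.1)       echo 1.4.4 ;;
    1.5.0|1.5.1|1.5.2) echo 1.5.1 ;;
    1.6.0|1.6.1|1.6.2) echo 1.6.0 ;;
    *) echo "no custom build listed for Spark ${1:-<none>}" >&2; return 1 ;;
  esac
}

# Example: build the --packages coordinate for Spark 1.6.2
#   printf 'com.datastax.spark:spark-cassandra-connector_2.10:%s\n' \
#     "$(connector_version_for_spark 1.6.2)"
```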

Please note that for DataStax Enterprise 5.0.0 and 5.0.1, you need to start analytics mode in Hadoop 2 mode with `dse -x 2 cassandra -k`; otherwise the custom build, which has Spark 1.6.x dependencies, will not work.




