Scala, Couchbase, Spark and Akka-http: A combinatory tutorial for starters

Datetime: 2016-08-23 04:18:59

Couchbase and Apache Spark are among the best tools available for in-memory computation. I am using akka-http because it is new in the business. If you are not a big fan of akka-http and do not think it is ready for production yet, you can take a look at this blog, which shows how to do the same task using Spray.

If you are new to all these technologies and they sound like weird names :wink: do not worry; we will walk through them step by step, and by the end you will be able to build a REST API that can be deployed on a Spark cluster with Couchbase as the database.

So first things first :

What is Couchbase?

Couchbase is one of the best in-memory databases, with lots of capabilities and a user-friendly UI for managing the database. It is a NoSQL document database with a distributed architecture designed for performance, scalability, and availability. It is available in both an Enterprise and a Community edition. If you do not have a Couchbase installation and want to get started, refer to this link.

What is Spark?

Apache Spark™ is a fast and general engine for large-scale data processing. It is mainly built in Scala, with the RDD (Resilient Distributed Dataset) as its fundamental abstraction. It provides APIs in Scala, Java, Python, and R.
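To get a feel for the RDD idea without a Spark installation: RDD transformations mirror Scala's ordinary collection operations. Below, a plain `List` stands in for an RDD purely for illustration; in a real Spark job you would build the RDD with `sc.parallelize(data)` on a `SparkContext`.

```scala
// Conceptual sketch only: a Scala List standing in for an RDD.
// With Spark this would be sc.parallelize(List(1, 2, 3, 4, 5)).
val data = List(1, 2, 3, 4, 5)

// Chained transformations, just like rdd.filter(...).map(...):
val doubledEvens = data.filter(_ % 2 == 0).map(_ * 2)

// An action that forces a result, like rdd.sum():
val sum = doubledEvens.sum
// doubledEvens == List(4, 8), sum == 12
```

The key difference is that Spark evaluates the transformations lazily and distributes them across the cluster, while the `List` version runs eagerly on one machine.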

What is Akka-http?

Akka HTTP is made for building integration layers based on HTTP and as such tries to “stay on the sidelines”. Therefore you normally don’t build your application “on top of” Akka HTTP; you build your application on top of whatever makes sense and use Akka HTTP merely for the HTTP integration needs. For more information about what it is, you can take a look here.

How to connect these two?

To bring Couchbase into the Spark world, we will use the Couchbase Spark connector, provided by Couchbase itself.

Pre-requisites:

Now here comes the nice and easy code, but before that I presume you have Couchbase 4.5 and Spark 1.6 installed. If not, refer to the links above. If you do not want to deploy this application on a cluster and just want to run it on your local machine, you do not need a Spark installation, so you can skip that step.

Code:

If you want to jump straight to the code, here it is.

This repository is a sample guide to building a Spark/akka-http application with Couchbase as the backend, and it has a good, understandable README file that explains it all.

Your build.sbt should look like this. It determines which versions of akka-http, the Couchbase Spark connector, and Apache Spark are used, so please pay attention while specifying the versions.

build.sbt:

name := "spark-couchbase-akka-http-starter-kit"

version := "1.0"

scalaVersion := "2.11.7"

organization := "com.knoldus"

val akkaV = "2.4.5"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "1.6.1",
  "com.typesafe.akka" %% "akka-http-core" % akkaV,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
  "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
  "org.scalatest"     %% "scalatest" % "2.2.6" % "test",
  "com.couchbase.client" %% "spark-connector" % "1.1.0"
)

assembleArtifact in packageScala := false // We don't need the Scala library, Spark already includes it

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
fork in run := true
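The assembly settings above come from the sbt-assembly plugin, so the project also needs a project/plugins.sbt. A minimal sketch, where the plugin version is an assumption; whichever version you pick must match the `mergeStrategy in assembly` / `assembleArtifact in packageScala` key style used in this build (newer sbt-assembly releases renamed these keys):

```scala
// project/plugins.sbt — sbt-assembly provides the `assembly` task used above.
// The version below is an assumption; align it with your sbt version and
// with the assembly setting keys used in build.sbt.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
```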

Factories:

This is the part where we interact with the database with the help of the Spark connector. The code below shows how to perform CRUD operations and provides factories for Couchbase using the connector.

package com.knoldus.couchbaseServices.factories

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

/**
  * Created by shivansh on 9/5/16.
  */
trait DatabaseAccess {

  val config = ConfigFactory.load("application.conf")
  val couchbaseUrl = config.getString("couchbase.url")
  val bucketName = config.getString("couchbase.bucketName")
  val bucketPassword = config.getString("couchbase.bucketPassword")

  val sparkConf: SparkConf = new SparkConf().setAppName("spark-akka-http-couchbase-starter-kit").setMaster("local")
    .set("com.couchbase.nodes", couchbaseUrl).set(s"com.couchbase.bucket.$bucketName", bucketPassword)
  val sc = new SparkContext(sparkConf)
  val NIQLQUERY = s"SELECT * FROM `$bucketName` WHERE name LIKE"
  val VIEWNAME = "emailtoName"
  val DDOCNAME = "userddoc"

  def getNIQLDeleteQuery(documentId: String) = s"""DELETE FROM $bucketName p USE KEYS "$documentId" RETURNING p"""

  def persistOrUpdate(documentId: String, jsonObject: JsonObject): Boolean = {
    val jsonDocument = JsonDocument.create(documentId, jsonObject)
    val savedData = sc.parallelize(Seq(jsonDocument))
    Try(savedData.saveToCouchbase()).toOption.fold(false)(x => true)
  }

  def getViaN1Ql(name: String): Option[Array[String]] = {
    val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(NIQLQUERY + s"'$name%'")).collect()).toOption
    n1qlRDD.map(_.map(a => a.value.toString))
  }

  def getViaView(name: String): Option[Array[String]] = {
    val viewRDDData = Try(sc.couchbaseView(ViewQuery.from(DDOCNAME, VIEWNAME).startKey(name)).collect()).toOption
    viewRDDData.map(_.map(a => a.value.toString))
  }

  def getViaKV(listOfDocumentIds: String): Option[Array[String]] = {
    val idAsRDD = sc.parallelize(listOfDocumentIds.split(","))
    Try(idAsRDD.couchbaseGet[JsonDocument]().map(_.content.toString).collect).toOption
  }

  def deleteViaId(documentID: String): Option[Array[String]] = {
    val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(getNIQLDeleteQuery(documentID))).collect()).toOption
    n1qlRDD.map(_.map(a => a.value.toString))
  }
}

object DatabaseAccess extends DatabaseAccess
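Since the N1QL statements above are built by plain string interpolation, the construction can be sanity-checked without Spark or Couchbase on the classpath. A small pure-Scala sketch, where the bucket name and document key are hypothetical example values:

```scala
// Mirrors getNIQLDeleteQuery from the DatabaseAccess trait above,
// with the bucket name passed in explicitly for illustration.
def deleteQuery(bucketName: String, documentId: String): String =
  s"""DELETE FROM $bucketName p USE KEYS "$documentId" RETURNING p"""

val q = deleteQuery("userBucket", "user::42")
// q: DELETE FROM userBucket p USE KEYS "user::42" RETURNING p
```

Note that USE KEYS targets a document by its exact key, which is why the delete route below can address a single document by id instead of scanning the bucket.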

Routes:

Now we want to develop routes that provide REST endpoints, so that users can perform CRUD operations over HTTP.

package com.knoldus.couchbaseServices.routes

import java.util.UUID

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.couchbase.client.java.document.json.JsonObject
import com.knoldus.couchbaseServices.factories.DatabaseAccess


trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  val logger = Logging(system, getClass)

  implicit def myExceptionHandler =
    ExceptionHandler {
      case e: ArithmeticException =>
        extractUri { uri =>
          complete(HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not persisted and something went wrong"))
        }
    }

  val sparkRoutes: Route = {
    get {
      path("insert" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val jsonObject = JsonObject.create().put("name", name).put("email", email)
            val isPersisted = persistOrUpdate(documentId, jsonObject)
            isPersisted match {
              case true => HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
              case false => HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~ path("updateViaKV" / "name" / Segment / "email" / Segment / "id" / Segment) { (name: String, email: String, id: String) =>
      get {
        complete {
          try {
            val documentId = id
            val jsonObject = JsonObject.create().put("name", name).put("email", email)
            val isPersisted = persistOrUpdate(documentId, jsonObject)
            isPersisted match {
              case true => HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
              case false => HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $id")
          }
        }
      }
    } ~ path("getViaKV" / "id" / Segment) { (listOfIds: String) =>
      get {
        complete {
          try {
            val idAsRDD: Option[Array[String]] = getViaKV(listOfIds)
            idAsRDD match {
              case Some(data) => HttpResponse(StatusCodes.OK, entity = data.mkString(","))
              case None => HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
          }
        }
      }
    } ~ path("getViaView" / "name" / Segment) { (name: String) =>
      get {
        complete {
          val emailFetched: Option[Array[String]] = getViaView(name)
          emailFetched match {
            case Some(data) => HttpResponse(StatusCodes.OK, entity = data.mkString(","))
            case None => HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
          }
        }
      }
    } ~ path("getViaN1Ql" / "name" / Segment) { (name: String) =>
      get {
        complete {
          val emailFetched = getViaN1Ql(name)
          emailFetched match {
            case Some(data) => HttpResponse(StatusCodes.OK, entity = data.mkString(","))
            case None => HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
          }
        }
      }
    } ~ path("delete" / "id" / Segment) { (id: String) =>
      get {
        complete {
          try {
            val idAsRDD: Option[Array[String]] = deleteViaId(id)
            idAsRDD match {
              case Some(data) => HttpResponse(StatusCodes.OK, entity = data.mkString(",") + " is deleted")
              case None => HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $id")
          }
        }
      }
    }
  }
}
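The insert route above keys each document as `"user::"` followed by a random UUID. That id scheme can be exercised on its own in plain Scala, with no akka or Spark required:

```scala
import java.util.UUID

// Mirrors the documentId construction used in the "insert" route above.
def newDocumentId(): String = "user::" + UUID.randomUUID().toString

val id = newDocumentId()
// A canonical UUID string is 36 characters, so every id is
// the 6-character "user::" prefix plus 36 characters: 42 in total.
```

Prefixing keys by type (here `user::`) is a common Couchbase convention: it keeps keys unique across document types and makes them easy to target with USE KEYS queries.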

Server:

This is how we can start the HTTP server in akka-http:

def startServer(address: String, port: Int) = {
  Http().bindAndHandle(sparkRoutes, address, port)
}

The methods saveToCouchbase(), couchbaseGet(), couchbaseView(), and couchbaseQuery() are provided by the Couchbase Spark connector so that we can perform these operations on RDDs. This is a basic implementation of how to perform CRUD operations on Couchbase using Spark.

In further blogs we will discuss how to use the SQL queries provided by Spark to query Couchbase, and how to use Apache Spark 2.0.0, the latest release, with Couchbase.

So stay tuned; till then, happy hAKKAing!

Code Repository: Spark-Akka Couchbase guides

References: 

  1. Akka-http Documentation
  2. Spark Documentation
  3. Couchbase Installation
  4. Couchbase-spark connector
  5. Spark with Couchbase to Electrify Your Data Processing: Couchbase Connect 2015 by Michael Nitschinger
  6. Using Spark, Spray and Couchbase