IoT Analytics Platform

Datetime: 2016-08-23 00:41:55          Topic: Cassandra, Apache Kafka

14/07/16 by Achim Nierbeck


The Internet of Things, a.k.a. the next industrial revolution, is the current hype, but what challenges do we face when consuming large amounts of data? One option is to collect all the data and post-process it in batches. The preferred way, however, is to analyze the latest data in real time or near real time.

To cope with the sheer amount of data, you’ll need a platform that scales with it. Both the platform and the software components must be able to adjust to changing requirements as the influx of data changes.

The SMACK Stack (Spark, Mesos, Akka, Cassandra and Kafka) has proven itself to be a solid base for such a platform. Akka, Spark and Kafka are capable of taking care of the vast amount of data, while Mesos, Marathon and DC/OS scale the platform.

My colleague Florian Troßbach already wrote a nice article about the SMACK Stack.

This blog post and the corresponding sources focus on the SACK (Spark, Akka, Cassandra and Kafka) components of the SMACK Stack. The basis for this showcase is data from the 171 bus routes of Los Angeles, which is freely available via a REST API (Metro-API: http://developer.metro.net/).

As can be seen in the video, the current vehicle positions are drawn on top of an OpenStreetMap map in real time. Previously collected data can be queried from Cassandra and drawn on the map as well. The collected vehicle data can also be used to calculate hotspots where many vehicles meet.

Architecture

  • Ingest – Akka
  • Digest – Spark
  • UI – JavaScript, OpenStreetMap
  • Backend – Akka

    The ingesting service initially collects the metadata from the Metro-API and stores it directly in Cassandra. Based on this metadata, the route details and vehicle information are queried from the Metro-API. Every 30 seconds the vehicle information is collected and published to Kafka.

    The Spark digesting application reads the incoming vehicle information from Kafka, enhances it for more efficient retrieval by the frontend, stores the enhanced data in Cassandra and publishes it to Kafka as well.

    As it would be rather boring to just collect the data and store it in Cassandra, a frontend is needed. The frontend uses the OpenLayers API to visualize the vehicle positions and bus route details on OpenStreetMap. The current positions are streamed directly from Kafka onto the map via a WebSocket connection. Additionally, the hotspot clusters of vehicle meeting points can be drawn onto the map.

    The following sections will go into more detail about the main parts of the showcase.

    Data Ingest

    The data ingest service is an example of how to read data into the IoT analytics platform. Since it only requests the current data from a publicly available API every 30 seconds, it can’t really be compared to the huge amount of data produced by a real IoT scenario. On the other hand, ingesting vehicle positions every 30 seconds is far more dynamic than other examples, such as static weather data.

    The Scala actor framework Akka is the technological basis for the ingestion. When started, it automatically retrieves the metadata and route information and stores these details directly in Cassandra.

    The route metadata contains IDs for the routes together with the corresponding geo coordinates of the bus stops. For every route there is also a display name available, which is the name shown on the buses themselves.

    Using this metadata, it is possible to periodically extract all vehicle information for the given bus routes. A separate actor takes care of the periodic extraction by sending itself a tick every 30 seconds.

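    import scala.concurrent.duration._
    import context.dispatcher // execution context for the scheduler (if not already in scope)

    // Send a Tick to this actor right away and then every 30 seconds.
    val tick = context.system.scheduler.schedule(0.seconds, 30.seconds, self, Tick())

    override def receive: Receive = {
      case Tick() =>
        // Fetch the current vehicle positions for this actor's route.
        log.info(s"extracting vehicle info for routeID: ${routeInfo.id}")
        extractVehicles(routeInfo.id)
    }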

    Akka uses the concept of “reactive streams” to cope with very large amounts of data. More about Akka can be found on the blog of Heiko Seeberger. The following snippet shows how a flow of data is created for every bus route:

    Flow[Vehicle].map(elem => {
     log.info(s"publishing element: ${elem}")
     new ProducerRecord[Array[Byte], Vehicle]("METRO-Vehicles", elem)
    }).to(producer).runWith(Source.actorPublisher(VehiclesActor.props(routeInfo, httpClient)))

    The VehiclesActor acts as the publishing source, which is triggered every 30 seconds. In this simple flow the data is streamed directly to the Kafka producer, which stores the incoming vehicle information in Kafka.

    To achieve this, the publisher retrieves the data and sends Vehicle objects to the flow via the onNext method. If the flow is not able to accept them, an internal buffer holds the data until the flow signals demand again. Buffering until demand is signalled is how this publisher handles back pressure with Akka.

    vehicles.items.foreach {
     vehicle =>
       {
         log.debug(vehicle.toString)
         log.debug("sending vehicle to stream sink")
         val vehicleToPersist = Vehicle(vehicle.id, Some(currTime.minusSeconds(vehicle.seconds_since_report).withMillisOfSecond(0).toDate), vehicle.latitude, vehicle.longitude, vehicle.heading, Some(routeInfo.id), vehicle.run_id, vehicle.seconds_since_report)
         log.debug(s"sending Vehicle ${vehicleToPersist}")
         if (buffer.isEmpty && totalDemand > 0) {
           log.info(s"Buffer Empty sending vehicle: ${vehicleToPersist}")
           onNext(vehicleToPersist)
         } else {
           log.info(s"Buffering vehicle: ${vehicleToPersist}")
           buffer :+= vehicleToPersist
           if (totalDemand > 0) {
             val (use, keep) = buffer.splitAt(totalDemand.toInt)
             buffer = keep
             log.info(s"Demand is greater 0 sending ${use}")
             use foreach onNext
           }
         }
       }
    }
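
    The buffering rule in the snippet above can be isolated from the actor and looked at on its own. The following is a minimal sketch, not the showcase code: `DemandBufferSketch` and `offer` are hypothetical names, and the demand is passed in as a plain number instead of being read from the publisher's `totalDemand`. It also caps the demand before converting it to an `Int`, since a very large `Long` demand would otherwise overflow.

```scala
object DemandBufferSketch {

  /** Decides what to do with a new element given the current buffer and demand.
    *
    * Returns a pair: (elements to emit right now, elements that stay buffered).
    * Hypothetical helper that mirrors the branching of the actor code above.
    */
  def offer[A](buffer: Vector[A], element: A, totalDemand: Long): (Vector[A], Vector[A]) =
    if (buffer.isEmpty && totalDemand > 0) {
      // Nothing is waiting and the subscriber wants data: emit directly.
      (Vector(element), Vector.empty)
    } else {
      // Keep arrival order: the new element goes to the end of the buffer.
      val queued = buffer :+ element
      if (totalDemand > 0) {
        // Emit at most as many elements as were requested, oldest first.
        val n = math.min(totalDemand, Int.MaxValue.toLong).toInt
        queued.splitAt(n)
      } else {
        // No demand at all: everything stays buffered.
        (Vector.empty, queued)
      }
    }
}
```

    In this form the rule is easy to check by hand: with no demand everything is kept, and with a demand of two and three queued elements the two oldest are emitted and the newest stays in the buffer.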

    Data Digestion

    The data digestion service delivers the actual business value of an IoT analytics platform. In this part of the application, information can be gathered, transformed and optimized. In our example the geo coordinates of the vehicles have to be enhanced to enable efficient retrieval from Cassandra. This is a key requirement for using geo coordinates with Cassandra: partition keys of Cassandra tables can only be queried for equality, because they are hashed, and the hash values determine on which node of the cluster the data resides. A hash value can’t be used to select values within a range, so the < and > operators are not available on a partition key. How this works in detail, and how quadkeys help, is covered later in this article.

    Additionally, the enhanced data is written back to Kafka so it can be streamed directly to the frontend.

    In this simple example, the main purpose of the Spark job is to transform the geo coordinates of vehicles (Vehicle) into TiledVehicle objects.

    Those TiledVehicles are needed to be able to search for vehicles geographically. Every vehicle gets a TileID that identifies the tile on which its coordinates are located. With this approach it’s possible to search for coordinates within a given bounding box.

    val tiledVehicle = vehicle.map(vehicle => TiledVehicle(
     TileCalc.convertLatLongToQuadKey(vehicle.latitude, vehicle.longitude),
     TileCalc.transformTime(vehicle.time.get),
     vehicle.id,
     vehicle.time,
     vehicle.latitude,
     vehicle.longitude,
     vehicle.heading,
     vehicle.route_id,
     vehicle.run_id,
     vehicle.seconds_since_report
    ))
     
    tiledVehicle.cache()
     
    tiledVehicle.saveToCassandra("streaming", "vehicles_by_tileid")
     
    tiledVehicle.foreachRDD(rdd => rdd.foreachPartition(f = tiledVehicles => {
     
     val producer: Producer[String, Array[Byte]] = new KafkaProducer[String, Array[Byte]](producerConf)
     
     tiledVehicles.foreach { tiledVehicle =>
       val message = new ProducerRecord[String, Array[Byte]]("tiledVehicles", new TiledVehicleEncoder().toBytes(tiledVehicle))
       producer.send(message)
     }
     
     producer.close()
    }))

    Grouping of coordinates by a quadkey

    To find geographic points, lines or areas in Cassandra, those geographic artifacts need to be identified by a unique identifier. This follows from the way data is stored in Cassandra: partition keys can only be queried for equality. So if you look for a coordinate within an enclosing bounding box, the coordinate itself can’t serve as the partition key.

    Therefore you need to group coordinates under one unique key. A quadkey does this: it represents a certain area that contains a set of coordinates. The map is divided into tiles using the same mechanism as Microsoft Bing Maps. The earth is split into four quadrants, each forming one tile, and each of those tiles is split into four tiles again, and so on. Every split appends one digit to the key, so any point on the earth can be addressed by one key, and the length of the key determines how precisely.

    An image showing this can be found on the Microsoft Blog here.

    The same can be achieved with GeoHashes, but those aren’t as human-readable as quadkeys and therefore can’t be verified as easily.

    In this example application the quadkey is 15 characters long. At this level the earth's circumference of about 40,075 km is divided into 2^15 tiles of 256×256 pixels, so one tile spans roughly 1.2 × 1.2 km at the equator (and somewhat less at the latitude of Los Angeles) and can be used for grouping coordinates. More details can be found in the documentation of the Bing Maps tile system.
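
    To make the quadkey calculation concrete, here is a minimal sketch that follows the formulas published for the Bing Maps tile system. It is not the showcase's `TileCalc` implementation, which is not shown in this article; `QuadKeySketch` and all its method names are hypothetical, and the pixel rounding of the original formulas is simplified by mapping directly to tile coordinates.

```scala
object QuadKeySketch {
  // Latitude limits of the Web Mercator projection used by the tile system.
  private val MinLatitude = -85.05112878
  private val MaxLatitude = 85.05112878
  private val EarthCircumferenceKm = 40075.017

  private def clip(value: Double, min: Double, max: Double): Double =
    math.min(math.max(value, min), max)

  /** Tile column and row of a coordinate at the given level of detail. */
  def latLongToTileXY(latitude: Double, longitude: Double, level: Int): (Int, Int) = {
    val lat = clip(latitude, MinLatitude, MaxLatitude)
    val lon = clip(longitude, -180.0, 180.0)
    // Normalised map position in [0, 1]; y grows from north to south.
    val x = (lon + 180.0) / 360.0
    val sinLat = math.sin(math.toRadians(lat))
    val y = 0.5 - math.log((1 + sinLat) / (1 - sinLat)) / (4 * math.Pi)
    val tilesPerAxis = 1 << level
    val tileX = clip(math.floor(x * tilesPerAxis), 0, tilesPerAxis - 1).toInt
    val tileY = clip(math.floor(y * tilesPerAxis), 0, tilesPerAxis - 1).toInt
    (tileX, tileY)
  }

  /** Interleaves the bits of column and row into one base-4 digit per level. */
  def tileXYToQuadKey(tileX: Int, tileY: Int, level: Int): String =
    (level to 1 by -1).map { i =>
      val mask = 1 << (i - 1)
      (if ((tileX & mask) != 0) 1 else 0) + (if ((tileY & mask) != 0) 2 else 0)
    }.mkString

  def latLongToQuadKey(latitude: Double, longitude: Double, level: Int = 15): String = {
    val (x, y) = latLongToTileXY(latitude, longitude, level)
    tileXYToQuadKey(x, y, level)
  }

  /** Approximate east-west edge length of one tile in kilometres. */
  def tileEdgeKm(latitude: Double, level: Int): Double =
    EarthCircumferenceKm * math.cos(math.toRadians(latitude)) / (1 << level)
}
```

    Because every additional digit only refines the previous tile, a 15-character key for a point in Los Angeles starts with the key of the coarser tiles that contain it. This prefix property is what makes quadkeys easy to verify by eye.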

    Viewing the live data

    Compared to static data, dynamic data such as buses moving on a map makes it much easier to create a dynamic front end. With this in mind, let us look at the last part of the chain: the visualization. It has been kept as simple as possible, using OpenLayers v3 in combination with OpenStreetMap. The following data can be visualized on the map:

    • Current positions of vehicles (buses)
    • Positions of vehicles in the past (the last 15 min)
    • Information about vehicles and routes
    • Visualization of bus routes and the waypoints
    • Visualization of clusters of vehicles

    To visualize previous positions of vehicles, a REST service has been created with Akka HTTP. This service extracts the data for a given bounding box from Cassandra. A bounding box is a rectangle that encloses the visible portion of the map. To draw current vehicle positions, a WebSocket service streams the data directly from Kafka into the frontend.

    To show the vehicles within a given bounding box, a simple Akka HTTP route is used:

    def vehiclesOnBBox = path("vehicles" / "boundingBox") {
     corsHandler {
       parameter('bbox.as[String], 'time.as[String] ? "5") { (bbox, time) =>
         get {
           marshal {
             val boundingBox: BoundingBox = toBoundingBox(bbox)
     
             val askedVehicles: Future[Future[List[Vehicle]]] = (vehiclesPerBBox ? (boundingBox, time)).mapTo[Future[List[Vehicle]]]
             askedVehicles.flatMap(future => future)
     
           }
         }
       }
     }
    }

    This route asks the vehiclesPerBBox actor for the vehicles within the given bounding box.

    override def receive(): Receive = {
     case (boundingBox: BoundingBox,time: String) => {
       log.info("received a BBox query")
       val eventualVehicles = getVehiclesByBBox(boundingBox, time)
       log.info(s"X: ${eventualVehicles}")
       sender() ! eventualVehicles
     }
     case _ => log.error("Wrong request")
    }

    Analogous to saving the vehicle positions, the quadkeys (TileIDs) are calculated for a given bounding box. A combination of the calculated TileIDs with a timestamp is used to retrieve the corresponding vehicle data within the given bounding box. That data is visualized on the map.
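
    How a bounding box turns into a set of TileIDs can be sketched as follows. This is an illustration under assumptions, not the showcase's `TileCalc.convertBBoxToTileIDs`: the `BoundingBox` field names are hypothetical, the tile formulas are the ones published for the Bing Maps tile system, and a box that crosses the 180° meridian is not handled.

```scala
object BBoxTilesSketch {
  // Hypothetical shape of a bounding box: south-west and north-east corner.
  final case class BoundingBox(minLat: Double, minLon: Double, maxLat: Double, maxLon: Double)

  private def clip(value: Double, min: Double, max: Double): Double =
    math.min(math.max(value, min), max)

  // Tile column/row of a coordinate; the row index grows from north to south.
  private def tileXY(latitude: Double, longitude: Double, level: Int): (Int, Int) = {
    val lat = clip(latitude, -85.05112878, 85.05112878)
    val lon = clip(longitude, -180.0, 180.0)
    val x = (lon + 180.0) / 360.0
    val sinLat = math.sin(math.toRadians(lat))
    val y = 0.5 - math.log((1 + sinLat) / (1 - sinLat)) / (4 * math.Pi)
    val n = 1 << level
    (clip(math.floor(x * n), 0, n - 1).toInt, clip(math.floor(y * n), 0, n - 1).toInt)
  }

  // One base-4 digit per level, built from the bits of column and row.
  private def quadKey(tileX: Int, tileY: Int, level: Int): String =
    (level to 1 by -1).map { i =>
      val mask = 1 << (i - 1)
      (if ((tileX & mask) != 0) 1 else 0) + (if ((tileY & mask) != 0) 2 else 0)
    }.mkString

  /** All quadkeys of the tiles that intersect the bounding box. */
  def tileIds(bbox: BoundingBox, level: Int = 15): Set[String] = {
    // The south-west corner has the smallest column and the largest row.
    val (xMin, yMax) = tileXY(bbox.minLat, bbox.minLon, level)
    val (xMax, yMin) = tileXY(bbox.maxLat, bbox.maxLon, level)
    (for (x <- xMin to xMax; y <- yMin to yMax) yield quadKey(x, y, level)).toSet
  }
}
```

    Each of the resulting keys can then be used in an equality query against the partition key, which is exactly the kind of query Cassandra supports. The number of keys grows with the visible map area, which is why a single static zoom level becomes expensive for large bounding boxes.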

    The same method is used to retrieve bus route metadata and clusters of buses. Live data, on the other hand, is pushed onto the map via a WebSocket connection. This works analogously to retrieving data from Cassandra: a bounding box is sent to the service, and data is selected by the tile IDs covering that bounding box, as the enhanced vehicle data is also available from Kafka.

    The request handler checks whether the given connection is a WebSocket connection. In that case a new Akka flow to retrieve the data is started. This flow connects to another actor to pipe the data from Kafka onto the web page.

    val requestHandler: HttpRequest => HttpResponse = {
     case req@HttpRequest(GET, Uri.Path("/ws/vehicles"), _, _, _) =>
       req.header[UpgradeToWebSocket] match {
         case Some(upgrade) => upgrade.handleMessages(Flows.graphFlowWithStats(router))
         case None => HttpResponse(400, entity = "Not a valid websocket request!")
       }
     case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
    }

    The router actor handles the communication between a newly created publishing actor, which is bound to the incoming bounding box, and the response channel to the web browser via WebSocket.

    class RouterActor extends Actor with ActorLogging {
     var routees = Set[Routee]()
     
     def receive: Receive = {
       case ar: AddRoutee => {
         log.info(s"add routee ${ar.routee}")
         routees = routees + ar.routee
       }
       case rr: RemoveRoutee => {
         log.info(s"remove routee ${rr.routee}")
         routees = routees - rr.routee
       }
       case msg:Any => {
         routees.foreach(_.send(msg, sender))
       }
     }
    }

    This router actor exists from the start of the application, while the consuming actor is created on the fly for each new flow from WebSocket to publisher. The router actor keeps track of all consuming actors and dispatches incoming data from Kafka to them.

    class TiledVehiclesFromKafkaActor(router: ActorRef) extends Actor with ActorLogging {
     
     import scala.concurrent.ExecutionContext.Implicits.global
     implicit val materializer = ActorMaterializer()
     
     
     //Kafka
     val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new TiledVehicleFstDeserializer,
       Set("tiledVehicles"))
       .withBootstrapServers("localhost:9092")
       .withGroupId("group1")
     
     val source = Consumer.atMostOnceSource(consumerSettings.withClientId("Akka-Client"))
     source.map(message => message.value).runForeach(vehicle => router ! vehicle)
     
     override def receive: Actor.Receive = {
       case _ => // just ignore any messages
     }
    }

    If a new request with a bounding box arrives via a WebSocket connection, a new publisher actor instance is created for it and registered at the router. Until the bounding box arrives, the actor stashes all other messages.

    def receive: Receive = {
     case bbox: BoundingBox => {
       log.info("received BBox changing behavior")
       tileIds = TileCalc.convertBBoxToTileIDs(bbox)
       log.info(s"${tileIds.size} tiles are requested")
       unstashAll()
       become(streamAndQueueVehicles, discardOld = false)
     }
     case msg => stash()
    }

    Once a bounding box arrives at the new actor, it changes its behaviour and acts only on vehicle data that lies within that bounding box. That data is sent to the frontend via WebSocket.

    def streamAndQueueVehicles: Receive = {
     
     // receive a new tiled vehicle, add it to the queue if it lies
     // within the requested tiles, and quickly exit.
     case tiledVehicle: TiledVehicle =>
       if (tileIds.contains(tiledVehicle.tileId)) {
         // when the buffer is full, drop the oldest entry to make room
         if (queue.size == MaxBufferSize) queue.dequeue()
         queue += Vehicle(tiledVehicle.id, tiledVehicle.time, tiledVehicle.latitude, tiledVehicle.longitude, tiledVehicle.heading, tiledVehicle.route_id, tiledVehicle.run_id, tiledVehicle.seconds_since_report)
     
         if (!queueUpdated) {
           queueUpdated = true
           self ! QueueUpdated
         }
       }
     // we receive this message if there are new items in the
     // queue. If we have a demand for messages send the requested
     // demand.
     case QueueUpdated => deliver()
     
     // the connected subscriber requests n messages; we don't need
     // to explicitly check the amount, we use the totalDemand property for this
     case Request(amount) =>
       deliver()
     
     // subscriber stops, so we stop ourselves.
     case Cancel =>
       context.stop(self)
     
     case stringMsg:String => {
       if ("close" == stringMsg) {
         log.info("closing websocket connection")
         become(receive, discardOld = true)
         router ! Cancel
       }
     }
    }

    If the subscriber cancels, so that no further data can be sent via the WebSocket connection, the actor stops itself.

    Clustering of vehicle positions

    An analytics platform should not only be used to show moving vehicles; it reveals its true potential by enabling an analysis of the collected data. To demonstrate this, a sample Spark application was created to find clusters of vehicle positions. The goal is to show at which coordinates vehicles meet. The DBSCAN (“Density-Based Spatial Clustering of Applications with Noise”) clustering algorithm is used to find those. The blog of Natalino Busa has been very inspirational for choosing this algorithm.

    First we select all vehicle coordinates for a given time range from Cassandra:

    val vehiclesPos:Array[Double] = vehiclesRdd
     .flatMap(vehicle => Seq[(String, (Double,Double))]((s"${vehicle.id}_${vehicle.latitude}_${vehicle.longitude}",(vehicle.latitude, vehicle.longitude))))
     .reduceByKey((x,y) => x)
     .map(x => List(x._2._1, x._2._2)).flatMap(identity)
     .collect()

    This array is transformed back into an RDD and mapped to a dense matrix that holds all coordinates as latitude and longitude.

    val vehiclePosRdd: RDD[Array[Double]] = sc.parallelize(seqOfVehiclePos)
     
    val denseMatrixRdd: RDD[DenseMatrix[Double]] = vehiclePosRdd.map(vehiclePosArray => DenseMatrix.create[Double](vehiclePosArray.length / 2, 2, vehiclePosArray))

    The actual calculation can then be done on this dense matrix:
     
    val clusterRdd: RDD[GDBSCAN.Cluster[Double]] = denseMatrixRdd.map(dm => dbscan(dm)).flatMap(identity)

    The DBSCAN algorithm is applied to this dense matrix of latitudes and longitudes. A cluster exists where at least three points lie within about 50 meters of each other: the epsilon of 0.0005 degrees used below corresponds to roughly 55 meters in the north-south direction.

    def dbscan(v : breeze.linalg.DenseMatrix[Double]):Seq[GDBSCAN.Cluster[Double]] = {
     log.info(s"calculating cluster for denseMatrix: ${v.data.head}, ${v.data.tail.head}")
     val gdbscan = new GDBSCAN(
       DBSCAN.getNeighbours(epsilon = 0.0005, distance = Kmeans.euclideanDistance),
       DBSCAN.isCorePoint(minPoints = 3)
     )
     val clusters = gdbscan.cluster(v)
     clusters
    }
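
    What the two parameters mean is easiest to see in a small, library-free version of the algorithm. The following is a naive sketch for illustration only, not the GDBSCAN implementation used above: `DbscanSketch` is a hypothetical name, distances are plain Euclidean distances in degrees, a point counts as its own neighbour, and the neighbour search is quadratic.

```scala
import scala.collection.mutable

object DbscanSketch {
  type Point = (Double, Double) // (latitude, longitude) in degrees

  private def distance(a: Point, b: Point): Double =
    math.hypot(a._1 - b._1, a._2 - b._2)

  /** Returns the clusters as sets of indices into `points`; noise is left out. */
  def cluster(points: Vector[Point], epsilon: Double, minPoints: Int): Vector[Set[Int]] = {
    // All points within epsilon of point i, including i itself.
    def neighbours(i: Int): Set[Int] =
      points.indices.filter(j => distance(points(i), points(j)) <= epsilon).toSet

    val visited = mutable.Set[Int]()   // points whose neighbourhood was examined
    val assigned = mutable.Set[Int]()  // points that already belong to a cluster
    val clusters = Vector.newBuilder[Set[Int]]

    for (i <- points.indices) {
      if (!visited(i)) {
        visited += i
        val seed = neighbours(i)
        // Only a core point (enough neighbours) starts a new cluster.
        if (seed.size >= minPoints) {
          val members = mutable.Set[Int](i)
          assigned += i
          val queue = mutable.Queue[Int](seed.toSeq: _*)
          while (queue.nonEmpty) {
            val j = queue.dequeue()
            if (!visited(j)) {
              visited += j
              val reachable = neighbours(j)
              // Core points extend the cluster; border points do not.
              if (reachable.size >= minPoints) queue ++= reachable
            }
            if (!assigned(j)) {
              members += j
              assigned += j
            }
          }
          clusters += members.toSet
        }
      }
    }
    clusters.result()
  }
}
```

    With an epsilon of 0.0005 and a minimum of three points, three positions a few ten-thousandths of a degree apart form one cluster, while two isolated positions next to each other remain noise.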

    The result needs some cleansing and filtering and is then stored in Cassandra again, from where the frontend can easily select it to show it on the map.

    Noteworthy

    The ingest and the backend of the UI are completely based on Akka and Scala 2.11. But since Spark is commonly used in the context of SMACK, you are usually bound to, or at least better off with, Scala 2.10, as that is the preferred Scala version for Spark at the time of writing. So the sample application ended up supporting both Scala versions. The showcase uses the sbt-doge SBT plugin, which enables cross compilation during the build. This turned out to be especially useful for the shared case classes and helper classes of the commons module, such as the utility classes for calculating the tile IDs or for serializing and deserializing case classes for Kafka.

    Fast-Serialization is used as the serialization framework. It turned out to work very well for serializing and deserializing objects shared between the different Scala versions.

    While creating the multi-module project, the goal was to keep the build as simple as possible. Therefore all build and project configuration is located in the root build.sbt file.
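
    A root build.sbt for such a setup could look roughly like the following sketch. The module names, the organization and the exact Scala patch versions are assumptions for illustration and are not taken from the showcase sources; the sbt-doge plugin itself would be added separately in project/plugins.sbt.

```scala
// build.sbt in the project root (illustrative sketch, hypothetical module names)

lazy val commonSettings = Seq(
  organization := "com.example",   // assumption, not the real organization
  scalaVersion := "2.11.8"         // default for the Akka based modules
)

// Shared case classes and helpers: built for both Scala versions.
lazy val commons = (project in file("commons"))
  .settings(commonSettings: _*)
  .settings(crossScalaVersions := Seq("2.10.6", "2.11.8"))

// Akka based ingest: Scala 2.11 only.
lazy val ingest = (project in file("ingest"))
  .settings(commonSettings: _*)
  .dependsOn(commons)

// Spark based digest: Scala 2.10 only.
lazy val digest = (project in file("digest"))
  .settings(commonSettings: _*)
  .settings(scalaVersion := "2.10.6")
  .dependsOn(commons)

// The root project only aggregates the modules.
lazy val root = (project in file("."))
  .aggregate(commons, ingest, digest)
```

    Keeping the version choice next to each module in one file makes it visible at a glance which parts are cross-built and which are tied to a single Scala version.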

    Conclusion

    The analytics platform shown here can be used not only for IoT data but for a variety of scenarios with lots of incoming data. This is especially true for scenarios with many parallel, continuously incoming data flows. A platform like this is certainly better suited for those scenarios than a classical REST application that stores its data in an RDBMS.

    Despite all the benefits of Cassandra as storage, this solution revealed a minor issue. With an RDBMS it is easier to run range queries on the primary key, which Cassandra does not support for partition keys. This can be worked around by choosing a quadkey (TileID) as the key and grouping data points within dedicated tiles. With that in place, Cassandra is able to shine.

    The current solution of using quadkeys to group geo coordinates within tiles could be optimized further. It uses only one static zoom level; additional zoom levels with a higher aggregation of points could help to boost performance. This would also require an optimized query structure. For example, using Hilbert space-filling curves to determine the optimal set of tile IDs for a given geographical shape could be a significant improvement.
