Graph Analytics on HBase with HGraphDB and Spark GraphFrames

Datetime: 2017-04-03 05:28:00   Topic: HBase, Spark

In a previous post, I showed how to analyze graphs stored in HGraphDB using Apache Giraph. Giraph depends on Hadoop, and some developers may be using Spark instead. In this post I will show how to analyze HGraphDB graphs using Apache Spark GraphFrames.

To prepare data stored in HGraphDB for GraphFrames, we need to import vertex and edge data from HGraphDB into Spark DataFrames. Hortonworks provides a Spark-on-HBase Connector to do just that. The connector allows custom serde (serializer/deserializer) types to be created by implementing the SHCDataType trait. The serde for HGraphDB is available here. (When testing the serde, I ran into some issues with the Spark-on-HBase Connector, for which I have submitted pull requests. Hopefully those will be merged soon. In the meantime, you can use my fork of the Spark-on-HBase Connector.)
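To make the serde idea concrete, here is a toy sketch in Java. The `ValueSerde` interface and `TaggedSerde` class are hypothetical: they are neither the connector's actual SHCDataType trait (which is Scala) nor HGraphDB's real byte format. The sketch only shows the contract a serde fulfills, turning typed values into HBase cell bytes and back, using a made-up encoding of one type-tag byte followed by the payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical serde contract; NOT SHC's SHCDataType trait and NOT HGraphDB's byte format. */
interface ValueSerde {
    byte[] toBytes(Object value);
    Object fromBytes(byte[] bytes);
}

/** Toy serde: one type-tag byte followed by the encoded payload. */
final class TaggedSerde implements ValueSerde {
    private static final byte STRING = 1;
    private static final byte INT = 2;

    @Override
    public byte[] toBytes(Object value) {
        if (value instanceof String) {
            byte[] payload = ((String) value).getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(1 + payload.length).put(STRING).put(payload).array();
        }
        if (value instanceof Integer) {
            // 4-byte big-endian int after the tag
            return ByteBuffer.allocate(1 + Integer.BYTES).put(INT).putInt((Integer) value).array();
        }
        throw new IllegalArgumentException("unsupported value: " + value);
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte tag = buf.get(); // first byte says how to decode the rest
        switch (tag) {
            case STRING: {
                byte[] payload = new byte[buf.remaining()];
                buf.get(payload);
                return new String(payload, StandardCharsets.UTF_8);
            }
            case INT:
                return buf.getInt();
            default:
                throw new IllegalArgumentException("unknown tag: " + tag);
        }
    }
}
```

Tagging each value is just one design choice for the sketch; a serde could instead rely on the type declared for each column in the catalog.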

To demonstrate how to use HGraphDB with GraphFrames, we first use HGraphDB to create the same example graph that is used in the GraphFrames User Guide.

import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;

// Assumes `graph` is an already-open HGraphDB graph whose namespace is testGraph,
// the namespace that the Spark catalogs below read from.
Vertex a = graph.addVertex(T.id, "a", "name", "Alice", "age", 34);
Vertex b = graph.addVertex(T.id, "b", "name", "Bob", "age", 36);
Vertex c = graph.addVertex(T.id, "c", "name", "Charlie", "age", 30);
Vertex d = graph.addVertex(T.id, "d", "name", "David", "age", 29);
Vertex e = graph.addVertex(T.id, "e", "name", "Esther", "age", 32);
Vertex f = graph.addVertex(T.id, "f", "name", "Fanny", "age", 36);
Vertex g = graph.addVertex(T.id, "g", "name", "Gabby", "age", 60);

// Each edge: outVertex.addEdge(label, inVertex)
a.addEdge("friend", b);
b.addEdge("follow", c);
c.addEdge("follow", b);
f.addEdge("follow", c);
e.addEdge("follow", f);
e.addEdge("friend", d);
d.addEdge("friend", a);
a.addEdge("friend", e);

Now that the graph is stored in HGraphDB, we need to specify a schema to be used by the Spark-on-HBase Connector for retrieving vertex and edge data.

def vertexCatalog = s"""{
    |"table":{"namespace":"testGraph", "name":"vertices",
    |  "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
    |"rowkey":"key",
    |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"string"},
      |"name":{"cf":"f", "col":"name", "type":"string"},
      |"age":{"cf":"f", "col":"age", "type":"int"}
    |}
  |}""".stripMargin

def edgeCatalog = s"""{
    |"table":{"namespace":"testGraph", "name":"edges",
    |  "tableCoder":"org.apache.spark.sql.execution.datasources.hbase.types.HGraphDB", "version":"2.0"},
    |"rowkey":"key",
    |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"string"},
      |"relationship":{"cf":"f", "col":"~l", "type":"string"},
      |"src":{"cf":"f", "col":"~f", "type":"string"},
      |"dst":{"cf":"f", "col":"~t", "type":"string"}
    |}
  |}""".stripMargin

Some things to note about this schema:

  • The HGraphDB serde is specified as the tableCoder above.
  • All HGraphDB columns are stored in a column family named f.
  • Vertex and edge labels are stored in a column with qualifier ~l.
  • The source and destination columns have qualifiers ~f and ~t, respectively.
  • Each vertex or edge property is stored in a column whose qualifier is simply the property name.
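To illustrate how the edge catalog maps HBase cells onto DataFrame columns, the following Java sketch turns one edge row into a record. `EdgeRowMapping` is a hypothetical helper, not part of SHC or HGraphDB. It assumes the cells of family f have already been decoded to strings by the serde, and it uses a made-up row key, since the example edges above were created without explicit ids.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of what the edge catalog asks the connector to do for one row. */
final class EdgeRowMapping {
    // DataFrame column -> qualifier in column family "f", mirroring edgeCatalog
    static final Map<String, String> EDGE_COLUMNS = new LinkedHashMap<>();
    static {
        EDGE_COLUMNS.put("relationship", "~l"); // edge label
        EDGE_COLUMNS.put("src", "~f");          // "from" vertex id
        EDGE_COLUMNS.put("dst", "~t");          // "to" vertex id
    }

    /** Builds a DataFrame-style record from a row key plus the (already decoded) cells of family f. */
    static Map<String, String> toRecord(String rowKey, Map<String, String> familyF) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("id", rowKey); // "cf":"rowkey" maps the row key itself to column id
        for (Map.Entry<String, String> col : EDGE_COLUMNS.entrySet()) {
            record.put(col.getKey(), familyF.get(col.getValue()));
        }
        return record;
    }

    public static void main(String[] args) {
        Map<String, String> cells = new LinkedHashMap<>();
        cells.put("~l", "follow");
        cells.put("~f", "b");
        cells.put("~t", "c");
        // prints {id=edge-1, relationship=follow, src=b, dst=c}
        System.out.println(toRecord("edge-1", cells)); // "edge-1" is a hypothetical row key
    }
}
```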

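Now that we have a schema, we can create Spark DataFrames for both the vertices and edges, and then pass these to the GraphFrame constructor. GraphFrames expects the vertex DataFrame to have an id column and the edge DataFrame to have src and dst columns, which is why the catalogs above use those column names.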

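import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.graphframes.GraphFrame

// Load an HBase table as a DataFrame through the Spark-on-HBase Connector,
// using the given JSON catalog. Assumes an SQLContext named sqlContext is in scope.
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}

val verticesDataFrame = withCatalog(vertexCatalog)
val edgesDataFrame = withCatalog(edgeCatalog)
val g = GraphFrame(verticesDataFrame, edgesDataFrame)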

With the GraphFrame in hand, we now have full access to the Spark GraphFrames APIs. For instance, here are a few sample graph operations from the GraphFrames Quick Start.

// Query: Get in-degree of each vertex.
g.inDegrees.show()

// Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()

// Run PageRank algorithm, and show results.
val results = g.pageRank.resetProbability(0.01).maxIter(20).run()
results.vertices.select("id", "pagerank").show()
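As a sanity check on what these queries compute, here is a plain-Java sketch that evaluates them on the same example graph without Spark. The class and method names are hypothetical. The PageRank part is a simplified, un-normalized version of the formula given in the GraphX PageRank documentation, rank(v) = reset + (1 - reset) * sum of rank(u) / outDegree(u) over in-neighbours u, starting every rank at 1.0 and running a fixed number of iterations. GraphFrames may differ in initialization, normalization, and convergence handling, so its PageRank numbers need not match this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java re-computation of the three queries above (no Spark involved). */
final class ExampleGraphQueries {
    static final class Edge {
        final String src, dst, relationship;
        Edge(String src, String dst, String relationship) {
            this.src = src; this.dst = dst; this.relationship = relationship;
        }
    }

    static final List<String> VERTICES = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
    static final List<Edge> EDGES = Arrays.asList(
        new Edge("a", "b", "friend"), new Edge("b", "c", "follow"), new Edge("c", "b", "follow"),
        new Edge("f", "c", "follow"), new Edge("e", "f", "follow"), new Edge("e", "d", "friend"),
        new Edge("d", "a", "friend"), new Edge("a", "e", "friend"));

    /** Counts incoming edges per vertex; vertices with no in-edges do not appear. */
    static Map<String, Integer> inDegrees(List<Edge> edges) {
        Map<String, Integer> deg = new TreeMap<>();
        for (Edge e : edges) deg.merge(e.dst, 1, Integer::sum);
        return deg;
    }

    /** Counts edges whose label equals rel, like filtering on relationship and calling count(). */
    static long countRelationship(List<Edge> edges, String rel) {
        return edges.stream().filter(e -> e.relationship.equals(rel)).count();
    }

    /** Simplified static PageRank; rank held by vertices with no out-edges is not redistributed. */
    static Map<String, Double> pageRank(List<String> vertices, List<Edge> edges, double reset, int maxIter) {
        Map<String, Integer> outDeg = new HashMap<>();
        for (Edge e : edges) outDeg.merge(e.src, 1, Integer::sum);
        Map<String, Double> rank = new TreeMap<>();
        for (String v : vertices) rank.put(v, 1.0);
        for (int i = 0; i < maxIter; i++) {
            // Sum the rank each vertex receives from its in-neighbours
            Map<String, Double> incoming = new HashMap<>();
            for (Edge e : edges) incoming.merge(e.dst, rank.get(e.src) / outDeg.get(e.src), Double::sum);
            Map<String, Double> next = new TreeMap<>();
            for (String v : vertices) next.put(v, reset + (1 - reset) * incoming.getOrDefault(v, 0.0));
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        System.out.println(inDegrees(EDGES));                  // prints {a=1, b=2, c=2, d=1, e=1, f=1}
        System.out.println(countRelationship(EDGES, "follow")); // prints 4
        System.out.println(pageRank(VERTICES, EDGES, 0.01, 20));
    }
}
```

Gabby (g) has no edges, so the in-degree map omits her, and her rank stays at the reset probability in every iteration.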

You can find further graph operations against our example graph (taken from the GraphFrames User Guide) in this test.

As you can see, HGraphDB makes graphs stored in HBase easily accessible to Apache TinkerPop, Apache Giraph, and now Apache Spark GraphFrames.