[box type="note" align="" class="" width=""]This article is an excerpt from the book Apache Spark 2 for Beginners by Rajanarayanan Thottuvaikkatumana. The author presents a learner's guide for Python and Scala developers who want to build large-scale, distributed data processing applications in a business environment.[/box]
In this post, we will see how a Spark user can work with Spark's most popular graph processing package, GraphFrames, and explore how you can benefit from running queries and finding insightful patterns through graphs.
The Spark GraphX library is the graph processing library with the least programming language support: Scala is the only language it supports. GraphFrames is a new graph processing library, available as an external Spark package, developed by Databricks, the University of California, Berkeley, and the Massachusetts Institute of Technology, and built on top of Spark DataFrames. Since it is built on top of DataFrames, all the operations that can be done on DataFrames are potentially possible on GraphFrames, with support for programming languages such as Scala, Java, Python, and R through a uniform API. In addition, because GraphFrames is built on top of DataFrames, users get the persistence of data, support for numerous data sources, and powerful graph queries in Spark SQL for free.
Just like the Spark GraphX library, GraphFrames stores its data in vertices and edges, with DataFrames as the underlying data structure. The first use case covered at the beginning of this chapter is used again to elucidate GraphFrames-based graph processing.
Please note that GraphFrames is an external Spark package and has some incompatibilities with Spark 2.0. Because of that, the following code snippets will not work with Spark 2.0; they work with Spark 1.6. Refer to the GraphFrames website to check its Spark 2.0 support.
At the Scala REPL prompt of Spark 1.6, try the following statements. Since GraphFrames is an external Spark package, the library has to be brought in while starting the REPL. The following command is used at the terminal prompt to fire up the REPL; make sure that the library is loaded without any error messages:
$ cd $SPARK_1.6_HOME
$ ./bin/spark-shell --packages graphframes:graphframes:0.1.0-spark1.6
Ivy Default Cache set to: /Users/RajT/.ivy2/cache
The jars for the packages stored in: /Users/RajT/.ivy2/jars
:: loading settings :: url = jar:file:/Users/RajT/source-code/sparksource/spark-1.6.1/assembly/target/scala-2.10/spark-assembly-1.6.2-
SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found graphframes#graphframes;0.1.0-spark1.6 in list
:: resolution report :: resolve 153ms :: artifacts dl 2ms
:: modules in use:
graphframes#graphframes;0.1.0-spark1.6 from list in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/5ms)
16/07/31 09:22:11 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.
scala> import org.graphframes._
import org.graphframes._
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._
scala> //Create a DataFrame of users containing tuple values with a
mandatory Long and another String type as the property of the vertex
scala> val users = sqlContext.createDataFrame(List((1L, "Thomas"),(2L,
"Krish"),(3L, "Mathew"))).toDF("id", "name")
users: org.apache.spark.sql.DataFrame = [id: bigint, name: string]
scala> //Create a DataFrame for Edge with String type as the property of
the edge
scala> val userRelationships = sqlContext.createDataFrame(List((1L, 2L,
"Follows"),(1L, 2L, "Son"),(2L, 3L, "Follows"))).toDF("src", "dst",
"relationship")
userRelationships: org.apache.spark.sql.DataFrame = [src: bigint, dst:
bigint, relationship: string]
scala> val userGraph = GraphFrame(users, userRelationships)
userGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: bigint, name:
string], e:[src: bigint, dst: bigint, relationship: string])
scala> // Vertices in the graph
scala> userGraph.vertices.show()
+---+------+
| id| name|
+---+------+
| 1|Thomas|
| 2| Krish|
| 3|Mathew|
+---+------+
scala> // Edges in the graph
scala> userGraph.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| 1| 2| Follows|
| 1| 2| Son|
| 2| 3| Follows|
+---+---+------------+
scala> //Number of edges in the graph
scala> val edgeCount = userGraph.edges.count()
edgeCount: Long = 3
scala> //Number of vertices in the graph
scala> val vertexCount = userGraph.vertices.count()
vertexCount: Long = 3
scala> //Number of edges coming into each vertex.
scala> userGraph.inDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
| 2| 2|
| 3| 1|
+---+--------+
scala> //Number of edges going out of each vertex.
scala> userGraph.outDegrees.show()
+---+---------+
| id|outDegree|
+---+---------+
| 1| 2|
| 2| 1|
+---+---------+
scala> //Total number of edges coming in and going out of each vertex.
scala> userGraph.degrees.show()
+---+------+
| id|degree|
+---+------+
| 1| 2|
| 2| 3|
| 3| 1|
+---+------+
scala> //Get the triplets of the graph
scala> userGraph.triplets.show()
+-------------+----------+----------+
| edge| src| dst|
+-------------+----------+----------+
|[1,2,Follows]|[1,Thomas]| [2,Krish]|
| [1,2,Son]|[1,Thomas]| [2,Krish]|
|[2,3,Follows]| [2,Krish]|[3,Mathew]|
+-------------+----------+----------+
scala> //Using the DataFrame API, apply filter and select only the needed
edges
scala> val numFollows = userGraph.edges.filter("relationship =
'Follows'").count()
numFollows: Long = 2
scala> //Create an RDD of users containing tuple values with a mandatory
Long and another String type as the property of the vertex
scala> val usersRDD: RDD[(Long, String)] = sc.parallelize(Array((1L,
"Thomas"), (2L, "Krish"),(3L, "Mathew")))
usersRDD: org.apache.spark.rdd.RDD[(Long, String)] =
ParallelCollectionRDD[54] at parallelize at <console>:35
scala> //Create an RDD of Edge type with String type as the property of
the edge
scala> val userRelationshipsRDD: RDD[Edge[String]] =
sc.parallelize(Array(Edge(1L, 2L, "Follows"), Edge(1L, 2L,
"Son"),Edge(2L, 3L, "Follows")))
userRelationshipsRDD:
org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] =
ParallelCollectionRDD[55] at parallelize at <console>:35
scala> //Create a graph containing the vertex and edge RDDs as created
before
scala> val userGraphXFromRDD = Graph(usersRDD, userRelationshipsRDD)
userGraphXFromRDD: org.apache.spark.graphx.Graph[String,String] =
[email protected]
scala> //Create the GraphFrame based graph from Spark GraphX based graph
scala> val userGraphFrameFromGraphX: GraphFrame =
GraphFrame.fromGraphX(userGraphXFromRDD)
userGraphFrameFromGraphX: org.graphframes.GraphFrame = GraphFrame(v:[id:
bigint, attr: string], e:[src: bigint, dst: bigint, attr: string])
scala> userGraphFrameFromGraphX.triplets.show()
+-------------+----------+----------+
| edge| src| dst|
+-------------+----------+----------+
|[1,2,Follows]|[1,Thomas]| [2,Krish]|
| [1,2,Son]|[1,Thomas]| [2,Krish]|
|[2,3,Follows]| [2,Krish]|[3,Mathew]|
+-------------+----------+----------+
scala> // Convert the GraphFrame based graph to a Spark GraphX based graph
scala> val userGraphXFromGraphFrame: Graph[Row, Row] =
userGraphFrameFromGraphX.toGraphX
userGraphXFromGraphFrame:
org.apache.spark.graphx.Graph[org.apache.spark.sql.Row,org.apache.spark.sql
.Row] = [email protected]
When creating DataFrames for the GraphFrame, the only thing to keep in mind is that there are some mandatory columns for the vertices and the edges. In the DataFrame for vertices, the id column is mandatory. In the DataFrame for edges, the src and dst columns are mandatory. Apart from that, any number of arbitrary columns can be stored with both the vertices and the edges of a GraphFrame. In the Spark GraphX library, the vertex identifier must be a long integer, but GraphFrames has no such limitation: any type is supported as the vertex identifier. Readers should already be familiar with DataFrames; any operation that can be done on a DataFrame can be done on the vertices and edges of a GraphFrame.
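As a minimal sketch of this flexibility, the following snippet, assuming the same REPL session as above with the GraphFrames package loaded (the names people, links, and g are purely illustrative), builds a GraphFrame whose vertices use String identifiers and carry an extra arbitrary age column:

//Vertices: the id column is mandatory; name and age are arbitrary extras
val people = sqlContext.createDataFrame(List(("a", "Thomas", 45), ("b", "Krish", 32))).toDF("id", "name", "age")
//Edges: the src and dst columns are mandatory; relationship is arbitrary
val links = sqlContext.createDataFrame(List(("a", "b", "Follows"))).toDF("src", "dst", "relationship")
val g = GraphFrame(people, links)
//Confirm that the extra column travels with the vertices
g.vertices.printSchema()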
All the graph processing algorithms supported by Spark GraphX are supported by GraphFrames as well.
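For example, PageRank can be invoked directly on the GraphFrame created earlier. Treat the following as a sketch only: it uses the builder-style pageRank API found in later GraphFrames releases, and the exact method signature in the 0.1.0 package used in this chapter may differ, so check the GraphFrames documentation for the version in use:

//Sketch: run PageRank on userGraph; the builder-style API shown here is
//from later GraphFrames releases and may differ in version 0.1.0
val ranks = userGraph.pageRank.resetProbability(0.15).maxIter(10).run()
//The result is a GraphFrame whose vertices carry a pagerank column
ranks.vertices.select("id", "pagerank").show()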
The Python version of GraphFrames has fewer features. Since Python is not a supported programming language for the Spark GraphX library, GraphFrame-to-GraphX and GraphX-to-GraphFrame conversions are not supported in Python. Since readers are already familiar with creating DataFrames in Spark using Python, the Python example is omitted here. Moreover, at the time of writing there are some pending defects in the GraphFrames API for Python, and not all the features demonstrated previously using Scala function properly in Python.
Understanding GraphFrames queries
The Spark GraphX library is an RDD-based graph processing library, while GraphFrames is a Spark DataFrame-based graph processing library available as an external package. Spark GraphX supports many graph processing algorithms, but GraphFrames supports not only graph processing algorithms but also graph queries. The major difference between the two is that graph processing algorithms are used to process the data hidden in a graph data structure, while graph queries are used to search for patterns in that data. In GraphFrames parlance, graph queries are also known as motif finding. This has tremendous applications in genetics and other biological sciences that deal with sequence motifs.
From a use case perspective, consider users following each other in a social media application. Users have relationships between them. In the previous sections, these relationships were modeled as graphs. In real-world use cases, such graphs can become really huge, and if there is a need to find users with relationships between them in both directions, this can be expressed as a pattern in a graph query, and such relationships can be found using easy programmatic constructs. The following demonstration models the relationships between the users in a GraphFrame, and a pattern search is done using it.
At the Scala REPL prompt of Spark 1.6, try the following statements:
$ cd $SPARK_1.6_HOME
$ ./bin/spark-shell --packages graphframes:graphframes:0.1.0-spark1.6
Ivy Default Cache set to: /Users/RajT/.ivy2/cache
The jars for the packages stored in: /Users/RajT/.ivy2/jars
:: loading settings :: url = jar:file:/Users/RajT/source-code/sparksource/spark-1.6.1/assembly/target/scala-2.10/spark-assembly-1.6.2-
SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found graphframes#graphframes;0.1.0-spark1.6 in list
:: resolution report :: resolve 145ms :: artifacts dl 2ms
:: modules in use:
graphframes#graphframes;0.1.0-spark1.6 from list in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/5ms)
16/07/29 07:09:08 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.
scala> import org.graphframes._
import org.graphframes._
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._
scala> //Create a DataFrame of users containing tuple values with a
mandatory String field as id and another String type as the property of the
vertex. Here it can be seen that the vertex identifier is no longer a long
integer.
scala> val users = sqlContext.createDataFrame(List(("1", "Thomas"),("2",
"Krish"),("3", "Mathew"))).toDF("id", "name")
users: org.apache.spark.sql.DataFrame = [id: string, name: string]
scala> //Create a DataFrame for Edge with String type as the property of
the edge
scala> val userRelationships = sqlContext.createDataFrame(List(("1", "2",
"Follows"),("2", "1", "Follows"),("2", "3", "Follows"))).toDF("src", "dst",
"relationship")
userRelationships: org.apache.spark.sql.DataFrame = [src: string, dst:
string, relationship: string]
scala> //Create the GraphFrame
scala> val userGraph = GraphFrame(users, userRelationships)
userGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, name:
string], e:[src: string, dst: string, relationship: string])
scala> // Search for pairs of users who are following each other
scala> // In other words, the query can be read like this: find the list of
users having a pattern such that user u1 is related to user u2 using the
edge e1, and user u2 is related to user u1 using the edge e2. When a query
is formed like this, the result will be listed with the columns u1, u2, e1,
and e2. When modelling real-world use cases, more meaningful variable names
suitable for the use case can be used.
scala> val graphQuery = userGraph.find("(u1)-[e1]->(u2); (u2)-[e2]->(u1)")
graphQuery: org.apache.spark.sql.DataFrame = [e1:
struct<src:string,dst:string,relationship:string>, u1:
struct<id:string,name:string>, u2: struct<id:string,name:string>, e2:
struct<src:string,dst:string,relationship:string>]
scala> graphQuery.show()
+-------------+----------+----------+-------------+
| e1| u1| u2| e2|
+-------------+----------+----------+-------------+
|[1,2,Follows]|[1,Thomas]| [2,Krish]|[2,1,Follows]|
|[2,1,Follows]| [2,Krish]|[1,Thomas]|[1,2,Follows]|
+-------------+----------+----------+-------------+
Note that the columns in the graph query result are formed from the elements given in the search pattern. There is no limit to the ways in which patterns can be formed.
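For instance, a longer chain can be expressed just as easily. The following sketch, using the same userGraph as above (the variable name chainQuery is illustrative), finds two-hop paths in which user u1 follows user u2 and user u2 in turn follows user u3:

//Find chains of two edges passing through a common intermediate user
val chainQuery = userGraph.find("(u1)-[e1]->(u2); (u2)-[e2]->(u3)")
chainQuery.show()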
Note the data type of the graph query result: it is a DataFrame object. That brings great flexibility in processing the query results using the familiar Spark SQL library.
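For example, the usual DataFrame operations apply directly to the motif result. The following sketch filters the result on the edge property and projects only the names of the two users; with the sample data above, the filter happens to keep every row, but it demonstrates the mechanics:

//Filter the motif result on an edge attribute and project struct fields
graphQuery.filter("e1.relationship = 'Follows'").select("u1.name", "u2.name").show()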
The biggest limitation of the Spark GraphX library is that its API is not supported in popular programming languages such as Python and R. Since GraphFrames is a DataFrame-based library, once it matures, it will enable graph processing in all the programming languages supported by DataFrames. This Spark external package is definitely a potential candidate to be included as part of Spark itself.
To learn more about designing and developing data processing applications using Spark and the family of libraries built on top of it, do check out the book Apache Spark 2 for Beginners.