
This article is an excerpt from the book Apache Spark 2 for Beginners by Rajanarayanan Thottuvaikkatumana. The book is a learner's guide for Python and Scala developers who want to build large-scale, distributed data processing applications for the business environment.

In this post, we will see how a Spark user can work with GraphFrames, Spark's most popular graph processing package, and explore how you can benefit from running queries and finding insightful patterns through graphs.

The Spark GraphX library has the weakest programming language support among Spark's libraries: Scala is the only language it supports. GraphFrames is a newer graph processing library, available as an external Spark package developed by Databricks, the University of California, Berkeley, and the Massachusetts Institute of Technology, and built on top of Spark DataFrames. Since it is built on top of DataFrames, all the operations that can be done on DataFrames are potentially possible on GraphFrames, with a uniform API across Scala, Java, Python, and R. For the same reason, persistence of data, support for numerous data sources, and powerful graph queries in Spark SQL are additional benefits users get for free.

Just like the Spark GraphX library, in GraphFrames the data is stored in vertices and edges. The vertices and edges use DataFrames as the data structure. The first use case covered in the beginning of this chapter is used again to elucidate GraphFrames-based graph processing.

Please note that GraphFrames is an external Spark package and has some incompatibility with Spark 2.0. Because of that, the following code snippets will not work with Spark 2.0; they work with Spark 1.6. Refer to the GraphFrames website to check the status of Spark 2.0 support.

At the Scala REPL prompt of Spark 1.6, try the following statements. Since GraphFrames is an external Spark package, the library has to be loaded while bringing up the REPL; use the following command at the terminal prompt to fire up the REPL and make sure the library loads without any error messages:

$ cd $SPARK_1.6_HOME

$ ./bin/spark-shell --packages graphframes:graphframes:0.1.0-spark1.6

Ivy Default Cache set to: /Users/RajT/.ivy2/cache
The jars for the packages stored in: /Users/RajT/.ivy2/jars
:: loading settings :: url = jar:file:/Users/RajT/source-code/sparksource/spark-1.6.1/assembly/target/scala-2.10/spark-assembly-1.6.2-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found graphframes#graphframes;0.1.0-spark1.6 in list
:: resolution report :: resolve 153ms :: artifacts dl 2ms
    :: modules in use:
    graphframes#graphframes;0.1.0-spark1.6 from list in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/5ms)
16/07/31 09:22:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.graphframes._

import org.graphframes._

scala> import org.apache.spark.rdd.RDD

import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.Row

import org.apache.spark.sql.Row

scala> import org.apache.spark.graphx._

import org.apache.spark.graphx._

scala> //Create a DataFrame of users containing tuple values with a mandatory Long and another String type as the property of the vertex
scala> val users = sqlContext.createDataFrame(List((1L, "Thomas"),(2L, "Krish"),(3L, "Mathew"))).toDF("id", "name")
users: org.apache.spark.sql.DataFrame = [id: bigint, name: string]
scala> //Created a DataFrame for Edge with String type as the property of the edge
scala> val userRelationships = sqlContext.createDataFrame(List((1L, 2L, "Follows"),(1L, 2L, "Son"),(2L, 3L, "Follows"))).toDF("src", "dst", "relationship")
userRelationships: org.apache.spark.sql.DataFrame = [src: bigint, dst: bigint, relationship: string]
scala> val userGraph = GraphFrame(users, userRelationships)
userGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: bigint, name: string], e:[src: bigint, dst: bigint, relationship: string])

scala> // Vertices in the graph

scala> userGraph.vertices.show()

+---+------+

| id| name|

+---+------+

| 1|Thomas|

| 2| Krish|

| 3|Mathew|

+---+------+

scala> // Edges in the graph

scala> userGraph.edges.show()

+---+---+------------+

|src|dst|relationship|

+---+---+------------+

| 1| 2| Follows|

| 1| 2| Son|

| 2| 3| Follows|

+---+---+------------+

scala> //Number of edges in the graph

scala> val edgeCount = userGraph.edges.count()

edgeCount: Long = 3

scala> //Number of vertices in the graph

scala> val vertexCount = userGraph.vertices.count()

vertexCount: Long = 3

scala> //Number of edges coming to each of the vertex.

scala> userGraph.inDegrees.show()

+---+--------+

| id|inDegree|

+---+--------+

| 2| 2|

| 3| 1|

+---+--------+

scala> //Number of edges going out of each of the vertex.

scala> userGraph.outDegrees.show()

+---+---------+

| id|outDegree|

+---+---------+

| 1| 2|

| 2| 1|

+---+---------+

scala> //Total number of edges coming in and going out of each vertex.

scala> userGraph.degrees.show()

+---+------+

| id|degree|

+---+------+

| 1| 2|

| 2| 3|

| 3| 1|

+---+------+

scala> //Get the triplets of the graph

scala> userGraph.triplets.show()

+-------------+----------+----------+

| edge| src| dst|

+-------------+----------+----------+

|[1,2,Follows]|[1,Thomas]| [2,Krish]|

| [1,2,Son]|[1,Thomas]| [2,Krish]|

|[2,3,Follows]| [2,Krish]|[3,Mathew]|

+-------------+----------+----------+

scala> //Using the DataFrame API, apply filter and select only the needed edges
scala> val numFollows = userGraph.edges.filter("relationship = 'Follows'").count()
numFollows: Long = 2

scala> //Create an RDD of users containing tuple values with a mandatory Long and another String type as the property of the vertex
scala> val usersRDD: RDD[(Long, String)] = sc.parallelize(Array((1L, "Thomas"), (2L, "Krish"),(3L, "Mathew")))
usersRDD: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[54] at parallelize at <console>:35
scala> //Created an RDD of Edge type with String type as the property of the edge
scala> val userRelationshipsRDD: RDD[Edge[String]] = sc.parallelize(Array(Edge(1L, 2L, "Follows"), Edge(1L, 2L, "Son"),Edge(2L, 3L, "Follows")))
userRelationshipsRDD: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[55] at parallelize at <console>:35
scala> //Create a graph containing the vertex and edge RDDs as created before
scala> val userGraphXFromRDD = Graph(usersRDD, userRelationshipsRDD)
userGraphXFromRDD: org.apache.spark.graphx.Graph[String,String] = org.apache.spark.graphx.impl.GraphImpl@77a3c614
scala> //Create the GraphFrame based graph from Spark GraphX based graph
scala> val userGraphFrameFromGraphX: GraphFrame = GraphFrame.fromGraphX(userGraphXFromRDD)
userGraphFrameFromGraphX: org.graphframes.GraphFrame = GraphFrame(v:[id: bigint, attr: string], e:[src: bigint, dst: bigint, attr: string])

scala> userGraphFrameFromGraphX.triplets.show()

+-------------+----------+----------+

| edge| src| dst|

+-------------+----------+----------+

|[1,2,Follows]|[1,Thomas]| [2,Krish]|

| [1,2,Son]|[1,Thomas]| [2,Krish]|

|[2,3,Follows]| [2,Krish]|[3,Mathew]|

+-------------+----------+----------+

scala> // Convert the GraphFrame based graph to a Spark GraphX based graph

scala> val userGraphXFromGraphFrame: Graph[Row, Row] = userGraphFrameFromGraphX.toGraphX
userGraphXFromGraphFrame: org.apache.spark.graphx.Graph[org.apache.spark.sql.Row,org.apache.spark.sql.Row] = org.apache.spark.graphx.impl.GraphImpl@238d6aa2

When creating DataFrames for the GraphFrame, the only thing to keep in mind is that there are some mandatory columns for the vertices and the edges. In the DataFrame for vertices, the id column is mandatory. In the DataFrame for edges, the src and dst columns are mandatory. Apart from that, any number of arbitrary columns can be stored with both the vertices and the edges of a GraphFrame. In the Spark GraphX library, the vertex identifier must be a long integer, but the GraphFrame doesn’t have any such limitations and any type is supported as the vertex identifier. Readers should already be familiar with DataFrames; any operation that can be done on a DataFrame can be done on the vertices and edges of a GraphFrame.
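To make the column requirements concrete, here is a minimal sketch along the lines of the preceding session. The age and since columns, the string vertex identifiers, and the follows and propertyGraph names are illustrative additions for this sketch, not part of the original example; sqlContext is the SQLContext the Spark 1.6 REPL provides.

import org.graphframes._
// Vertices: the id column is mandatory; age is an arbitrary extra property
val users = sqlContext.createDataFrame(List(("a", "Thomas", 35), ("b", "Krish", 42))).toDF("id", "name", "age")
// Edges: the src and dst columns are mandatory; since is an arbitrary extra property
val follows = sqlContext.createDataFrame(List(("a", "b", "Follows", 2014))).toDF("src", "dst", "relationship", "since")
val propertyGraph = GraphFrame(users, follows)
// Inspect the schemas to see the mandatory and the extra columns
propertyGraph.vertices.printSchema()
propertyGraph.edges.printSchema()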

All the graph processing algorithms supported by Spark GraphX are supported by GraphFrames as well.
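For example, PageRank can be run directly on the GraphFrame created earlier. The following is only a sketch: it assumes the builder-style pageRank API described in the GraphFrames user guide, and the exact method names may differ in the 0.1.0 package used in this chapter.

// Run PageRank on userGraph; the result is a new GraphFrame whose vertices
// carry a pagerank column and whose edges carry a weight column
val ranks = userGraph.pageRank.resetProbability(0.15).maxIter(10).run()
ranks.vertices.select("id", "pagerank").show()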

The Python version of GraphFrames has fewer features. Since Python is not a supported programming language for the Spark GraphX library, GraphFrame-to-GraphX and GraphX-to-GraphFrame conversions are not supported in Python. Since readers are already familiar with creating DataFrames in Spark using Python, the Python example is omitted here. Moreover, at the time of writing there were some pending defects in the GraphFrames API for Python, and not all the features demonstrated above in Scala worked properly in Python.

Understanding GraphFrames queries

The Spark GraphX library is an RDD-based graph processing library, whereas GraphFrames is a Spark DataFrame-based graph processing library available as an external package. Spark GraphX supports many graph processing algorithms, but GraphFrames supports not only graph processing algorithms but also graph queries. The major difference between the two is that graph processing algorithms process the data held in a graph data structure, while graph queries search for patterns in that data. In GraphFrames parlance, graph queries are also known as motif finding. This has tremendous applications in genetics and other biological sciences that deal with sequence motifs.

From a use case perspective, take users following each other in a social media application. Users have relationships between them, and in the previous sections these relationships were modeled as graphs. In real-world use cases, such graphs can become really huge, and if there is a need to find users who are related in both directions, that requirement can be expressed as a pattern in a graph query and such relationships found using simple programmatic constructs. The following demonstration models the relationships between users in a GraphFrame, and a pattern search is then carried out on it.

At the Scala REPL prompt of Spark 1.6, try the following statements:

$ cd $SPARK_1.6_HOME

$ ./bin/spark-shell --packages graphframes:graphframes:0.1.0-spark1.6

Ivy Default Cache set to: /Users/RajT/.ivy2/cache
The jars for the packages stored in: /Users/RajT/.ivy2/jars
:: loading settings :: url = jar:file:/Users/RajT/source-code/sparksource/spark-1.6.1/assembly/target/scala-2.10/spark-assembly-1.6.2-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found graphframes#graphframes;0.1.0-spark1.6 in list
:: resolution report :: resolve 145ms :: artifacts dl 2ms
    :: modules in use:
    graphframes#graphframes;0.1.0-spark1.6 from list in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/5ms)
16/07/29 07:09:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.graphframes._

import org.graphframes._

scala> import org.apache.spark.rdd.RDD

import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.Row

import org.apache.spark.sql.Row

scala> import org.apache.spark.graphx._

import org.apache.spark.graphx._

scala> //Create a DataFrame of users containing tuple values with a mandatory String field as id and another String type as the property of the vertex. Here it can be seen that the vertex identifier is no longer a long integer.
scala> val users = sqlContext.createDataFrame(List(("1", "Thomas"),("2", "Krish"),("3", "Mathew"))).toDF("id", "name")
users: org.apache.spark.sql.DataFrame = [id: string, name: string]
scala> //Create a DataFrame for Edge with String type as the property of the edge
scala> val userRelationships = sqlContext.createDataFrame(List(("1", "2", "Follows"),("2", "1", "Follows"),("2", "3", "Follows"))).toDF("src", "dst", "relationship")
userRelationships: org.apache.spark.sql.DataFrame = [src: string, dst: string, relationship: string]

scala> //Create the GraphFrame

scala> val userGraph = GraphFrame(users, userRelationships)

userGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, name: string], e:[src: string, dst: string, relationship: string])

scala> // Search for pairs of users who are following each other

scala> // In other words the query can be read like this. Find the list of users having a pattern such that user u1 is related to user u2 using the edge e1 and user u2 is related to the user u1 using the edge e2. When a query is formed like this, the result will be listed with the columns u1, u2, e1 and e2. When modelling real-world use cases, more meaningful variable names suitable for the use case can be used.
scala> val graphQuery = userGraph.find("(u1)-[e1]->(u2); (u2)-[e2]->(u1)")
graphQuery: org.apache.spark.sql.DataFrame = [e1: struct<src:string,dst:string,relationship:string>, u1: struct<id:string,name:string>, u2: struct<id:string,name:string>, e2: struct<src:string,dst:string,relationship:string>]

scala> graphQuery.show()

+-------------+----------+----------+-------------+

| e1| u1| u2| e2|

+-------------+----------+----------+-------------+

|[1,2,Follows]|[1,Thomas]| [2,Krish]|[2,1,Follows]|

|[2,1,Follows]| [2,Krish]|[1,Thomas]|[1,2,Follows]|

+-------------+----------+----------+-------------+

Note that the columns in the graph query result are formed with the elements given in the search pattern. There is no limit to the way the patterns can be formed.
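For instance, a longer chain can be searched for with the same find method. The following sketch, assuming the same userGraph, looks for two-hop paths where u1 follows u2 and u2 follows u3:

// Each row of the result carries the three vertices and the two edges of the chain
val chains = userGraph.find("(u1)-[e1]->(u2); (u2)-[e2]->(u3)")
chains.show()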

Note the data type of the graph query result. It is a DataFrame object. That brings a great flexibility in processing the query results using the familiar Spark SQL library.
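As a sketch of that flexibility, the graphQuery result above can be filtered and projected with ordinary DataFrame operations; the mutualFollows name below is just an illustrative variable:

// Keep only mutual "Follows" relationships and project the user names
val mutualFollows = graphQuery.filter("e1.relationship = 'Follows' AND e2.relationship = 'Follows'").select("u1.name", "u2.name")
mutualFollows.show()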

The biggest limitation of the Spark GraphX library is that its API is not supported in popular programming languages such as Python and R. Since GraphFrames is a DataFrame-based library, once it matures it will enable graph processing in all the programming languages supported by DataFrames. This Spark external package is definitely a potential candidate to be included as part of Spark itself.

To learn more about the design and development of data processing applications using Spark and the family of libraries built on top of it, do check out the book Apache Spark 2 for Beginners.
