What is a Graph?

A graph is a representation of the directed and undirected graph concept from mathematical graph theory. It is an abstract data type and a data structure consisting of a finite set of vertices together with a set of edges: an edge is an unordered pair of vertices in an undirected graph, and an ordered pair of vertices in a directed graph.
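The distinction can be made concrete with a few lines of plain Scala (a toy illustration, no graph library involved): a directed edge is an ordered pair, while an undirected edge is an unordered pair.

```scala
// Toy illustration in plain Scala (no graph library): the same three
// connections modeled as a directed and as an undirected edge set.
val directedEdges = Set((1, 2), (2, 3), (3, 1))            // ordered pairs
val undirectedEdges = Set(Set(1, 2), Set(2, 3), Set(3, 1)) // unordered pairs

// In the directed graph, (2, 1) is a different edge from (1, 2) ...
println(directedEdges.contains((2, 1)))      // false
// ... while in the undirected graph, {2, 1} is the same edge as {1, 2}.
println(undirectedEdges.contains(Set(2, 1))) // true
```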




What is Apache Spark GraphX?

Apache Spark GraphX is a distributed graph-processing framework used to process graphs in parallel. It provides a collection of graph algorithms and builders that simplify graph analytics tasks. GraphX extends the Spark RDD with a new Graph abstraction: a property graph, in which user-defined properties are attached to each vertex and each edge.
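The property-graph idea can be sketched in a few lines of plain Scala (no Spark required; PropEdge here is a hypothetical stand-in for GraphX's Edge[ED] type, and the real building blocks are RDD[(VertexId, VD)], RDD[Edge[ED]], and Graph(vertices, edges)).

```scala
// Toy sketch of a property graph in plain Scala (no Spark; PropEdge is a
// hypothetical stand-in for GraphX's Edge[ED] type).
case class PropEdge[ED](srcId: Long, dstId: Long, attr: ED)

val vertexProps: Map[Long, String] =           // vertex ID -> vertex property
  Map(1L -> "alice", 2L -> "bob", 3L -> "carol")
val propEdges: Seq[PropEdge[String]] =         // the edge property is the relationship
  Seq(PropEdge(1L, 2L, "follows"), PropEdge(2L, 3L, "follows"))

// Combine endpoint properties with each edge property (GraphX calls these triplets).
val triplets = propEdges.map(e => (vertexProps(e.srcId), e.attr, vertexProps(e.dstId)))
println(triplets.mkString("\n"))
```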


Apache Spark GraphX Features

Apache Spark GraphX provides the following features.

    1. Flexibility

    Apache Spark GraphX can work with graphs and perform computations on them. Spark GraphX can be used for ETL processing, iterative graph computation, exploratory analysis, and so on. The same data can be viewed both as a collection and as a graph, and transforming and joining that data can be done efficiently with Spark RDDs.

    2. Speed

    Apache Spark GraphX offers performance comparable to the fastest specialized graph-processing systems, and because it runs on Spark it inherits Spark's features such as fault tolerance, flexibility, ease of use, and so on.

    3. Algorithms

    Apache Spark GraphX provides the following graph algorithms.

    • PageRank
    • Connected Components
    • Label Propagation
    • SVD++
    • Strongly Connected Components
    • Triangle Count

Background on Graph-Parallel Computation

Graph data keeps growing day after day, and given its importance, new graph-processing systems are being developed. These systems use new techniques to partition and process graphs in a distributed fashion. Graph algorithms are being tested on these new systems, which provide faster responses than general-purpose data-processing systems.




Apache Spark GraphX Project Goal

Existing graph-processing systems rely on complicated programming models that lead to heavy data movement and duplication. To overcome this issue, the Apache Spark GraphX project aims to unify data-parallel and graph-parallel processing in a single system with one composable API. The Apache Spark GraphX API makes it possible to view data both as a collection and as a graph without any duplication or data movement.
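The "one dataset, two views" idea can be illustrated with plain Scala (a toy sketch, not the GraphX API): a single edge list serves both an ETL-style aggregation and an adjacency lookup.

```scala
// Toy sketch in plain Scala (not the GraphX API): one edge list, two views.
// Each pair is (follower, followed).
val followerEdges = Seq((2L, 1L), (3L, 1L), (3L, 2L))

// Collection view: ordinary data-parallel transformations, e.g. follower counts.
val inDegree = followerEdges.groupBy(_._2).map { case (v, es) => v -> es.size }

// Graph view: adjacency derived from the very same sequence, no copy needed.
val outNeighbors = followerEdges.groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

println(inDegree(1L))     // user 1 is followed by two users
println(outNeighbors(3L)) // user 3 follows users 1 and 2
```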




Graph Algorithms

Apache Spark GraphX provides a set of graph algorithms to simplify analytics tasks. These algorithms are contained in the org.apache.spark.graphx.lib package and can be accessed directly as methods on Graph via GraphOps.

1. PageRank Algorithm

The PageRank algorithm measures the importance of each vertex in a graph, on the assumption that an edge from u to v represents an endorsement of v's importance by u. The classic example is Twitter: if a Twitter user is followed by many others, that user will be ranked highly.
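The intuition behind PageRank can be sketched in plain Scala (a toy loop, not GraphX's implementation): each vertex repeatedly passes an equal share of its rank along its outgoing edges, and highly endorsed vertices accumulate rank.

```scala
// Toy PageRank in plain Scala (an illustration of the idea, not GraphX's
// implementation). links maps each user to the users they follow.
val links: Map[Int, Seq[Int]] = Map(1 -> Seq(2), 2 -> Seq(1), 3 -> Seq(1))
val damping = 0.85
var ranks: Map[Int, Double] = links.keys.map(_ -> 1.0).toMap

for (_ <- 1 to 50) {
  // Each vertex splits its rank evenly among the vertices it links to.
  val contribs = links.toSeq
    .flatMap { case (u, outs) => outs.map(v => v -> ranks(u) / outs.size) }
    .groupBy(_._1)
    .map { case (v, cs) => v -> cs.map(_._2).sum }
  ranks = links.keys.map { v =>
    v -> ((1 - damping) + damping * contribs.getOrElse(v, 0.0))
  }.toMap
}

// Vertex 1 is linked to by both 2 and 3, so it ends up with the highest rank.
println(ranks.maxBy(_._2))
```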

Let us understand the PageRank algorithm with the help of a social-networking program.

PageRank Program Execution

We have two datasets: one for users and another for users' followers. We will compute the PageRank of each user as follows.

Datasets & Program Details

The datasets for this program are present under the SPARK_HOME directory.

data/graphx/followers.txt

data/graphx/users.txt

The location of the PageRankExample.scala program is shown below.

/home/cloudduggu/spark/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala

Run the PageRankExample.scala program from SPARK_HOME.

./bin/run-example org.apache.spark.examples.graphx.PageRankExample

Let us understand the code line by line and then execute it in the terminal.

  1. Import Spark's GraphX and SQL packages.
  2. Create a SparkSession.
  3. Load the edges as a graph.
  4. Run PageRank.
  5. Join the ranks with the usernames.
  6. Print the result.
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession

object PageRankExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // Load the edges as a graph.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
    // Run PageRank.
    val ranks = graph.pageRank(0.0001).vertices
    // Join the ranks with the usernames.
    val users = sc.textFile("data/graphx/users.txt").map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    val ranksByUsername = users.join(ranks).map {
      case (id, (username, rank)) => (username, rank)
    }
    // Print the result.
    println(ranksByUsername.collect().mkString("\n"))

    spark.stop()
  }
}



2. Connected Components Algorithm

A connected component is a subgraph in which every vertex is reachable from every other vertex by following edges. The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex.
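What the algorithm computes can be illustrated with a plain-Scala sketch (a toy label-propagation loop, not GraphX's implementation): the smallest vertex ID spreads through each component until the labels stop changing.

```scala
// Toy connected components in plain Scala (an illustration, not GraphX's
// implementation). Two components are expected: {1, 2, 3} and {5, 6}.
val ccEdges = Seq((1L, 2L), (2L, 3L), (5L, 6L))
val ccVertices = ccEdges.flatMap { case (a, b) => Seq(a, b) }.distinct
val neighbors = ccEdges
  .flatMap { case (a, b) => Seq(a -> b, b -> a) } // edges are undirected here
  .groupBy(_._1)
  .map { case (v, ns) => v -> ns.map(_._2) }

// Start with each vertex labeled by its own ID, then repeatedly adopt the
// smallest label seen in the neighborhood until nothing changes.
var labels = ccVertices.map(v => v -> v).toMap
var changed = true
while (changed) {
  val updated = labels.map { case (v, l) => v -> (l +: neighbors(v).map(labels)).min }
  changed = updated != labels
  labels = updated
}
println(labels) // each vertex tagged with the lowest ID in its component
```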

Connected Components Program Execution

We have two datasets: one for users and another for users' followers. We will compute the connected component of each user as follows.

Datasets & Program Details

The datasets for this program are present under the SPARK_HOME directory.

data/graphx/followers.txt

data/graphx/users.txt

The location of the ConnectedComponentsExample.scala program is shown below.

/home/cloudduggu/spark/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala

Run the ConnectedComponentsExample.scala program from SPARK_HOME.

./bin/run-example org.apache.spark.examples.graphx.ConnectedComponentsExample

Let us understand the code line by line and then execute it in the terminal.

  1. Import Spark's GraphX and SQL packages.
  2. Create a SparkSession.
  3. Load the graph as in the PageRank example.
  4. Find the connected components.
  5. Join the connected components with the usernames.
  6. Print the result.
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession

object ConnectedComponentsExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // Load the graph as in the PageRank example.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
    // Find the connected components.
    val cc = graph.connectedComponents().vertices
    // Join the connected components with the usernames.
    val users = sc.textFile("data/graphx/users.txt").map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    val ccByUsername = users.join(cc).map {
      case (id, (username, cc)) => (username, cc)
    }
    // Print the result.
    println(ccByUsername.collect().mkString("\n"))

    spark.stop()
  }
}




3. Triangle Counting Algorithm

The triangle counting algorithm determines, for each vertex, the number of triangles it belongs to: a vertex is part of a triangle when it has two adjacent vertices with an edge between them.
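The definition can be illustrated with a plain-Scala sketch (a toy, not GraphX's implementation): for each vertex, count the pairs of its neighbors that are themselves joined by an edge.

```scala
// Toy triangle count in plain Scala (an illustration, not GraphX's
// implementation). Undirected edges as unordered pairs; 1-2-3 forms a triangle.
val triEdges = Set(Set(1, 2), Set(2, 3), Set(1, 3), Set(3, 4))
val triVerts = triEdges.flatten
def nbrs(v: Int): Set[Int] = triVerts.filter(u => u != v && triEdges(Set(u, v)))

val triCount: Map[Int, Int] = triVerts.map { v =>
  // Count neighbor pairs of v that are directly connected to each other.
  v -> nbrs(v).toSeq.combinations(2).count { case Seq(a, b) => triEdges(Set(a, b)) }
}.toMap
println(triCount) // vertices 1, 2, and 3 each touch one triangle; vertex 4 touches none
```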

Triangle Counting Program Execution

We have two datasets: one for users and another for users' followers. We will compute the triangle count of each user as follows.

Datasets & Program Details

The datasets for this program are present under the SPARK_HOME directory.

data/graphx/followers.txt

data/graphx/users.txt

The location of the TriangleCountingExample.scala program is shown below.

/home/cloudduggu/spark/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala

Run the TriangleCountingExample.scala program from SPARK_HOME.

./bin/run-example org.apache.spark.examples.graphx.TriangleCountingExample

Let us understand the code line by line and then execute it in the terminal.

  1. Import Spark's GraphX and SQL packages.
  2. Create a SparkSession.
  3. Load the edges in canonical order and partition the graph so triangles can be counted.
  4. Find the triangle count for each vertex.
  5. Join the triangle counts with the usernames.
  6. Print the result.
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.sql.SparkSession

object TriangleCountingExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // Load the edges in canonical order and partition the graph for triangle count.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
      .partitionBy(PartitionStrategy.RandomVertexCut)
    // Find the triangle count for each vertex.
    val triCounts = graph.triangleCount().vertices
    // Join the triangle counts with the usernames.
    val users = sc.textFile("data/graphx/users.txt").map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    val triCountByUsername = users.join(triCounts).map {
      case (id, (username, tc)) => (username, tc)
    }
    // Print the result.
    println(triCountByUsername.collect().mkString("\n"))

    spark.stop()
  }
}
