Apache Spark

 

Apache Spark is an open source big data processing framework built around speed, ease of use, and sophisticated analytics. It was originally developed in 2009 in UC Berkeley’s AMPLab and open sourced in 2010, later becoming a top-level Apache project.

Spark has several advantages compared to other big data and MapReduce technologies like Hadoop and Storm. OrientDB provides a connector to Apache Spark to leverage Spark’s capabilities while using OrientDB as the datastore.

Installation

First, add the following repository to your pom.xml:

<repository>
  <id>bintray</id>
  <name>bintray</name>
  <url>https://dl.bintray.com/sbcd90/org.apache.spark/</url>
</repository>

Then add the following as a Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-orientdb-{spark.version}_2.10</artifactId>
  <version>1.3</version>
</dependency>
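If you build with sbt instead of Maven, the equivalent settings would look roughly like this (a sketch, assuming the same Bintray repository and artifact coordinates as above; replace the Spark version placeholder as in the Maven snippet):

```scala
// build.sbt (sketch): same repository and artifact as the Maven setup above.
resolvers += "bintray" at "https://dl.bintray.com/sbcd90/org.apache.spark/"

// Substitute your Spark version for {spark.version}, as in the pom.xml example.
libraryDependencies += "org.apache.spark" % "spark-orientdb-{spark.version}_2.10" % "1.3"
```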

 

Data Source

Now, we create an OrientDB class, ‘Person’, and then create OrientDB documents of that class from the Spark data source.

 

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
    sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
    StructType(Seq(StructField("id", IntegerType))))
  .write
  .format("org.apache.spark.orientdb.documents")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("class", "Person")
  .mode(SaveMode.Overwrite)
  .save()

 

Next, we read back all the documents from the ‘Person’ class.

 

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
  .format("org.apache.spark.orientdb.documents")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("class", "Person")
  .load()

 

Custom queries as filters

We can also pass an OrientDB SQL query to filter the documents that are fetched:

 

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
  .format("org.apache.spark.orientdb.documents")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("class", "Person")
  .option("query", "select * from Person where id = 1")
  .load()

 

These APIs return a Spark DataFrame, so any Spark DataFrame operation can be performed on the result.
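For instance, filters, aggregations, and SQL all work as usual. The sketch below uses a small in-memory DataFrame in place of the `loadedDf` returned by the connector, so it runs without an OrientDB instance:

```scala
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Stand-in for the DataFrame loaded from the 'Person' class above.
val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(
    sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
    StructType(Seq(StructField("id", IntegerType))))

// Ordinary DataFrame operations apply directly.
val filtered = df.filter(df("id") > 2)   // keeps the rows with id 3, 4, 5
filtered.show()

// Spark SQL over the same data (Spark 1.x style API).
df.registerTempTable("person")
sqlContext.sql("select count(*) as total from person").show()
```

The same calls work unchanged on a DataFrame loaded through the OrientDB connector, since it is a regular DataFrame.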

Create vertices from Spark

 

Next, let us see how the Spark data source can create OrientDB graphs. First, we create the graph vertices, which belong to the vertex type ‘Person’.

 

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
    sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
    StructType(Seq(StructField("id", IntegerType))))
  .write
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .mode(SaveMode.Overwrite)
  .save()

 

Create edges from Spark

 

Now let’s create the edges, which belong to the edge type ‘IsRelatedTo’.

 

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
    sc.parallelize(Seq(
      Row(1, 2, "friends"),
      Row(2, 3, "enemy"),
      Row(3, 4, "friends"),
      Row(4, 1, "enemy")
    )),
    StructType(Seq(
      StructField("src", IntegerType),
      StructField("dst", IntegerType),
      StructField("relationship", StringType)
    )))
  .write
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .option("edgetype", "IsRelatedTo")
  .mode(SaveMode.Overwrite)
  .save()

 

Using Spark Data Frames

 

We can load the OrientDB vertices and edges individually into Spark DataFrames:

 

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Load the 'Person' vertices.
val loadedVerticesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .load()

// Load the 'IsRelatedTo' edges.
val loadedEdgesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("edgetype", "IsRelatedTo")
  .load()

 

Using Spark Graph Frames

 

Now that we have the OrientDB vertices and edges as DataFrames, let’s build a Spark GraphFrame from them. Here too, we can pass OrientDB SQL queries to filter what is loaded.

 

import org.apache.spark.sql.SQLContext
import org.graphframes.GraphFrame

val sqlContext = new SQLContext(sc)
val loadedVerticesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("vertextype", "Person")
  .option("query", "select * from Person where id = 1")
  .load()

val loadedEdgesDf = sqlContext.read
  .format("org.apache.spark.orientdb.graphs")
  .option("dburl", ORIENTDB_CONNECTION_URL)
  .option("user", ORIENTDB_USER)
  .option("password", ORIENTDB_PASSWORD)
  .option("edgetype", "IsRelatedTo")
  .option("query", "select * from IsRelatedTo where relationship = 'friends'")
  .load()

val g = GraphFrame(loadedVerticesDf, loadedEdgesDf)

 

This allows us to leverage the features of Spark GraphFrame on top of OrientDB graphs.
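For example, degree counts and motif finding come for free with the standard GraphFrames API. The sketch below builds small local vertex and edge DataFrames with the same shape as the OrientDB-loaded ones, so it runs without a database connection:

```scala
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.graphframes.GraphFrame

// Stand-ins for loadedVerticesDf / loadedEdgesDf from the snippet above.
val sqlContext = new SQLContext(sc)
val vertices = sqlContext.createDataFrame(
    sc.parallelize(Seq(Row(1), Row(2), Row(3))),
    StructType(Seq(StructField("id", IntegerType))))
val edges = sqlContext.createDataFrame(
    sc.parallelize(Seq(Row(1, 2, "friends"), Row(2, 3, "enemy"))),
    StructType(Seq(
      StructField("src", IntegerType),
      StructField("dst", IntegerType),
      StructField("relationship", StringType))))

val g = GraphFrame(vertices, edges)
g.inDegrees.show()              // in-degree of each vertex
g.find("(a)-[e]->(b)").show()   // simplest motif: every edge with its endpoints
```

The same GraphFrames calls apply unchanged to a graph whose vertex and edge DataFrames were loaded from OrientDB.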

 

Apache Spark is a trademark of the Apache Software Foundation.