London, July 8, 2016
By Andrea Iacono

The Spark connector for OrientDB is provided by Metreta and hosted on GitHub at https://github.com/metreta/spark-orientdb-connector. It lets Spark and OrientDB interoperate in both directions: reading OrientDB data from Spark and writing Spark data to OrientDB. The connector is also aware of the difference between an OrientDB document database and an OrientDB graph database:

  • Using OrientDB as a document database: OrientDB documents can be read into Spark RDDs and written back from them.
  • Using OrientDB as a graph database: vertices and edges can be read into Spark GraphX graphs and written back from them.

To compile the connector, clone the master branch and update its build.sbt file with the Scala and Spark versions you’re using (see the sketch below); you can then launch the sbt package command:

sbt package
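
The exact content of the connector’s build.sbt depends on the branch you clone; the lines you typically need to adjust look roughly like the following sketch (the values are only an example matching the versions used later in this post, not the file’s actual content):

// adjust these to the Scala and Spark versions used by your cluster
scalaVersion := "2.11.4"

libraryDependencies ++= Seq(
 "org.apache.spark" % "spark-core_2.11" % "1.6.1",
 "org.apache.spark" % "spark-graphx_2.11" % "1.6.1"
)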


After performing these steps, you should find a jar file containing the compiled connector in the target directory. Be sure to also create the test database, as shown on the connector’s page.

The first step in creating our sample project is writing a build.sbt file, where we define the library dependencies:

libraryDependencies ++= Seq(
 "com.orientechnologies" % "orientdb-core" % "2.2.3",
 "com.orientechnologies" % "orientdb-client" % "2.2.3",
 "com.orientechnologies" % "orientdb-graphdb" % "2.2.3",
 "com.orientechnologies" % "orientdb-distributed" % "2.2.3",
 "org.apache.spark" % "spark-core_2.11" % "1.6.1",
 "org.apache.spark" % "spark-graphx_2.11" % "1.6.1",
 "org.scala-lang" % "scala-compiler" % "2.11.4",
 "org.scala-lang" % "scala-library" % "2.11.4",
 "org.scala-lang" % "scala-reflect" % "2.11.4",
 "jline" % "jline" % "2.12",
 "com.tinkerpop.blueprints" % "blueprints-core" % "2.6.0",
 "com.fasterxml.jackson.core" % "jackson-databind" % "2.7.4",
 "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.7.4"
)
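
Note that the connector itself is not listed among these dependencies: the jar built earlier with sbt package has to be put on the classpath separately. The simplest option is to drop it into the project’s lib/ directory, which sbt picks up automatically as unmanaged dependencies; an explicit alternative is a line like the following (the jar name is just a placeholder, use the one found in the connector’s target directory):

// reference the locally built connector jar explicitly
// (placeholder file name: use the jar produced by the connector's build)
unmanagedJars in Compile += Attributed.blank(file("lib/spark-orientdb-connector_2.11-1.0.jar"))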


We must then configure Spark to connect to OrientDB, which we can do by defining the SparkConf in the following way:

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("ConnectorSample")
    .set("spark.orientdb.clustermode", "remote")
    .set("spark.orientdb.connection.nodes", "127.0.0.1")
    .set("spark.orientdb.protocol", "remote")
    .set("spark.orientdb.dbname", "test")
    .set("spark.orientdb.port", "2424")
    .set("spark.orientdb.user", "admin")
    .set("spark.orientdb.password", "admin")


We can now share data between Spark and OrientDB.

Orient Documents to/from Spark RDDs
Let’s start reading some OrientDB documents as a Spark RDD:

val peopleRdd: RDD[OrientDocument] = sc.orientQuery("Person")


The orientQuery() method reads the documents of a class from OrientDB and returns them as a Spark RDD, on which we can perform the usual manipulations. We can then save them back to OrientDB:

peopleRdd
 .filter(person => person.getString("name") == "John")
 .map(person => new Person("Foo", "Bar"))
 .saveToOrient("Person")


In this example, after a bit of manipulation, we use the saveToOrient() method to save all the elements of the RDD as OrientDB documents. We can verify the result either by querying OrientDB via Studio or directly from the code:

sc.orientQuery("Person").foreach(p => println(s"Person: ${p.getString("surname")}, ${p.getString("name")}"))


We can also update OrientDB documents using the upsertToOrient() method, as in this example, where we update a property of the documents via the RDD and save them back to OrientDB:

peopleRdd
 .filter(person => !person.getString("surname").startsWith("New"))
 .map(person => new Person(person.getString("name"), "New " + person.getString("surname")))
 .upsertToOrient("Person")


Orient Graphs to/from Spark GraphX
When we deal with graphs, RDDs are not enough, so we must move to Spark’s API for graph processing: GraphX.

To access OrientDB vertices and edges, we must use the orientGraph() method as shown in this example:

val peopleGraph: Graph[OrientDocument, OrientDocument] = sc.orientGraph()


Since peopleGraph is an org.apache.spark.graphx.Graph object, we can use its methods to access OrientDB data, as in these examples:

val people: VertexRDD[OrientDocument] = peopleGraph.vertices
val relationships: EdgeRDD[OrientDocument] = peopleGraph.edges

println(s"The graph contains ${people.count()} vertices and ${relationships.count()} edges.\n")


We can also access triplets, as in this example where we print friendships among people:

peopleGraph
 .triplets
 .foreach(triplet => {
   val srcPerson: OrientDocument = triplet.srcAttr
   val dstPerson: OrientDocument = triplet.dstAttr
   println(s"Person: ${srcPerson.getString("surname")}, ${srcPerson.getString("name")} [${triplet.srcId}]. Friend: ${dstPerson.getString("surname")}, ${dstPerson.getString("name")} [${triplet.dstId}]")
 })


The built-in graph algorithms supplied by GraphX are also available, like triangleCount(), used here to show the triangles among people:

val triangles = peopleGraph.triangleCount()

// prints how many triangles every vertex participates in
triangles
 .vertices
 .foreach {
   case (vertexId, trianglesNumber) => println(s"Person [${vertexId}] participates in ${trianglesNumber} triangles.")
 }
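
Any other GraphX algorithm can be applied in the same way; for instance (this example is not part of the original post), PageRank over the friendship graph:

// ranks every person according to the structure of the friendship edges
val ranks = peopleGraph.pageRank(0.0001).vertices

ranks
 .foreach {
   case (vertexId, rank) => println(s"Person [${vertexId}] has rank ${rank}.")
 }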


When we have a GraphX graph that we want to save as an OrientDB graph, we can use the saveGraphToOrient() method:

val gr: Graph[Person, String] = createSampleGraph(sc)
gr.saveGraphToOrient()


In this example, the createSampleGraph() method just creates three vertices and five edges as RDDs and then builds a simple graph from them:

def createSampleGraph(sparkContext: SparkContext): Graph[Person, String] = {

 val people: RDD[(VertexId, Person)] =
   sparkContext.parallelize(
     Array(
       (1L, new Person("Alice", "Anderson")),
       (2L, new Person("Bob", "Brown")),
       (3L, new Person("Carol", "Clark"))
     )
   )


 val edges: RDD[Edge[String]] =
   sparkContext.parallelize(
     Array(
       Edge(1L, 2L, "Friendship"),
       Edge(1L, 3L, "Friendship"),
       Edge(2L, 1L, "Friendship"),
       Edge(3L, 1L, "Friendship"),
       Edge(3L, 2L, "Friendship")
     )
   )
 Graph(people, edges)
}
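
After the save, we can read the graph back from OrientDB to check that the vertices and edges are actually there (a quick verification sketch, not part of the original code):

// re-read the whole graph from OrientDB and print its size
val savedGraph = sc.orientGraph()
println(s"Saved graph: ${savedGraph.vertices.count()} vertices and ${savedGraph.edges.count()} edges.")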


The full code of these examples is available on GitHub at https://github.com/andreaiacono/SparkOrientDbConnectorDemo.
