Using the Spark Connector for OrientDB

This post is outdated; please refer to the Spark page.

London, July 8, 2016
By Andrea Iacono

The Spark connector for OrientDB is provided by Metreta and hosted on GitHub at https://github.com/metreta/spark-orientdb-connector. It lets Spark and OrientDB interoperate in two ways: accessing OrientDB data from Spark and writing Spark data to OrientDB. The connector is also aware of the difference between an OrientDB document database and an OrientDB graph database, offering document-oriented methods (orientQuery(), saveToOrient(), upsertToOrient()) as well as graph-oriented ones (orientGraph(), saveGraphToOrient()), as shown below.

To compile the connector, clone the master branch and update its build.sbt file with the Scala and Spark versions you're using. Then launch sbt's package command:

sbt package


After these steps, you should find a jar file containing the compiled connector in the target directory. Be sure you have also created the test database (as shown on the connector's page).
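If you prefer creating the test database from code rather than via Studio or the console, a minimal sketch using the OrientDB client API could look like the following; the root credentials, database name and storage type are assumptions to adapt to your installation:

import com.orientechnologies.orient.client.remote.OServerAdmin

object CreateTestDb extends App {
  // Connect to the local OrientDB server as root (credentials are placeholders)
  // and create the "test" database as a graph database on plocal storage.
  // Note: createDatabase fails if the database already exists.
  val admin = new OServerAdmin("remote:127.0.0.1").connect("root", "root_password")
  admin.createDatabase("test", "graph", "plocal")
  admin.close()
}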

The first step in creating our sample project is writing a build.sbt that defines the library dependencies (the connector jar built above can be added as an unmanaged dependency, for example by copying it into the project's lib/ directory):

libraryDependencies ++= Seq(
 "com.orientechnologies" % "orientdb-core" % "2.2.3",
 "com.orientechnologies" % "orientdb-client" % "2.2.3",
 "com.orientechnologies" % "orientdb-graphdb" % "2.2.3",
 "com.orientechnologies" % "orientdb-distributed" % "2.2.3",
 "org.apache.spark" % "spark-core_2.11" % "1.6.1",
 "org.apache.spark" % "spark-graphx_2.11" % "1.6.1",
 "org.scala-lang" % "scala-compiler" % "2.11.4",
 "org.scala-lang" % "scala-library" % "2.11.4",
 "org.scala-lang" % "scala-reflect" % "2.11.4",
 "jline" % "jline" % "2.12",
 "com.tinkerpop.blueprints" % "blueprints-core" % "2.6.0",
 "com.fasterxml.jackson.core" % "jackson-databind" % "2.7.4",
 "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.7.4"
)


We must then configure Spark to connect to OrientDB, which we do by defining the SparkConf in the following way:

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("ConnectorSample")
    .set("spark.orientdb.clustermode", "remote")
    .set("spark.orientdb.connection.nodes", "127.0.0.1")
    .set("spark.orientdb.protocol", "remote")
    .set("spark.orientdb.dbname", "test")
    .set("spark.orientdb.port", "2424")
    .set("spark.orientdb.user", "admin")
    .set("spark.orientdb.password", "admin")
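A SparkContext then has to be created from this configuration, and the connector's implicit conversions have to be imported so that methods like orientQuery() and orientGraph() become available on it. A minimal sketch follows; the import path is my assumption based on the connector project's README and may differ in the version you compiled:

import org.apache.spark.{SparkConf, SparkContext}
// Brings the OrientDB-specific methods (orientQuery, saveToOrient, orientGraph, ...) into scope.
import com.metreta.spark.orientdb.connector._

val sc = new SparkContext(conf)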


We can now share data between Spark and OrientDB.

Orient Documents to/from Spark RDDs
Let’s start reading some OrientDB documents as a Spark RDD:

val peopleRdd: RDD[OrientDocument] = sc.orientQuery("Person")


The orientQuery() method reads the documents of a class from OrientDB and returns them as a Spark RDD, on which we can perform the usual transformations. We can then save them back to OrientDB:

peopleRdd
 .filter(person => person.getString("name") == "John")
 .map(person => new Person("Foo", "Bar"))
 .saveToOrient("Person")


In this example, after a bit of manipulation, we use the saveToOrient() method to save all the elements of the RDD as OrientDB documents. We can verify the result either by querying OrientDB via Studio or by querying from code:

sc.orientQuery("Person").foreach(p => println(s"Person: ${p.getString("surname")}, ${p.getString("name")}"))


We can also update OrientDB documents using the upsertToOrient() method, as shown in this example, where we update a property of each document via the RDD and save them back to OrientDB:

peopleRdd
 .filter(person => !person.getString("surname").startsWith("New"))
 .map(person => new Person(person.getString("name"), "New " + person.getString("surname")))
 .upsertToOrient("Person")
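These snippets assume a small Person model class whose fields map to the name and surname properties of the Person documents. A minimal sketch could be the following; the actual sample project on GitHub defines its own version:

// Hypothetical model class for the examples above, with just the fields they use.
case class Person(name: String, surname: String)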


Orient Graphs to/from Spark GraphX
When we deal with graphs, RDDs are not enough, so we must move to Spark's API for graph computing: GraphX.

To access OrientDB vertices and edges, we must use the orientGraph() method as shown in this example:

val peopleGraph: Graph[OrientDocument, OrientDocument] = sc.orientGraph()


Since peopleGraph is an org.apache.spark.graphx.Graph object, we can use its methods to access OrientDB data, as in these examples:

val people: VertexRDD[OrientDocument] = peopleGraph.vertices
val relationships: EdgeRDD[OrientDocument] = peopleGraph.edges

println(s"The graph contains ${people.count()} vertices and ${relationships.count()} edges.\n")


We can also access triplets, as in this example where we print friendships among people:

peopleGraph
 .triplets
 .foreach(triplet => {
   val srcPerson: OrientDocument = triplet.srcAttr
   val dstPerson: OrientDocument = triplet.dstAttr
   println(s"Person: ${srcPerson.getString("surname")}, ${srcPerson.getString("name")} [${triplet.srcId}]. Friend: ${dstPerson.getString("surname")}, ${dstPerson.getString("name")} [${triplet.dstId}]")
 })


The built-in graph algorithms supplied by GraphX are also available, like triangleCount(), used here to show the triangles among people:

val triangles = peopleGraph.triangleCount()

// prints how many triangles every vertex participates in
triangles
 .vertices
 .foreach {
   case (vertexId, trianglesNumber) => println(s"Person [${vertexId}] participates in ${trianglesNumber} triangles.")
 }


When we have a GraphX graph and want to save it as an OrientDB graph, we can use the saveGraphToOrient() method:

val gr: Graph[Person, String] = createSampleGraph(sc)
gr.saveGraphToOrient()


In this example, the createSampleGraph() method just creates a simple graph with three vertices and five edges as RDDs and then builds the graph upon them:

def createSampleGraph(sparkContext: SparkContext): Graph[Person, String] = {

 val people: RDD[(VertexId, Person)] =
   sparkContext.parallelize(
     Array(
       (1L, new Person("Alice", "Anderson")),
       (2L, new Person("Bob", "Brown")),
       (3L, new Person("Carol", "Clark"))
     )
   )


 val edges: RDD[Edge[String]] =
   sparkContext.parallelize(
     Array(
       Edge(1L, 2L, "Friendship"),
       Edge(1L, 3L, "Friendship"),
       Edge(2L, 1L, "Friendship"),
       Edge(3L, 1L, "Friendship"),
       Edge(3L, 2L, "Friendship")
     )
   )
 Graph(people, edges)
}


The full code of these examples is available on GitHub at https://github.com/andreaiacono/SparkOrientDbConnectorDemo.

This is a guest post by OrientDB contributor Matan Shukry.

Hi,

My name is Matan Shukry, and I'm a programmer, DBA, and big data engineer.

Today I’ll talk about my contribution to OrientDB, with emphasis on sequences.

The concept of sequences should be familiar to most people who have used an RDBMS before. However, for those of you who aren't familiar with it, I'll give a short description of the topic.

A sequence is a database object that generates numbers sequentially. It is mostly used for automatically incremented columns.

Sounds simple, right? Well, here comes the tricky part:

    1. Sequences do not necessarily generate numbers in an ordered fashion. Assuming A and B are retrieve operations, where B happens after A, A may return a number that is higher than the one returned by B.
    2. Sequences do not necessarily generate numbers in a continuous fashion. Assuming A and B are retrieve operations, where B happens after A, the difference between the results of B and A may be bigger than 1; that is, there may be “holes” between sequence values.


Both of the above behaviors happen due to a caching mechanism, where a range of numbers is kept in memory and handed out to the user on request. In some cases, however, such as a transaction rollback or a server shutdown, the cached numbers are lost. Furthermore, there is usually an option to turn off caching in order to get a sequence that generates ordered and continuous numbers.

Starting from version 2.2, OrientDB includes a sequence object. It comes in two types (ordered and cached) and includes 'start' and 'increment' fields.

The sequence object in OrientDB uses optimistic transactions (MVCC). When the sequence needs to allocate more numbers (either a range of them for a cached sequence or a single one for an ordered sequence), it retrieves the document, changes its properties, and attempts to save it (commit). If the sequence document is too old, meaning another connection changed the document and committed it between our retrieve and save, the sequence retries the operation. If the operation still fails after a certain number of retries, an exception is thrown back to the user, who then decides what to do next. The entire process happens at the database layer, and it's very fast.
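As an illustration only (this is not the actual OrientDB source code), the retrieve/modify/save loop described above can be sketched in Scala over a simulated, version-checked document store:

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object SequenceRetrySketch extends App {

  // A simulated "sequence document" carrying an MVCC version and the last value handed out.
  final case class SeqDoc(version: Int, value: Long)

  // A toy store: saving succeeds only if nobody changed the document since we loaded it.
  final class DocStore(initial: SeqDoc) {
    private val ref = new AtomicReference[SeqDoc](initial)
    def load(): SeqDoc = ref.get()
    def save(loaded: SeqDoc, updated: SeqDoc): Boolean =
      ref.compareAndSet(loaded, updated.copy(version = loaded.version + 1))
  }

  @tailrec
  def next(store: DocStore, increment: Long, retriesLeft: Int = 10): Long = {
    val current = store.load()                                        // retrieve the document
    val updated = current.copy(value = current.value + increment)     // change its properties
    if (store.save(current, updated)) updated.value                   // commit succeeded
    else if (retriesLeft > 0) next(store, increment, retriesLeft - 1) // concurrent change: retry
    else throw new IllegalStateException("Sequence update failed after too many retries")
  }

  val store = new DocStore(SeqDoc(version = 0, value = 100L))
  println(next(store, increment = 2)) // prints 102
}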

Also, an auto-increment column type (which will rely on the sequence object, together with the default-value feature) will probably be added in the near/distant future. As you have probably figured out by now, this will result in an automatically incremented number being inserted into a specific column.

Here are a few examples of how to use sequences. Consider a blog site where we would like each post to have a unique id. We would create the sequence as follows:

SQL

CREATE SEQUENCE postId TYPE CACHED START 101 INCREMENT 2 CACHE 20

Java

OSequence seq = database.getMetadata().getSequenceLibrary().createSequence("postId", OSequence.SEQUENCE_TYPE.CACHED, new OSequence.CreateParams().setStart(101L).setIncrement(2).setCacheSize(20));

Each time we want to insert a new post, we'll use .next():

SQL

INSERT INTO Post SET id = sequence("postId").next(), title = "BTE – Best Title Ever", body = "…"

Java (Graph API)

OSequence seq = graphDB.getRawGraph().getMetadata().getSequenceLibrary().getSequence("postId");
graphDB.addVertex("class:Post",
  "id", seq.next(), "title", "BTE – Best Title Ever", "body", "…");

Java (Document API)

OSequence seq = database.getMetadata().getSequenceLibrary().getSequence("postId");
ODocument doc = new ODocument("Post");
doc.fields("id", seq.next(), "title", "BTE – Best Title Ever", "body", "…");
doc.save();


You can also change the sequence parameters (alter): 

SQL

ALTER SEQUENCE postId START 1001 INCREMENT 30 CACHE 40

Java

database.getMetadata().getSequenceLibrary().getSequence("postId").updateParams(new OSequence.CreateParams().setStart(1001L).setIncrement(30).setCacheSize(40));


If at some point we would like to retrieve the current value without incrementing it, or reset the sequence back to 0 (probably when playing around in your development environment):

SQL

SELECT sequence("postId").current()
SELECT sequence("postId").reset()

Java

OSequence seq = database.getMetadata().getSequenceLibrary().getSequence("postId");
long value = seq.current();
seq.reset();

P.S.

There is a workaround for creating auto-increment fields in versions of OrientDB prior to 2.2. Check out this page for more information.


Hope this comes in handy,

Matan Shukry

Introduction

This tutorial explains, step by step, how to create partitioned graphs using the Record Level Security feature introduced in OrientDB 1.2.0. This feature is so powerful that we can completely separate a database's records into sand-boxes, where "restricted" records can't be accessed by unauthorized users. This tutorial demonstrates that these sand-boxes also work well with the GraphDB API and the TinkerPop stack. Partitioning graphs allows you to build real multi-tenant applications in a breeze.


Create a new empty graph database

First open the console of the GraphDB Edition and create the new database "blog" of type "graph" against the local file-system:

$ cd $ORIENTDB_HOME/bin
$ ./console.sh

OrientDB console v.1.2.0-SNAPSHOT www.orientechnologies.com
Type 'help' to display all the commands supported.
Installing extensions for GREMLIN language v.2.2.0-SNAPSHOT

orientdb> create database local:../databases/blog admin admin local graph

Creating database [local:../databases/blog] using the storage type [local]...
Database created successfully.

Current database is: local:../databases/blog

Enable graph partitioning

Now turn on partitioning for the graph by letting the classes V (Vertex) and E (Edge) extend the ORestricted class. In this way, any access to Vertex and Edge instances can be restricted:

orientdb> alter class V superclass ORestricted

Class updated successfully

orientdb> alter class E superclass ORestricted

Class updated successfully

Create 2 users

Now let's create 2 users: "luca" and "steve". First query the current roles in the database to find the RID of the "writer" role:

orientdb> select from orole

---+---------+--------------------+--------------------+--------------------+--------------------
  #| RID     |name                |mode                |rules               |inheritedRole
---+---------+--------------------+--------------------+--------------------+--------------------
  0|     #4:0|admin               |1                   |{}                  |null
  1|     #4:1|reader              |0                   |{database=2, database.schema=2, database.cluster.internal=2, database.cluster.orole=2, database.cluster.ouser=2, database.class.*=2, database.cluster.*=2, database.command=2, database.hook.record=2}|null
  2|     #4:2|writer              |0                   |{database=2, database.schema=7, database.cluster.internal=2, database.cluster.orole=2, database.cluster.ouser=2, database.class.*=15, database.cluster.*=15, database.command=15, database.hook.record=15}|null
---+---------+--------------------+--------------------+--------------------+--------------------

3 item(s) found. Query executed in 0.045 sec(s).

Found it: it's #4:2. Now create the 2 users with #4:2 (writer) as their first role:
orientdb> insert into ouser set name = 'luca', status = 'ACTIVE', password = 'luca', roles = [#4:2]

Inserted record 'OUser#5:4{name:luca,password:{SHA-256}D70F47790F689414789EEFF231703429C7F88A10210775906460EDBF38589D90,roles:[1]} v1' in 0,001000 sec(s).

orientdb> insert into ouser set name = 'steve', status = 'ACTIVE', password = 'steve', roles = [#4:2]

Inserted record 'OUser#5:3{name:steve,password:{SHA-256}F148389D080CFE85952998A8A367E2F7EAF35F2D72D2599A5B0412FE4094D65C,roles:[1]} v1' in 0,001000 sec(s).

Create a simple graph as user ‘Luca’

Now it’s time to disconnect and reconnect to the blog database using the new “luca” user:
orientdb> disconnect

Disconnecting from the database [blog]...OK

orientdb> connect local:../databases/blog luca luca

Connecting to database [local:../databases/blog] with user 'luca'...OK

Now create 2 vertices: a Restaurant and a Pizza:

orientdb> create vertex set label = 'food', name = 'Pizza'

Created vertex 'V#9:0{label:food,name:Pizza,_allow:[1]} v0' in 0,001000 sec(s).

orientdb> create vertex set label = 'restaurant', name = "Dante's Pizza"

Created vertex 'V#9:1{label:restaurant,name:Dante's Pizza,_allow:[1]} v0' in 0,000000 sec(s).

Now connect these 2 vertices with an edge labelled "menu":

orientdb> create edge from #9:0 to #9:1 set label = 'menu'

Created edge '[E#10:0{out:#9:0,in:#9:1,label:menu,_allow:[1]} v1]' in 0,003000 sec(s).
To check that everything is OK, execute a select against the vertices:
orientdb> select from v

---+---------+--------------------+--------------------+--------------------+--------------------+--------------------
  #| RID     |label               |name                |_allow              |out                 |in
---+---------+--------------------+--------------------+--------------------+--------------------+--------------------
  0|     #9:0|food                |Pizza               |[1]                 |[1]                 |null
  1|     #9:1|restaurant          |Dante's Pizza       |[1]                 |null                |[1]
---+---------+--------------------+--------------------+--------------------+--------------------+--------------------

2 item(s) found. Query executed in 0.034 sec(s).

Create a simple graph as user ‘Steve’

Now let’s connect to the database using the ‘Steve’ user and check if there are vertices:
orientdb> disconnect

Disconnecting from the database [blog]...OK

orientdb> connect local:../databases/blog steve steve

Connecting to database [local:../databases/blog] with user 'steve'...OK

orientdb> select from v

0 item(s) found. Query executed in 0.0 sec(s).

Ok, no vertices found. Try to create something:

orientdb> create vertex set label = 'car', name = 'Ferrari Modena'

Created vertex 'V#9:2{label:car,name:Ferrari Modena,_allow:[1]} v0' in 0,000000 sec(s).

orientdb> create vertex set label = 'driver', name = 'steve'

Created vertex 'V#9:3{label:driver,name:steve,_allow:[1]} v0' in 0,000000 sec(s).

orientdb> create edge from #9:2 to #9:3 set label = 'drive'

Created edge '[E#10:1{out:#9:2,in:#9:3,label:drive,_allow:[1]} v1]' in 0,002000 sec(s).
Now check the graph just created:
orientdb> select from v

---+---------+--------------------+--------------------+--------------------+--------------------+--------------------
  #| RID     |label               |name                |_allow              |out                 |in
---+---------+--------------------+--------------------+--------------------+--------------------+--------------------
  0|     #9:2|car                 |Ferrari Modena      |[1]                 |[1]                 |null
  1|     #9:3|driver              |steve               |[1]                 |null                |[1]
---+---------+--------------------+--------------------+--------------------+--------------------+--------------------

2 item(s) found. Query executed in 0.034 sec(s).
The "Steve" user doesn't see the vertices and edges created by other users!
What happens if we try to connect 2 vertices of different users?
orientdb> create edge from #9:2 to #9:0 set label = 'security-test'

Error: com.orientechnologies.orient.core.exception.OCommandExecutionException: Error on execution of command: OCommandSQL [text=create edge from #9:2 to #9:0 set label = 'security-test']
Error: java.lang.IllegalArgumentException: Source vertex '#9:0' does not exist
The partition is totally isolated: OrientDB thinks the vertex doesn't exist, while it's actually present but invisible to the current user.

TinkerPop Stack

The Record Level Security feature is very powerful because it acts at a low level inside the OrientDB engine. This is why everything works like a charm, even the TinkerPop stack.
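For instance, opening the graph through the Blueprints API as a given user should return only that user's records. Here is a minimal sketch; the URL, credentials and property names follow the console examples above, the exact Blueprints class names depend on the version on your classpath, and newer OrientDB releases use the plocal: prefix instead of local::

import com.tinkerpop.blueprints.impls.orient.OrientGraph
import scala.collection.JavaConverters._

object PartitionCheck extends App {
  // Open the "blog" database as user 'steve': only steve's vertices are visible.
  val graph = new OrientGraph("local:../databases/blog", "steve", "steve")
  try {
    graph.getVertices.asScala.foreach { v =>
      // getProperty returns the raw property value stored on the vertex.
      val label: AnyRef = v.getProperty("label")
      val name: AnyRef = v.getProperty("name")
      println(s"$label: $name")
    }
  } finally {
    graph.shutdown()
  }
}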

Now try to display all the vertices and edges using Gremlin:
orientdb> gremlin g.V

[v[#9:2], v[#9:3]]

Script executed in 0,448000 sec(s).

orientdb> gremlin g.E

e[#10:1][#9:2-drive->#9:3]

Script executed in 0,123000 sec(s).

The same applies to other technologies built on TinkerPop Blueprints: TinkerPop Rexster, TinkerPop Pipes, TinkerPop Furnace, TinkerPop Frames and ThinkAurelius Faunus.


This tutorial was originally published at http://code.google.com/p/orient/wiki/PartitionedGraphs.




