MarcoPolo - live_query()
This function subscribes to a live query.
Subscribing to Live Queries
When you issue a query with the standard command() function, the result is effectively a snapshot of the records as they were when the query ran. If another client later modifies those records, you won't know unless you reissue the query.
To get around this limitation, OrientDB provides Live Queries. Instead of returning records, these queries return a subscription token. When the records assigned to this token receive an update, OrientDB pushes the changes to the given receiver function.
This function subscribes to live queries. When you're finished with the live query, you can call the live_query_unsubscribe() function to deregister the token.
Syntax
live_query(<conn>, <query>, <receiver>, <opts>)
<conn> Defines the database connection.
<query> Defines the query.
<receiver> Defines the function to receive the records.
<opts> Defines additional options for the function. For more information, see Options.
Options
This function can take one additional option:
:timeout
Defines the timeout value in milliseconds. In the event that the operation takes longer than the allotted time, MarcoPolo sends an exit signal to the calling process.
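If a timeout should not take the caller down, the call can be wrapped in a guard. This is a minimal sketch, assuming the timeout surfaces as an exit raised inside the calling process (as a timed-out GenServer.call does). An exit signal delivered from a linked process would not be caught this way. TimeoutGuard and safely/1 are hypothetical names and are not part of MarcoPolo.

```elixir
defmodule TimeoutGuard do
  # Hypothetical helper, not part of MarcoPolo.
  # Runs the zero-arity function `fun` and converts an exit raised in the
  # calling process into an error tuple instead of letting it propagate.
  def safely(fun) when is_function(fun, 0) do
    try do
      fun.()
    catch
      :exit, reason -> {:error, {:exit, reason}}
    end
  end
end
```

With a live connection, it might be used as TimeoutGuard.safely(fn -> MarcoPolo.live_query(conn, query, receiver, timeout: 5_000) end), where 5_000 is an illustrative value in milliseconds.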
Return Value
When this operation is successful, it returns the tuple {:ok, token}, where token is a subscription token that OrientDB uses to register any changes made to the records the query returns. You can use this token with the live_query_unsubscribe() function when you're ready to unsubscribe.
In the event that the operation fails, it returns {:error, message}, where message provides the exception message.
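The two return shapes described above can be handled with pattern matching on function clauses. This is a minimal sketch. SubscriptionResult and handle/1 are hypothetical names introduced for illustration, and the :subscribed and :failed tags are arbitrary choices, not MarcoPolo conventions.

```elixir
defmodule SubscriptionResult do
  # Hypothetical helper, not part of MarcoPolo.
  # Success: keep the token so the caller can unsubscribe later.
  def handle({:ok, token}) do
    {:subscribed, token}
  end

  # Failure: wrap the message in a readable description.
  def handle({:error, message}) do
    {:failed, "live query failed: #{inspect(message)}"}
  end
end
```

The result of a subscription could then be piped through it, for example MarcoPolo.live_query(conn, query, receiver) |> SubscriptionResult.handle().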
Example
Imagine you have an application that handles monitoring for various environmental sensors. Every fifteen minutes your application calls a series of functions to update the OrientDB database. You might use a Live Query to test whether the sensor reading has met an alert condition, causing the application to send emails or text messages to on-call operators. After a given timeout interval, the query resets itself, unsubscribing from the given Live Query.
# Alert threshold. Placeholder value; set it to suit your sensors.
@threshold 100

@doc "Check readings for alert conditions."
def check_readings(record) do
  # Retrieve Data
  data = record.fields["reading"]

  if data >= @threshold do
    # Log Alert
    IO.puts("Alert Condition: #{data}")

    # Call Notification Function (assumed to be defined elsewhere)
    notify_operator(record)
  end
end

@doc "Handler for monitor function check_readings()."
def read_handler(conn, sensor, interval) do
  # Log Operation
  IO.puts("Initializing #{sensor} Monitor")

  # Call Live Query, passing check_readings/1 as the receiver
  {:ok, token} =
    MarcoPolo.live_query(
      conn,
      "LIVE SELECT FROM Sensors WHERE sensor_name = '#{sensor}'",
      &check_readings/1
    )

  # Wait Interval
  Process.sleep(interval)

  # Unsubscribe and Restart Monitor
  MarcoPolo.live_query_unsubscribe(conn, token)

  # Recursive Restart
  read_handler(conn, sensor, interval)
end