Cogs and Levers A blog full of technical stuff

Using tailable cursors on the MongoDB oplog for realtime changes

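MongoDB lets you open tailable cursors on capped collections. These cursors remain open after returning their last result and keep yielding documents as new ones are inserted, much like tail -f on a file.

We can exploit this by tailing the oplog, which is itself a capped collection. This gives a trigger-like effect on the database, so we can respond to changes in real time.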

Using pymongo, you can set up a tailing cursor on your MongoDB server's oplog like so:

import time

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS

# options that make find() return a tailable, blocking cursor (pymongo 2.x API)
tail_opts = {'tailable': True, 'await_data': True}

# connect to the target mongo server; the oplog lives in the "local" database
mongo_url = 'mongodb://localhost:27017'
db = MongoClient(mongo_url).local

# get the latest timestamp in the oplog so we only see new operations
last_ts = db.oplog.rs.find().sort('$natural', -1)[0]['ts']

while True:
    # prepare the tail query and kick it off
    query = {'ts': {'$gt': last_ts}}
    cursor = db.oplog.rs.find(query, **tail_opts)
    cursor.add_option(_QUERY_OPTIONS['oplog_replay'])

    try:
        while cursor.alive:
            try:
                # grab a document if available
                doc = cursor.next()

                # remember our position so a re-created cursor resumes here
                last_ts = doc['ts']

                # do something interesting with "doc"

            except StopIteration:
                # raised when the cursor is out of data, so wait
                # briefly for some more data to arrive
                time.sleep(1)
    finally:
        cursor.close()

This loop keeps pumping results down the pipe as they're seen. The outer while loop re-creates the cursor if it ever dies. This solution requires an oplog on your database. Without one, there is no record of the write operations that have been applied, so there is nothing to tail.

The dictionary tail_opts is unpacked into keyword arguments for the find call, and it sets two flags. The first, tailable, tells MongoDB that we want the cursor to stay open and return new results as they are appended to the collection. The second, await_data, makes the server block for a while waiting for new data to arrive, rather than returning an empty batch straight away. These keyword arguments belong to pymongo 2.x. pymongo 3.0 and later express the same thing with cursor_type=CursorType.TAILABLE_AWAIT.

According to 10gen:

The sequence creates a cursor that will wait for few seconds after returning the full result set so that it can capture and return additional data added during the query

I have wrapped this functionality up into a server of its own (with a client library), available from my GitHub repo. mutated-mongo takes the idea in this article and forwards to each client only the messages it has subscribed to. It's still a work in progress.
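The subscription filtering idea can be sketched in a few lines of Python. This is a minimal sketch, not mutated-mongo's actual code. The Subscriptions class and its subscribe/recipients methods are hypothetical names. The sketch only assumes the standard oplog entry fields ns (the "database.collection" namespace) and op (i for insert, u for update, d for delete).

```python
class Subscriptions:
    """Hypothetical registry of which clients want which oplog entries."""

    def __init__(self):
        # client id -> list of (namespace, set of op codes) rules
        self._rules = {}

    def subscribe(self, client_id, ns, ops=("i", "u", "d")):
        """Register interest in the given op codes on one namespace."""
        self._rules.setdefault(client_id, []).append((ns, set(ops)))

    def recipients(self, entry):
        """Return the sorted ids of clients interested in one oplog entry."""
        ns, op = entry.get("ns"), entry.get("op")
        return sorted(
            client_id
            for client_id, rules in self._rules.items()
            if any(ns == rule_ns and op in rule_ops for rule_ns, rule_ops in rules)
        )


if __name__ == "__main__":
    subs = Subscriptions()
    subs.subscribe("dashboard", "shop.orders")
    subs.subscribe("auditor", "shop.orders", ops=("d",))

    insert = {"op": "i", "ns": "shop.orders", "o": {"_id": 1, "total": 10}}
    delete = {"op": "d", "ns": "shop.orders", "o": {"_id": 1}}
    print(subs.recipients(insert))  # ['dashboard']
    print(subs.recipients(delete))  # ['auditor', 'dashboard']
```

In the tailing loop, each doc would be passed to recipients() to decide which clients it gets forwarded to.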

How to setup an oplog on a single MongoDB instance

The MongoDB oplog lets you keep track of changes that have happened on your database in real time. It's a very useful tool, but a single standalone server doesn't have one out of the box, because the oplog only exists on replica set members. You can follow these steps to enable the oplog on a standalone MongoDB instance.

Uncomment the following lines in your /etc/mongodb.conf file:

replSet=rs0
oplogSize=1024

This gives your MongoDB server a replica set name of rs0 and allows your oplog to grow to 1024 MB. You can tune these parameters to suit.

To complete the process, restart your MongoDB daemon, open a shell and issue rs.initiate(). Here it's run from the local database, which is where the oplog will appear:

michael@mongo:~$ mongo
MongoDB shell version: 2.6.1
connecting to: test
> use local
switched to db local
> rs.initiate()
{
   "info2" : "no configuration explicitly specified -- making one",
   "me" : "mongo:27017",
   "info" : "Config now saved locally.  Should come online in about a minute.",
   "ok" : 1
}
> show collections
me
oplog.rs
startup_log
system.indexes
system.replset

You now have the oplog available to you.

Assembly Syntax Intel & AT&T

This post is just a little cheat sheet for myself on Intel & AT&T syntax.

Here is a useful table, linked from the GCC-Inline-Assembly-HOWTO, that maps some simple instructions between the two syntaxes:

Intel Code                  AT&T Code
mov eax,1                   movl $1,%eax
mov ebx,0ffh                movl $0xff,%ebx
int 80h                     int $0x80
mov ebx, eax                movl %eax, %ebx
mov eax,[ecx]               movl (%ecx),%eax
mov eax,[ebx+3]             movl 3(%ebx),%eax
mov eax,[ebx+20h]           movl 0x20(%ebx),%eax
add eax,[ebx+ecx*2h]        addl (%ebx,%ecx,0x2),%eax
lea eax,[ebx+ecx]           leal (%ebx,%ecx),%eax
sub eax,[ebx+ecx*4h-20h]    subl -0x20(%ebx,%ecx,0x4),%eax

Some important points to note:

  • Source and destinations are flipped in opcodes.
    • Intel is dest, src
    • AT&T is src, dest
  • AT&T decorates registers and immediates
    • Registers are prefixed with a “%”
    • Immediates are prefixed with a “$”. This applies to variables being passed in from C (when you’re inline).
  • Intel decorates memory operands to denote the operand's size (byte ptr, word ptr, dword ptr). AT&T instead uses different mnemonics, suffixing the opcode with b, w or l.
  • Intel syntax to dereference a memory location is “[ ]”. AT&T uses “( )”.
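The memory-operand rows of the table follow a mechanical rule. Intel's [base+index*scale+disp] becomes AT&T's disp(base,index,scale). Here is a small Python converter that roughly sketches this rule. It's only an illustration under stated assumptions:

- intel_mem_to_att is a hypothetical helper.
- It handles only 32-bit general-purpose registers and register*scale (not scale*register).
- It accepts h-suffixed hex or plain decimal numbers, and keeps each number's radix so the output matches the table.

```python
import re

# 32-bit general-purpose registers this sketch recognises
_REGISTERS = {"eax", "ebx", "ecx", "edx", "esi", "edi", "ebp", "esp"}


def _convert_number(token):
    """Turn an Intel literal (20h or 3) into AT&T spelling (0x20 or 3)."""
    if token.endswith("h"):
        return hex(int(token[:-1], 16))
    return str(int(token, 10))


def intel_mem_to_att(operand):
    """Convert an Intel memory operand such as [ebx+ecx*4h-20h] to AT&T form."""
    operand = operand.strip()
    if not (operand.startswith("[") and operand.endswith("]")):
        raise ValueError("expected a bracketed Intel memory operand")
    inner = operand[1:-1].replace(" ", "").lower()

    base = index = scale = None
    disp = ""
    # split into signed terms: "ebx+ecx*4h-20h" -> ["ebx", "+ecx*4h", "-20h"]
    for term in re.findall(r"[+-]?[^+-]+", inner):
        sign = "-" if term.startswith("-") else ""
        term = term.lstrip("+-")
        if "*" in term:
            reg, factor = term.split("*")
            index, scale = "%" + reg, _convert_number(factor)
        elif term in _REGISTERS:
            # first bare register is the base, a second one is the index
            if base is None:
                base = "%" + term
            else:
                index = "%" + term
        else:
            disp = sign + _convert_number(term)

    if index is None:
        inside = base or ""
    else:
        # AT&T leaves the base slot empty when there is no base register
        inside = ",".join([base or "", index] + ([scale] if scale else []))
    return f"{disp}({inside})" if inside else disp


if __name__ == "__main__":
    for intel in ["[ecx]", "[ebx+3]", "[ebx+20h]", "[ebx+ecx*2h]",
                  "[ebx+ecx]", "[ebx+ecx*4h-20h]"]:
        print(f"{intel:<20} {intel_mem_to_att(intel)}")
```

Running it prints each Intel operand next to its AT&T equivalent, matching the memory rows of the table above.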

xargs -i

This post is just a tidbit on using xargs in bash.

If you can get a list of work to do from a file or stream, you can pipe it into xargs to do further work. Here's an example I've taken from here. It finds all of the *.bak files in or below the current directory and deletes them:

find . -name "*.bak" -type f -print | xargs /bin/rm -f

Adding the -i switch makes xargs run the command once per line of input, replacing each “{}” in the command with the line of text that was read. In GNU xargs, -i is deprecated in favour of the equivalent -I {}.

cat url_endings | xargs -i wget "http://somewhere.com{}"

In this command, it’s expected that the file “url_endings” would be data that looks like this:

/files/first
/files/second
/files/third
etc . . 
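To make the substitution concrete, here is a rough Python model of what xargs -i does with a command template. It's a simplified sketch, not how xargs is implemented. expand_commands is a hypothetical helper. It ignores xargs's quoting and whitespace rules and simply treats each non-empty line as one item.

```python
def expand_commands(template, lines, placeholder="{}"):
    """Build one argument list per input line, replacing the placeholder."""
    commands = []
    for line in lines:
        item = line.rstrip("\n")
        if not item:
            continue  # skip blank lines
        commands.append([arg.replace(placeholder, item) for arg in template])
    return commands


if __name__ == "__main__":
    endings = ["/files/first\n", "/files/second\n", "/files/third\n"]
    for argv in expand_commands(["wget", "http://somewhere.com{}"], endings):
        # first line printed: ['wget', 'http://somewhere.com/files/first']
        print(argv)
```

Each resulting list could be handed to subprocess.run() to actually execute it, much as xargs runs one command per input line.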

A Quick Lap with MVar

Introduction

Concurrent programming is hard. It's made a lot easier with good tools, and MVar is one of them. An MVar is just a location for a value. It can either contain a value or be empty, and its operations block accordingly. This provides a safe concurrent programming environment for mutable state.

From the Hackage page for Control.Concurrent.MVar:

An MVar t is mutable location that is either empty or contains a value of type t. It has two fundamental operations: putMVar which fills an MVar if it is empty and blocks otherwise, and takeMVar which empties an MVar if it is full and blocks otherwise.

Key Points

  • newEmptyMVar creates an MVar that has no value to begin with
  • newMVar creates an MVar that has an initial value
  • takeMVar removes and returns the current value of the MVar, leaving it empty. It'll block until the MVar contains a value
  • putMVar puts a value into the MVar. It’ll block until the MVar doesn’t contain a value

An Example

import Control.Concurrent
import Control.Concurrent.MVar

main :: IO ()
main = do
    -- create an empty mvar
    m <- newEmptyMVar
    -- get another thread to put a value in it
    _ <- forkIO $ putMVar m "A value"
    -- take the value (blocks until the other thread has put it)
    x <- takeMVar m
    putStrLn x
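For readers coming from other languages, you can model the blocking behaviour described in the key points in Python with a condition variable. This is only an analogue for illustration and is not part of Control.Concurrent. The MVar class below and its take and put methods are hypothetical. It makes no promise about which waiting thread wakes first.

```python
import threading


class MVar:
    """A Python analogue of Haskell's MVar: a box that is either empty or full."""

    _EMPTY = object()  # sentinel meaning "no value", so None can be stored

    def __init__(self, value=_EMPTY):
        # MVar() behaves like newEmptyMVar, MVar(x) like newMVar x
        self._value = value
        self._cond = threading.Condition()

    def take(self):
        """Like takeMVar: wait until full, then remove and return the value."""
        with self._cond:
            while self._value is MVar._EMPTY:
                self._cond.wait()
            value, self._value = self._value, MVar._EMPTY
            self._cond.notify_all()  # wake any thread blocked in put()
            return value

    def put(self, value):
        """Like putMVar: wait until empty, then store the value."""
        with self._cond:
            while self._value is not MVar._EMPTY:
                self._cond.wait()
            self._value = value
            self._cond.notify_all()  # wake any thread blocked in take()


if __name__ == "__main__":
    # mirrors the Haskell example: another thread fills an empty MVar
    m = MVar()
    threading.Thread(target=m.put, args=("A value",)).start()
    print(m.take())  # A value
```

A single condition is shared by takers and putters, so notify_all() is used to make sure a thread waiting for the opposite state gets woken.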