MongoDB provides the ability to retrieve cursors of data that are tailable: they stay open once the initial results are exhausted and return new documents as they arrive.
We can exploit this functionality by tailing the oplog to provide a trigger-like effect on the Mongo database so that we can respond to changes in real-time.
Using pymongo you can set up a connection to your mongo server’s oplog like so:
import time

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS

# written against the PyMongo 2.x API
tail_opts = {'tailable': True, 'await_data': True}

# connect to the target mongo server
mongo_url = 'mongodb://localhost:27017'
db = MongoClient(mongo_url).local

# get the latest timestamp in the database
last_ts = db.oplog.rs.find().sort('$natural', -1)[0]['ts']

while True:
    # prepare the tail query and kick it off
    query = {'ts': {'$gt': last_ts}}
    cursor = db.oplog.rs.find(query, **tail_opts)
    cursor.add_option(_QUERY_OPTIONS['oplog_replay'])

    try:
        while cursor.alive:
            try:
                # grab a document if available
                doc = cursor.next()
                # remember our position so a restarted query resumes from here
                last_ts = doc['ts']
                # do something interesting with "doc"
            except StopIteration:
                # thrown when the cursor is out of data, so wait
                # for a period for some more data
                time.sleep(10)
    finally:
        cursor.close()
This constant feedback loop will just keep pumping results down the pipe as they’re seen. As you can see, having an oplog set up on your database is a requirement of this solution. Without it, there is no record of the operations that have executed for us to follow.
The dictionary tail_opts is unpacked into keyword arguments on the find call. There are a couple of flags set here. The first one is tailable, which tells mongo that we want new results as they appear in scope of the cursor. The second, await_data, asks the server to wait a short while for new data to arrive before reporting that the cursor has run out.
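The tailable and await_data keyword arguments used above belong to the older PyMongo 2.x API. In PyMongo 3 and later the same request is made with cursor_type=CursorType.TAILABLE_AWAIT. The sketch below is one way the loop might look there; open_tail and follow are hypothetical helper names, and the one-second idle sleep is an arbitrary choice.

```python
import time


def open_tail(db, last_ts):
    """Open a tailable, awaiting cursor on the oplog (PyMongo 3+ API)."""
    # imported here so the rest of the sketch can be read without pymongo
    from pymongo import CursorType

    return db.oplog.rs.find(
        {'ts': {'$gt': last_ts}},
        cursor_type=CursorType.TAILABLE_AWAIT,
    )


def follow(cursor, idle_sleep=1.0, sleep=time.sleep):
    """Yield documents from a tailable cursor for as long as it stays alive."""
    while cursor.alive:
        for doc in cursor:
            yield doc
        # the cursor ran dry without dying, so pause before asking again
        sleep(idle_sleep)
```

With db and last_ts obtained as in the original listing, iterating over follow(open_tail(db, last_ts)) would take the place of the inner while loop.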
According to 10gen:
The sequence creates a cursor that will wait for few seconds after returning the full result set so that it can capture and return additional data added during the query
I have wrapped this functionality up into a server of its own (and client library) available from my GitHub repo. mutated-mongo takes the idea in this article and filters out only messages that particular clients have subscribed to. It’s still a work in progress.
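As a rough illustration of that filtering idea (not mutated-mongo's actual code), the sketch below matches oplog entries against a set of subscriptions. It relies on two fields of an oplog entry: ns, the namespace written as database.collection, and op, the operation code (i for insert, u for update, d for delete). The Subscription class and the interested_clients helper are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Subscription:
    client: str                         # hypothetical client identifier
    ns: str                             # namespace to watch, e.g. "shop.orders"
    ops: frozenset = frozenset("iud")   # insert, update and delete by default


def interested_clients(doc, subscriptions):
    """Return the clients whose subscriptions match this oplog entry."""
    return [
        sub.client
        for sub in subscriptions
        if doc.get('ns') == sub.ns and doc.get('op') in sub.ops
    ]
```

Each document pulled from the tailing loop could be passed through a helper like this to decide who should be notified.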
The MongoDB oplog allows you to keep track of changes that have happened on your database in real-time. This is a very useful tool that isn’t offered out of the box with a single server instance. You can follow these steps to enable the oplog on a standalone MongoDB instance.
Un-comment the following lines from your /etc/mongodb.conf file:
replSet=rs0
oplogSize=1024
This will give your MongoDB server a replica set identity of rs0 and will allow your oplog to grow to 1024 MB. You can tune these parameters to suit.
To complete the process, restart your MongoDB daemon and open a shell. You just need to issue rs.initiate() on the local database:
michael@mongo:~$ mongo
MongoDB shell version: 2.6.1
connecting to: test
> use local
switched to db local
> rs.initiate()
{
"info2" : "no configuration explicitly specified -- making one",
"me" : "mongo:27017",
"info" : "Config now saved locally. Should come online in about a minute.",
"ok" : 1
}
> show collections
me
oplog.rs
startup_log
system.indexes
system.replset
This post is just a tidbit on the use of xargs in bash.
If you can get a list of work to do from a file or stream, you can pipe it into xargs to do further work. Here’s an example I’ve borrowed: it finds all of the *.bak files in or below the current directory and deletes them.
find . -name "*.bak" -type f -print | xargs /bin/rm -f
Extending the usage of xargs to incorporate the -i switch, you can use “{}” as a placeholder in the command that follows, and xargs will replace it with each line of text it reads in.
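To make the substitution concrete, here is a rough Python analogue of that behaviour: one command is built per input line, with every “{}” in the arguments swapped for that line. This only illustrates the idea and is not how xargs itself is implemented; expand and run_each are hypothetical names.

```python
import subprocess


def expand(template, lines, placeholder="{}"):
    """Build one argument list per non-empty input line."""
    commands = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # blank lines produce no command
        commands.append([arg.replace(placeholder, line) for arg in template])
    return commands


def run_each(template, lines):
    """Run each expanded command in turn and collect the exit codes."""
    return [subprocess.call(argv) for argv in expand(template, lines)]
```

For example, expand(['mv', '{}', '{}.old'], ['a.bak\n', 'b.bak\n']) yields the argument lists for mv a.bak a.bak.old and mv b.bak b.bak.old.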
Concurrent programming is hard. It’s made a lot easier with good tools, and MVar is one of them. An MVar is just a location for a value. It can contain a value or contain nothing, and the API will block accordingly, providing a safe concurrent programming environment for mutable state.
From the Hackage page for Control.Concurrent.MVar:
An MVar t is mutable location that is either empty or contains a value of type t. It has two fundamental operations: putMVar which fills an MVar if it is empty and blocks otherwise, and takeMVar which empties an MVar if it is full and blocks otherwise.
Key Points
newEmptyMVar creates an MVar that has no value to begin with
newMVar creates an MVar that has an initial value
takeMVar takes the current value out of the MVar and returns it, leaving the MVar empty. It’ll block until the MVar contains a value
putMVar puts a value into the MVar. It’ll block until the MVar doesn’t contain a value
An Example
import Control.Concurrent
import Control.Concurrent.MVar

main :: IO ()
main = do
  -- create an empty mvar
  m <- newEmptyMVar
  -- get another thread to put a value in it
  forkIO $ putMVar m "A value"
  -- take the value
  x <- takeMVar m
  putStrLn x
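For comparison, the same hand-off can be approximated in Python with queue.Queue(maxsize=1) from the standard library: put blocks while the slot is full and get blocks while it is empty, which is close to how putMVar and takeMVar behave. This is an analogy rather than an equivalent of MVar, and hand_off is a hypothetical name.

```python
import queue
import threading


def hand_off(value):
    """Pass a value from a worker thread to the caller through a one-slot queue."""
    # a queue with room for one item stands in for the empty MVar
    slot = queue.Queue(maxsize=1)

    # get another thread to put a value in it
    worker = threading.Thread(target=slot.put, args=(value,))
    worker.start()

    # take the value; this blocks until the worker has put it there
    result = slot.get()
    worker.join()
    return result
```

Calling hand_off("A value") returns the string once the worker thread has delivered it.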