In the previous post, I covered what the MongoDB oplog is and its semantics. In this post, I will look at how to process it to get the new state of documents.
First, let’s remind ourselves of the data manipulation operations: Insert, Update & Delete. For inserts and deletes, only the o field is present, holding either the full document or just the _id being deleted. For updates, the o field contains the changes as $set and $unset commands, and o2 holds the _id of the document being updated.
We can ignore c (DB command) and n (no-op) operations, as these do not modify the data.
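A minimal sketch of these rules in plain Python, assuming each oplog entry has already been parsed into a dict with the standard op, o and o2 keys. The helper names is_data_op and doc_id are hypothetical, chosen for illustration.

```python
# Oplog "op" codes that change documents: insert, update, delete.
# "c" (command) and "n" (no-op) entries are skipped.
DATA_OPS = {"i", "u", "d"}


def is_data_op(entry: dict) -> bool:
    """True for insert, update and delete entries."""
    return entry.get("op") in DATA_OPS


def doc_id(entry: dict) -> str:
    """Return the _id of the document an entry touches, as a string.

    Updates carry the target _id in o2; inserts and deletes carry it in o.
    """
    source = entry["o2"] if entry["op"] == "u" else entry["o"]
    return str(source["_id"])
```

In a Spark job, helpers like these could serve as the filter predicate and the groupBy key function.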
Let us consider a large MongoDB collection containing over 1TB of data that needs to be transferred to a data warehouse, a data lake or another storage system every day. One method is to perform a full export every day using the mongoexport utility. However, we quickly find that a full export takes so long that running it daily is not feasible, and we also have to consider the performance impact on the cluster itself.
Another way is to export once, then collect a day's worth of updates from the oplog and apply them to the existing documents. Reading the oplog puts less load on the MongoDB cluster than a full export, and it also allows changes to be applied at whatever frequency is required.
Keep in mind that the oplog is a totally ordered list of changes to the MongoDB cluster, so the entries for each document have to be applied in sequence. In practice, this means grouping all operations by document, sorting each group by timestamp and applying the operations to the document in that order. Logically, this sounds straightforward.
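A tiny illustration of why the order matters, using a hypothetical apply_update helper that only handles top-level fields (no dotted paths): the same two updates produce different documents when applied in a different order.

```python
def apply_update(doc: dict, update: dict) -> dict:
    """Apply a $set/$unset update on top-level fields to a copy of doc."""
    out = dict(doc)
    out.update(update.get("$set", {}))
    for field in update.get("$unset", {}):
        out.pop(field, None)
    return out


doc = {"_id": 1, "status": "new"}
set_status = {"$set": {"status": "shipped"}}
unset_status = {"$unset": {"status": True}}

in_order = apply_update(apply_update(doc, set_status), unset_status)
reversed_order = apply_update(apply_update(doc, unset_status), set_status)
print(in_order)        # {'_id': 1}
print(reversed_order)  # {'_id': 1, 'status': 'shipped'}
```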
I’m choosing to solve this with Apache Spark and Python as the data volume requires distributed processing and I’m familiar with Python.
The first step is to read all the existing exported documents and the oplog.
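import json
objs = sc.textFile(self.input()['objects'].path).map(json.loads)  # one JSON document per line, as mongoexport writes by default
ops = (sc.textFile(self.input()['oplog'].path)
       .map(lambda line: json.loads(json.loads(line)['message'])))  # my tailing app writes each oplog entry as a JSON string in this `message` field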
Clean and filter Oplog
In this step, we transform each exported object into a tuple whose first element is the object ID and whose second element is the document itself. This lets us join the two datasets on a key.
The oplog entries are keyed the same way, by the _id they affect (from o2 for updates, from o otherwise), but since there can be multiple entries per object ID, we use a groupBy. Remember that the oplog also contains system operations for migrating data between shards; we need to exclude those, which a simple filter on the presence of the fromMigrate field takes care of.
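objs = objs.map(lambda x: (str(x['_id']), x))
ops = (ops.filter(lambda x: x['op'] in ('i', 'u', 'd') and 'fromMigrate' not in x)  # drop c/n entries and chunk migrations
          .groupBy(lambda x: str((x['o2'] if x['op'] == 'u' else x['o'])['_id'])))  # updates keep the target _id in o2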
At this point, both our objects and oplog entries are ready to be processed and merged.
To apply the operations, we use a custom map function on the result of a full outer join. A full outer join is needed because the result must include both new documents that exist only in the oplog (created after the export) and unmodified documents that have no oplog entries. The full join therefore gives us the complete dataset rather than just the modified documents.
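final = (objs.fullOuterJoin(ops)
         .map(apply_ops)                   # apply_ops: hypothetical name for the custom map function described below
         .filter(lambda x: x is not None)  # deleted documents come back as None
         .map(remove_extra))               # clean up metadata fields on the merged documents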
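To make the three cases concrete, here is a plain-Python stand-in (no Spark required) for the rows that RDD.fullOuterJoin produces when each key appears at most once on each side: (key, (left value or None, right value or None)). The function names and sample data are illustrative only, and the keys are sorted here purely for readability; Spark makes no ordering guarantee.

```python
def full_outer_join(left: dict, right: dict) -> list:
    """Mimic RDD.fullOuterJoin for key-unique inputs: (key, (left or None, right or None))."""
    return [(k, (left.get(k), right.get(k))) for k in sorted(set(left) | set(right))]


def classify(row) -> str:
    """Label a joined row by which side(s) it came from."""
    _, (doc, ops) = row
    if ops is None:
        return "unmodified"  # exported document with no oplog entries
    if doc is None:
        return "new"  # created after the export, so only the oplog knows about it
    return "modified"  # exported document with oplog entries to apply


objs = {"1": {"_id": "1"}, "2": {"_id": "2"}}
ops = {"2": [{"op": "u"}], "3": [{"op": "i"}]}
for row in full_outer_join(objs, ops):
    print(row[0], classify(row))
# 1 unmodified
# 2 modified
# 3 new
```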
This map function is responsible for applying the operations. For inserts, it creates a new record. For updates, it traverses both $set and $unset to change the modified fields. For deletes, it removes the document from the RDD.
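One possible sketch of such a map function, under these assumptions: rows have the (id, (document or None, oplog entries or None)) shape produced by the join, ts is serialized as MongoDB Extended JSON ({"$timestamp": {"t": ..., "i": ...}}), and update entries use the $set/$unset format described earlier or carry a full replacement document. The name apply_ops is hypothetical. Dotted paths are handled for nested objects but not for array indices, and the diff-based ($v: 2) update entries written by newer MongoDB versions are rejected rather than applied.

```python
import copy


def _set_path(doc: dict, path: str, value) -> None:
    """Assign value at a dotted path such as "a.b.c", creating missing objects."""
    *parents, leaf = path.split(".")
    node = doc
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = value


def _unset_path(doc: dict, path: str) -> None:
    """Remove the field at a dotted path; do nothing if the path is absent."""
    *parents, leaf = path.split(".")
    node = doc
    for key in parents:
        node = node.get(key)
        if not isinstance(node, dict):
            return
    node.pop(leaf, None)


def _ts_key(entry: dict) -> tuple:
    """Sort key, assuming ts is Extended JSON {"$timestamp": {"t": ..., "i": ...}}."""
    ts = entry["ts"]["$timestamp"]
    return (ts["t"], ts["i"])


def apply_ops(row):
    """Return the final state of one document, or None if it ends up deleted.

    row is (doc_id, (exported_doc or None, oplog_entries or None)).
    """
    _, (doc, ops) = row
    doc = copy.deepcopy(doc)  # never mutate the input record
    for entry in sorted(ops or [], key=_ts_key):
        op, o = entry["op"], entry["o"]
        if op == "i":
            doc = copy.deepcopy(o)  # inserts carry the full document
        elif op == "d":
            doc = None  # deletes drop the document
        elif op == "u":
            if doc is None:
                continue  # update for a document we have no copy of; skip it
            if o.get("$v") == 2:
                raise ValueError("diff-based ($v: 2) update entries are not handled")
            operators = {k: v for k, v in o.items() if k.startswith("$") and k != "$v"}
            if not operators:
                doc = {"_id": doc["_id"], **copy.deepcopy(o)}  # full replacement
                continue
            unknown = set(operators) - {"$set", "$unset"}
            if unknown:
                raise ValueError(f"unsupported update operators: {sorted(unknown)}")
            for path, value in operators.get("$set", {}).items():
                _set_path(doc, path, copy.deepcopy(value))
            for path in operators.get("$unset", {}):
                _unset_path(doc, path)
    return doc
```

In the Spark job, a function like this would be passed to map right after fullOuterJoin, and the None it returns for deleted documents is what the subsequent filter removes.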
Filter & map
The filter and map then reshape the data, returning an RDD with just the merged documents in their final state. The remove_extra function cleans up the documents by removing metadata fields added during processing.
The final RDD now contains the resulting dataset, with every document in a consistent state after all oplog entries have been applied. In our case, this is the dataset for the following day. Running this processing on a Spark cluster distributes the work and delivers the updated data without a full export every time.
Nowadays streaming is a big deal, so here are a few words on it. This pipeline can be turned into a streaming pipeline. If you run the process as is, it would be possible to run it on micro-batches; however, each batch could take longer to process than the batch interval. An alternative would be to process the oplog entries at regular intervals, collapsing/merging the entries for each document into a single final operation that could be applied to the objects at longer intervals. I haven’t played with this, but depending on the size of the collection, it could work well. For streaming, it might also make sense to return only modified documents and ignore deleted or unmodified ones. This would make the join faster and leave a smaller set to process.
If you process the MongoDB oplog in a similar or different way, I would love to hear your thoughts on the process. I hope to cover Change Streams in another post, but let me know if you use them instead.