RFC 031: Relation Batcher

Background

The relation_embedder was recently introduced into the pipeline to denormalise related works into a work's data, so that a particular work in an archive retrieved from the API will contain its children, ancestors and siblings.

Problem

Currently the relation_embedder is not processing works at a sufficient rate, being orders of magnitude slower than other pipeline stages.

There have already been a number of PRs which have attempted to alleviate this by optimising the requests sent to Elasticsearch for indexing, retrieval and querying of relations:

This work is definitely useful (and with #1011 greatly improves pipeline storage indexing performance for other services too), but is unlikely on its own to make a significant enough improvement to throughput, due to more fundamental problems with the work performed by the relation_embedder.

The main issue is that a single input to the relation_embedder can result in many works being denormalised. Given the following tree:

A
|-----
|    |
B    C
|    |-----
|    |    |
D    E    F

When a message is received for work D it will need to denormalise 2 works, namely B and D: it needs to denormalise the work itself, and its parent B, which needs its children field updated. When a message is received for work A, however, it will need to denormalise every work in the tree, because all of the works contain A as an ancestor.

When building the relation_embedder the assumption was made that many more works would be leaf nodes or near the bottom of the tree than near the root, so the amount of work performed would be manageable. However this assumption doesn't really hold: nodes high in the tree of large archives generate a combinatorially large amount of work.

There are also extreme cases where, even though most of the works in an archive are leaf nodes, the archive is very flat. To take one real-world example:

MS9225
|
|---
|  |
1  2
   |-------------
   |  |  |      |
   1  2  3 ... 3583

There are 3583 works here which are siblings of each other, and when any one of them is received at the input, all of its siblings and the two ancestors need to be denormalised too. For an initial reindex this results in denormalising 6,427,905 works (1 + 2 + ... + 3585), and for a reharvest where, in the worst case, all of the works in the archive are sent again, 12,852,225 (3585 * 3585). In an ideal scenario we would of course only denormalise each of the works in that archive once (i.e. 3,585 denormalisations), with anything above that being unnecessary work. Note this is just a single archive, and there will likely be other similar cases among the 6,349 archives currently in Calm.
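The figures above follow directly from the quoted sums; as a quick sanity check (for example in a Scala REPL):

  (1 to 3585).sum   // 6427905, the initial reindex figure
  3585 * 3585       // 12852225, the worst-case reharvest figure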

Proposed Solution

In order to prevent the large amount of duplicate work performed within the relation_embedder, we propose adding two more small services to the pipeline before the relation_embedder: a router, which extracts archive works and passes through non-archive works, and a batcher, which groups works contained within the same archive over a given time period. This prevents the relation_embedder from performing the excessive duplicate work we are currently seeing.

Router

The router takes a work id, looks it up in the pipeline_storage and checks whether it is part of an archive (i.e. has a collectionPath field). If it is, it sends the value of collectionPath to the batcher; otherwise it sends the work straight to the id_minter_works.
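A minimal sketch of this routing logic follows; the Work, PipelineStorage and MessageSender types and method names here are placeholders for illustration, not the real pipeline code:

  // Sketch only: these types stand in for the real pipeline storage and
  // message-sending machinery.
  case class Work(id: String, collectionPath: Option[String])

  trait PipelineStorage { def getWork(id: String): Work }
  trait MessageSender { def send(body: String): Unit }

  class Router(
    storage: PipelineStorage,
    batcherSender: MessageSender,   // towards the batcher
    idMinterSender: MessageSender   // towards id_minter_works
  ) {
    def route(workId: String): Unit = {
      val work = storage.getWork(workId)
      work.collectionPath match {
        // Archive work: forward its path so the batcher can group by archive
        case Some(path) => batcherSender.send(path)
        // Non-archive work: pass straight through to the id minter
        case None       => idMinterSender.send(work.id)
      }
    }
  }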

Batcher

The batcher groups works received over a given time period and works out the optimal set of paths to send on to the relation_embedder. It is deployed as a single task and runs at intervals of fixed duration (say, 30 minutes), reading all messages from the queue in a single pass.

Assuming a graph shape like this:

A
|-----
|    |
B    C
|    |-----
|    |    |
D    E    F
|
G
|
H

With a queue containing the paths for works H, D, E and F, the batcher would (as sketched after this list):

  • Determine the set of parents of the paths in the queue (i.e. in the example G, B and C)

  • Filter out nodes which are descendants of other ones (i.e. G, as it is a descendant of B)

  • Send the remaining paths to the relation_embedder (B and C)
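A sketch of this path-selection step, assuming collectionPath values are "/"-separated (so that, for example, work H has the path A/B/D/G/H); the object and method names are illustrative only:

  object Batcher {
    // The parent path is everything before the last "/"; roots have no parent
    def parent(path: String): Option[String] = {
      val idx = path.lastIndexOf('/')
      if (idx < 0) None else Some(path.take(idx))
    }

    def isDescendantOf(path: String, ancestor: String): Boolean =
      path.startsWith(ancestor + "/")

    // Given the paths read from the queue, return the minimal set of paths
    // to send on to the relation_embedder
    def selectPaths(queued: Seq[String]): Set[String] = {
      // Queued roots have no parent, so we keep the root path itself
      val parents = queued.map(p => parent(p).getOrElse(p)).toSet
      parents.filterNot { p =>
        (parents - p).exists(other => isDescendantOf(p, other))
      }
    }
  }

With the example queue of H, D, E and F the parents are G, B and C; G is filtered out as a descendant of B, so only the paths for B and C are sent on:

  Batcher.selectPaths(Seq("A/B/D/G/H", "A/B/D", "A/C/E", "A/C/F"))
  //   == Set("A/B", "A/C")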

In the earlier example of this archive tree:

MS9225
|
|---
|  |
1  2
   |-------------
   |  |  |      |
   1  2  3 ... 3583

During a reindex, in the best-case scenario all works in this archive are read from the queue at the same time by the batcher, and the root work (MS9225) is sent once to the relation_embedder. In a more plausible scenario where the works are split across multiple batches, each batch will contain the tree only once; although this still leads to some duplicate work, it drastically reduces the amount performed.

Relation embedder

The relation_embedder now receives messages from the batcher, each containing a collectionPath, instead of work IDs as previously. It is unchanged in every other respect.

Potential Issues

  • The batcher necessarily introduces some latency into the processing of archive works that is not present in the current pipeline. We are able to tweak the processing interval to provide a compromise between relation_embedder performance, delay and cost. One potential solution would be to make the interval duration change dynamically in response to pipeline load, so that it is long while there are many messages in flight but short when there are few, decreasing latency for updates while retaining efficiency during a reindex (a rough sketch follows this list).

  • Another disadvantage is that the proposal introduces 2 new services, and some additional architectural complexity due to the messages splitting at the router before joining again at the id_minter. The exact number of services and modes of communication could probably be modified, but this particular design was the simplest we could think of that avoids introducing complicated logic or additional storage beyond SQS queues.
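For the first issue above, a rough sketch of how the batching interval might scale with load. The thresholds, and the idea of driving the interval from an approximate queue depth (for example SQS's ApproximateNumberOfMessages attribute), are assumptions rather than a settled design:

  import scala.concurrent.duration._

  // Illustrative thresholds only, not a settled design
  def batchInterval(approxQueueDepth: Long): FiniteDuration =
    if (approxQueueDepth > 100000) 30.minutes      // reindex-scale load: favour big batches
    else if (approxQueueDepth > 1000) 10.minutes
    else 1.minute                                  // near-idle: favour low latency for updates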
