RFC 031: Relation Batcher
Background
The `relation_embedder` was recently introduced into the pipeline to denormalise related works into a work's data, so that a particular work in an archive retrieved from the API will contain its children, ancestors and siblings.
Problem
Currently the `relation_embedder` is not processing works at a sufficient rate, being orders of magnitude slower than other pipeline stages.
A number of PRs have already attempted to alleviate this by optimising the requests sent to Elasticsearch for indexing, retrieval and querying of relations.
This work is definitely useful (and with #1011 greatly improves pipeline storage indexing performance for other services too), but is unlikely on its own to make a significant enough improvement to throughput, due to more fundamental problems with the work performed by the `relation_embedder`.
The main issue is that a single input to the `relation_embedder` can result in many works being denormalised. Given the following tree:
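For illustration, suppose the tree has a shape like this (a hypothetical layout, chosen only to be consistent with the example that follows):

```
A
├── B
│   └── D
└── C
```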
When a message is received for work `D` it will need to denormalise 2 works, namely `B` and `D`: it needs to denormalise the work itself, and its parent `B`, because `B`'s `children` field needs updating. When a message is received for work `A`, however, it will need to denormalise every work in the tree, because all of the works contain `A` as an ancestor.
When building the `relation_embedder` the assumption was made that many more works would be at or near the leaves of the tree than near the root, so the amount of work performed would be manageable. However, this assumption doesn't really hold: nodes high in the tree of a large archive generate a combinatorially large amount of work.
There are also extreme cases where, even though most of the works in an archive are leaf nodes, the archive is very flat. To take one real-world example: one archive contains 3583 works which are all siblings of each other, sharing the same two ancestors (3585 works in total). When any one of these works is received at the input, all of its siblings and the two ancestors need to be denormalised too. For an initial reindex that results in denormalising 6,427,905 works (1 + 2 + ... + 3585: the k-th work to arrive finds only the k works indexed so far), and in a reharvest, where in the worst case every work in the archive is sent again, 12,852,225 (3585 * 3585). In an ideal scenario we would of course denormalise each work in the archive exactly once (i.e. 3,585 denormalisations); anything above that is unnecessary work. Note that this is just a single archive, and there are likely to be other similar cases among the 6349 archives currently in Calm.
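A quick back-of-the-envelope check of those figures:

```scala
// Check of the figures above for a flat archive of 3583 sibling works
// plus their 2 shared ancestors (3585 works in total).
val n = 3585L
val reindex   = n * (n + 1) / 2 // 1 + 2 + ... + 3585 = 6,427,905
val reharvest = n * n           // 3585 * 3585        = 12,852,225
val ideal     = n               // each work denormalised exactly once: 3,585
```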
Proposed Solution
In order to prevent the large amount of duplicate work performed within the `relation_embedder`, we propose adding two more small services to the pipeline before the `relation_embedder`: a `router`, which extracts archive works and passes through non-archive works, and a `batcher`, which groups works contained within the same archive over a given time period, preventing the `relation_embedder` from performing the excessive levels of duplicate work we are currently seeing.
Router
The `router` takes a work ID, looks it up in the `pipeline_storage`, and checks whether it is part of an archive (i.e. has a `collectionPath` field). If it is, it sends the value of `collectionPath` to the `batcher`; otherwise it sends the work straight to the `id_minter_works`.
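A minimal sketch of this decision, assuming hypothetical stand-ins (`Work`, `getWork`, `sendToBatcher`, `sendToIdMinter`) for the `pipeline_storage` lookup and the two output queues:

```scala
case class Work(id: String, collectionPath: Option[String])

def route(workId: String,
          getWork: String => Work,
          sendToBatcher: String => Unit,
          sendToIdMinter: String => Unit): Unit =
  getWork(workId).collectionPath match {
    case Some(path) => sendToBatcher(path)    // archive work: forward its path
    case None       => sendToIdMinter(workId) // non-archive work: pass straight through
  }
```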
Batcher
The `batcher` groups works received over a given time period and works out the optimal set of paths to send on to the `relation_embedder`. It is deployed as a single task and runs at intervals of fixed duration (say, 30 minutes), reading all messages from the queue in a single pass.
Assuming a graph shape like this:
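One shape consistent with the worked example below would be:

```
A
├── B
│   ├── D
│   ├── E
│   └── G
│       └── H
└── C
    └── F
```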
With a queue containing the paths for works `H`, `D`, `E` and `F`, the `batcher` would:

1. Determine the parent set of the messages in the queue (i.e. in the example `G`, `B`, `C`)
2. Filter out nodes which are descendants of other ones (i.e. `G`, as it is a descendant of `B`)
3. Send the remaining paths to the `relation_embedder` (`B` and `C`)
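This reduction is small enough to sketch directly; the snippet below assumes `collectionPath` values are slash-separated strings such as `A/B/D` (the exact path format is an assumption, and sending the results onwards is left to the caller):

```scala
def parent(path: String): String = {
  val idx = path.lastIndexOf('/')
  if (idx == -1) path else path.substring(0, idx) // a root path is its own parent
}

def reduceBatch(paths: Seq[String]): Seq[String] = {
  // Step 1: take the distinct parent of every queued path
  val parents = paths.map(parent).distinct
  // Step 2: drop any parent that is a descendant of another parent in the set
  parents.filterNot { p =>
    parents.exists(q => q != p && p.startsWith(q + "/"))
  }
}

// For the worked example above:
// reduceBatch(Seq("A/B/G/H", "A/B/D", "A/B/E", "A/C/F"))
//   => Seq("A/B", "A/C"), i.e. the paths for B and C
```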
Returning to the flat archive example above (the tree rooted at `MS9225`): during a reindex, in the best case all works in this archive are read from the queue at the same time by the `batcher`, and the root work (`MS9225`) is sent to the `relation_embedder` just once. In the more plausible scenario where the works are split across multiple batches, each batch will contain the tree only once; although this still leads to some duplicate work, it drastically reduces the amount performed.
Relation embedder
The `relation_embedder` receives messages from the `batcher`, each containing a `collectionPath` as input, instead of work IDs as previously. It is unchanged in every other respect.
Potential Issues
The `batcher` necessarily introduces some latency into the processing of archive works that is not present in the current pipeline. We are able to tune the processing interval to strike a compromise between `relation_embedder` performance, delay, and cost. One potential refinement would be to make the interval change dynamically in response to pipeline load, so that it is long while there are many messages in flight but short when there are few, decreasing latency for updates while retaining efficiency during a reindex (see the sketch at the end of this section).

Another disadvantage is that the proposal introduces 2 new services, and some additional architectural complexity, because the messages split at the `router` before joining again at the `id_minter`. The exact number of services and modes of communication could probably be modified, but the design we arrived at was the simplest we could think of that avoids introducing complicated logic or additional storage beyond SQS queues.