RFC 069: Catalogue Graph Ingestor
Following on from the , this RFC outlines the requirements for the Catalogue Graph Ingestor to replace the existing Concepts Pipeline.
Last modified: 2025-02-18T14:31:18+00:00
The Catalogue Graph Ingestor will be responsible for ingesting data from the Graph database into Elasticsearch so that it can be made available via the Catalogue API. The intention is to replace the with the existing Catalogue Graph pipeline. The Catalogue Graph Ingestor will replace the role of the in the Concepts Pipeline.
Currently, theme/concepts pages are made available via the , which is powered by the . We aim to replace the Concepts Pipeline with the Catalogue Graph, utilising a graph database to enable improved discovery and navigation of the catalogue. This aligns with the original goals and design of the Concepts Pipeline.
The catalogue graph will become part of the existing catalogue pipeline and we will aim to replace the existing concepts pipeline and the concepts API with data backed by the catalogue graph.
The Catalogue Graph Ingestor will be responsible for the following:
Ingesting data from the Graph database into Elasticsearch
Keeping data in Elasticsearch up to date with the graph database and catalogue source data
Providing data that is API compatible with the existing Catalogue API/concepts endpoints
Aligning with the existing catalogue pipeline to simplify the ingestion process and development effort
Following existing Wellcome Collection standards and best practices including:
New services should avoid using Scala
New CI/CD pipelines should be implemented using GitHub Actions
The Catalogue Graph Ingestor will be implemented as part of the existing Catalogue pipeline as a new "Ingestor" service, alongside the "works" and "images" ingestors. Following the requirement to reduce the use of Scala, the ingestor will be implemented in Python. We should ingest concept data into a new index in the existing catalogue pipeline Elasticsearch cluster, aligning with the catalogue pipeline and reducing the number of clusters we need to maintain.
The Catalogue Graph pipeline is currently orchestrated using AWS Step Functions and the ingestor will be triggered by a new state machine that will update the concepts index in Elasticsearch daily. The ingestor will be responsible for querying the graph database for concept data and ingesting it into Elasticsearch.
Currently the Concepts API provides the following endpoints in use by the Wellcome Collection website:
GET /concepts/{id}
We will not reproduce the sameAs field, which caters to situations where the same concept in the same external ontology has been minted with multiple identifiers. Instead, we will aim to remove all duplicates of this kind by making it impossible elsewhere in the pipeline to mint the same concept with multiple identifiers.
The following Cypher query can be used to extract the required data from the graph database:
This will return data in the form:
We can paginate through records by using the SKIP and LIMIT clauses in the query. We should aim to page through all records in the graph database, and then ingest them into Elasticsearch in batches. We will not use a static scroll window for this implementation, as the data will not be updated while we are ingesting it.
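The paging approach above can be sketched as follows. This is a minimal illustration, not the final implementation: the `run_query` callable (which might wrap the existing `BaseNeptuneClient`), the query text, and the page size are all assumptions.

```python
# Sketch of paginated extraction from the graph database using SKIP/LIMIT.
# `run_query` is a hypothetical callable that executes an openCypher query
# and returns a list of result rows; the MATCH pattern is illustrative only.
from typing import Callable, Iterator

PAGE_SIZE = 1000  # illustrative; tune against Neptune and Lambda limits


def extract_concepts(
    run_query: Callable[[str], list[dict]],
    page_size: int = PAGE_SIZE,
) -> Iterator[list[dict]]:
    """Yield batches of concept rows until the graph is exhausted."""
    skip = 0
    while True:
        query = (
            "MATCH (source:Concept) "
            "OPTIONAL MATCH (source)-[relation]->(target) "
            "RETURN source, relation, target "
            f"ORDER BY source.id SKIP {skip} LIMIT {page_size}"
        )
        rows = run_query(query)
        if not rows:
            return  # an empty page means we have read everything
        yield rows
        skip += page_size
```

Deterministic ordering (the `ORDER BY`) matters here: without it, SKIP/LIMIT pages are not guaranteed to be stable even over unchanging data.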
Due to the addition of label-derived matching between Concept and SourceConcept, it is possible for a Concept to have multiple SourceConcept nodes. For example, where a label-derived concept matches the text of a label in both MeSH and LCSH, it may have three SourceConcept nodes: one for the concept identified by an explicit reference, and two for matching the label text of the MeSH and LCSH concepts.
The query and result above reflect this, requiring some processing to match relation and target in the provided result. Each source concept constitutes another entry in the identifiers array in the API response, except where there is both a label-derived and an explicit match, in which case the explicit match should be used.
In addition, we may return SourceLocation nodes as targets, as well as SourceConcept nodes. These should be treated in the same way as SourceConcept nodes, with the extra location fields ignored initially.
The table below describes the mapping of the graph database response to the fields required for the Elasticsearch index:

| API field | Graph database source |
| --- | --- |
| `id` | `source.~properties.id` |
| `label` | `source.~properties.label` |
| `type` | `source.~properties.type` |
| `sameAs` | n/a: not implemented |
| `alternativeLabels` | `target.~properties.alternative_labels` |
| `identifiers.value` | `target.~properties.id` |
| `identifiers.type` | n/a: `"Identifier"` |
| `identifiers.identifierType.id` | `target.~properties.source` |
| `identifiers.identifierType.label` | n/a: use lookup table? |
| `identifiers.identifierType.type` | n/a: `"IdentifierType"` |
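The mapping, including the rule of preferring explicit matches over label-derived ones, might look like the sketch below. The relation representation (a plain string), the relation type name `HAS_SOURCE_CONCEPT`, and the `SOURCE_LABELS` lookup table are all assumptions for illustration, not the real graph schema.

```python
# Sketch: build one indexable concept document from graph rows that share a
# source Concept node. Field paths follow the mapping table above.
# SOURCE_LABELS stands in for the "lookup table?" noted in that table.
SOURCE_LABELS = {
    "lc-subjects": "Library of Congress Subject Headings (LCSH)",
    "lc-names": "Library of Congress Name authority records",
    "nlm-mesh": "Medical Subject Headings (MeSH)",
}


def to_indexable(rows: list[dict]) -> dict:
    """Map rows for one concept to the concepts index document shape."""
    source = rows[0]["source"]["~properties"]
    doc = {
        "id": source["id"],
        "label": source["label"],
        "type": source["type"],
        "alternativeLabels": [],
        "identifiers": [],
    }
    # Sources for which we have an explicit (non-label-derived) match;
    # the relation name here is a hypothetical stand-in.
    explicit_sources = {
        row["target"]["~properties"]["source"]
        for row in rows
        if row.get("relation") == "HAS_SOURCE_CONCEPT" and row.get("target")
    }
    for row in rows:
        target = row.get("target")
        if target is None:
            continue
        props = target["~properties"]
        if (row.get("relation") != "HAS_SOURCE_CONCEPT"
                and props["source"] in explicit_sources):
            continue  # drop the label-derived duplicate of an explicit match
        doc["alternativeLabels"].extend(props.get("alternative_labels", []))
        doc["identifiers"].append({
            "value": props["id"],
            "type": "Identifier",
            "identifierType": {
                "id": props["source"],
                "label": SOURCE_LABELS.get(props["source"], props["source"]),
                "type": "IdentifierType",
            },
        })
    return doc
```

A SourceLocation target would flow through the same path, with its extra location fields simply never read.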
There is a very small number of Person concepts in the catalogue with a reference to viaf or fihrist. These name authorities are currently not in the graph. We could add these as SourceName nodes, but this wasn't a priority initially because there are very few of them and we don't get any onward links or descriptions from the sources.
However, in the context of reproducing the response from the Concepts API, it means that we currently don't have the source identifiers for these concepts in the graph and may need to add these in.
We may wish to add them for completeness. In the meantime, we could index viaf- and fihrist-derived concepts into the Elasticsearch index as if they were label-derived, and the corresponding concept pages should look and behave in the same way they do now.
The ingestor will be implemented in Python and will be responsible for querying the graph database for concept data and ingesting it into Elasticsearch.
The ingestor will be triggered by a new state machine that is part of the catalogue pipeline infrastructure.
The ingestor could be split into two steps: a loading step from Neptune into S3, then an indexing step from S3 into Elasticsearch. Splitting the ingestor is desirable for these reasons:
We may want to control the ingestion rate into Elasticsearch separately from extracting data from Neptune, to avoid overwhelming the Elasticsearch cluster when we perform batch updates.
It trials having services use S3 as an intermediate data store rather than Elasticsearch, as a model for other pipeline services (we might have the services read and write data in Parquet to S3).
It allows chunking if we want to further parallelise operations in the future, e.g. splitting the batch read into blocks of 10,000 records, each in its own file which can be built and operated on independently. This might be useful in the future with Elasticsearch Serverless, as read and write operations are decoupled and scaling up writing might be desirable.
It allows more interruptibility of services: if the indexing step fails, only that step needs to be re-run, rather than both extraction and indexing.
It allows parallelising work on these services in the following sprint: someone can write the loader while someone else writes the indexing steps.
An alternative to splitting the ingestor is to have it read from the graph database and stream directly to Elasticsearch. This would be simpler to implement but would not provide the benefits listed above, and may require more complex error handling, retry logic, and rate limiting to avoid overwhelming the Elasticsearch cluster.
As a compromise for simplicity, we propose implementing the split ingestor, but without chunking logic for parallelisation. This will allow us to implement the ingestor in a simpler way, dealing with the whole dataset in each invocation, and we can add chunking logic in the future if required.
The split ingestor will be implemented as follows:
Concept ingestor state machine: the state machine will:
Be responsible for orchestrating the extraction and ingestion steps.
Be triggered daily, after completion of the catalogue graph pipeline, to update the concepts index in Elasticsearch.
Ingestor Loader: the loader will:
Be responsible for querying the graph database for concept data and writing it to S3.
Be implemented as a Python Lambda function.
Write the data to S3 in Parquet format for compaction and efficiency.
Be triggered at the start of the new ingestion state machine.
Ingestor Indexer: the indexer will:
Be responsible for reading the data from S3 and ingesting it into Elasticsearch.
Be implemented as a Python Lambda function.
Be triggered by the new ingestion state machine on completion of the loader step.
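The orchestration described above could be expressed in Amazon States Language along these lines. This is a hedged sketch: the state names, function names, and payload shape are placeholders, not the final infrastructure.

```json
{
  "Comment": "Concept ingestor: load from Neptune to S3, then index into Elasticsearch (all names illustrative)",
  "StartAt": "IngestorLoader",
  "States": {
    "IngestorLoader": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "catalogue-graph-ingestor-loader",
        "Payload": { "s3_prefix.$": "$.s3_prefix" }
      },
      "Next": "IngestorIndexer"
    },
    "IngestorIndexer": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "catalogue-graph-ingestor-indexer",
        "Payload": { "s3_prefix.$": "$.s3_prefix" }
      },
      "End": true
    }
  }
}
```

Passing a shared S3 prefix between the two tasks keeps the loader's output and the indexer's input pointing at the same batch of files.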
Above we state that it makes sense to put the concept ingestor code alongside the existing catalogue pipeline code.
With the current implementation of the catalogue graph code, placing the ingestor code in a separate folder will make importing catalogue graph code slightly more complicated; we'll probably have to modify PYTHONPATH or similar. It would be preferable to reuse existing catalogue graph code (e.g. the BaseNeptuneClient class) in the ingestor, so we will need to cater for this in the setup of the catalogue pipeline project.
There is further work to be done to extract more complex relationships and flatten them onto records ingested into Elasticsearch; this will be covered in future RFCs.
A search endpoint is also provided, following , but it is not currently in use by the website. We should aim to provide enough data to reproduce the existing in-use API endpoint, and consider support for the search endpoint out of scope for now.
The current index mapping in the concepts pipeline. The transform from concept object to indexable document is. We should aim to reproduce this data model in the Catalogue Graph Ingestor.
We should implement the ingestor using AWS Lambda initially, and if we discover the process exceeds the 15-minute limit we should consider using an ECS task to process the data in batches. In the first case, we should implement the code in a way that allows us to invoke it locally from the command line, which can form a potential basis for the ECS task implementation if required. See the on how to implement this.
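One way to keep the Lambda and local command-line entry points sharing the same code, as suggested above, is sketched below. The function names, the argument, and the stubbed core logic are illustrative assumptions, not the final interface.

```python
# Sketch of a dual Lambda/CLI entry point for the indexer. The shared
# `run` function is a stub; in the real service it would read Parquet
# from S3 and bulk-index documents into Elasticsearch.
import argparse
import json


def run(job: dict) -> dict:
    """Core logic, shared by both entry points (stubbed here)."""
    return {"status": "completed", "job": job}


def lambda_handler(event: dict, context=None) -> dict:
    """AWS Lambda entry point."""
    return run(event)


def main(argv=None) -> None:
    """Local CLI entry point; a potential basis for a future ECS task."""
    parser = argparse.ArgumentParser(description="Run the concepts indexer")
    parser.add_argument("--s3-prefix", required=True,
                        help="S3 prefix holding the loader's output")
    args = parser.parse_args(argv)
    print(json.dumps(run({"s3_prefix": args.s3_prefix})))


if __name__ == "__main__":
    main()
```

Because the core logic takes a plain dict, the same job specification can arrive from a Step Functions payload, a shell invocation, or later an ECS task definition without code changes.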