RFC 006: Reindexer architecture

This RFC proposes a new architecture for the reindexer, which is responsible for updating records in DynamoDB to trigger events for downstream applications.

Last modified: 2019-01-09T14:56:11+00:00

Problem statement

We store records in tables in DynamoDB. Downstream applications consume an event stream from the table.

If we want to reprocess the records in a table, we need to trigger an event for every row in the table. The only way to trigger an event is to modify a row, so we have a pipeline that edits the rows of a table, called the reindexer.

We add a new field to the rows in our table, called reindexVersion. This is an integer, incremented for every reindex -- the increment is what triggers the new event. The reindexer is the only application which edits this field.
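As a minimal sketch of this mechanism, assume rows are plain dictionaries and that a hypothetical `bump_reindex_version` helper performs the edit (neither is taken from the real codebase). The row is only modified when its reindexVersion is behind the requested version, so repeating a reindex to the same version changes nothing and would emit no new event:

```python
def bump_reindex_version(row: dict, desired_version: int) -> bool:
    """Set reindexVersion to desired_version if the row is behind.

    Returns True if the row was modified (which, on a real table, is
    what would produce a new event on the stream).
    """
    current = row.get("reindexVersion", 0)  # assumption: a missing field counts as 0
    if current >= desired_version:
        return False
    row["reindexVersion"] = desired_version
    return True


row = {"id": "miro/123", "reindexVersion": 455}
first = bump_reindex_version(row, desired_version=456)   # row is behind, so it is edited
second = bump_reindex_version(row, desired_version=456)  # already up to date, no edit
```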

Prior approaches

We've tried a couple of approaches to the reindexer already:

  • A standalone script which can be run locally, e.g. written in Python.

    This is just too slow, as most scripts will only process the table in serial. It also requires duplicating our conditional update logic in another language.

    Any new solution needs to work in parallel, which means it's probably a Scala app.

  • A "reindex_worker" that does most of the work, with Lambdas at the edges. The exact process is as follows:

    1. The table is divided into "reindex shards". Each row is in a single shard, and each shard contains ~1500 records.

    2. The user triggers a reindex with a local script.

    3. The script updates a reindex shard tracker table, which records the desired and current versions of each shard. Specifically, it increments the desired version on every record in the tracker table.

    4. A Lambda (the "reindex job creator") gets the event stream from this table, and sends any rows where (current version) < (desired version) to an SQS queue.

      The SQS message is of the form:

      {
          "shardId": "miro/123",
          "desiredVersion": 456
      }

    5. An ECS service (the "reindex worker") reads messages from this SQS queue. It queries a secondary index on the DynamoDB table to find all records in the shard which have a reindexVersion lower than the desiredVersion from the SQS message.

      This returns a list of up to 1500 records in the shard.

    6. It proceeds to update every record in the shard. It has to make an individual PutItem call for each record, as we do conditional updates (locking around the version field) to avoid conflicts.

    7. When it's finished updating the shard, it sends a completion message to an SNS topic.

    8. Another Lambda ("complete_reindex") receives the completion message, and updates the current version in the reindex shard tracker table.

    We've seen issues in step 6 -- making individual updates to the table. If we hit any sort of DynamoDB error (e.g. throughput limit exceeded, conditional update failure), the whole batch fails and has to be restarted from scratch. In practice, this means a lot of reindex jobs end up on a dead-letter queue (DLQ), incomplete.

    Any new solution needs to reduce the number of DynamoDB PutItem calls required to process a single SQS message.
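To see why the number of calls per message matters, here is a rough back-of-the-envelope calculation. It assumes each PutItem call fails independently with the same probability; the 0.1% failure rate is purely illustrative and is not a measured figure.

```python
def batch_success_probability(per_call_failure: float, calls: int) -> float:
    """Chance that every one of `calls` independent writes succeeds."""
    return (1.0 - per_call_failure) ** calls


# Illustrative numbers only: a 0.1% failure rate per PutItem call.
whole_shard = batch_success_probability(0.001, calls=1500)  # one message covers a full shard
single_call = batch_success_probability(0.001, calls=1)     # one message needs one call
```

Under those assumptions, a message that needs 1500 successful calls completes on a given attempt only about 22% of the time, whereas a message that needs a single call completes 99.9% of the time.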

Proposed solution

We split the reindex worker into two tasks:

  1. Querying DynamoDB to find out which records need reindexing

  2. Performing the PutItem call to do the update

The new flow is as follows:

  1. The table is still divided into "reindex shards", of roughly the same size as before.

  2. The user triggers a reindex with a local script, specifying the exact version they want to reindex to.

  3. The script reads the reindex shard tracker table, but only to get a list of shards.

    For every shard in the table, it sends an SQS message of the form:

    {
        "shardId": "miro/123",
        "desiredVersion": 456
    }

    (We do this in a local script rather than a Lambda to avoid hitting the Lambda timeout. The number of shards is typically a few hundred, so sending the messages takes long enough to risk hitting Lambda limits, but not so long that running it locally is onerous.)

  4. A new ECS service (the "reindex job creator") receives this message. It queries DynamoDB to find all records in the shard which need reindexing, then sends a message for each of those records to a new SQS queue. This message is of the form:

    {
        "id": "miro/123",
        "desiredVersion": 456
    }

  5. A new ECS service (the "reindex worker") receives this message. It queries DynamoDB to get the current row with this ID, and if it still needs reindexing, it makes the PutItem call to update the row.

    When it's updated the row (or decided it doesn't need updating), it deletes the message from the SQS queue.
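The flow above can be sketched end to end with in-memory stand-ins. Everything here is an assumption made for illustration: the table is a dictionary, the two SQS queues are `deque` objects, the function names are hypothetical, and the job creator is assumed to emit one message per stale record (which the message format suggests, but the RFC does not spell out).

```python
from collections import deque

# In-memory stand-ins (assumptions): the table maps id -> row, and each
# row records which shard it belongs to.
table = {
    "miro/1": {"shard": "miro/123", "reindexVersion": 455},
    "miro/2": {"shard": "miro/123", "reindexVersion": 456},
    "miro/3": {"shard": "miro/124", "reindexVersion": 455},
}
shard_queue: deque = deque()   # script -> reindex job creator
record_queue: deque = deque()  # reindex job creator -> reindex worker


def trigger_reindex(shard_ids, desired_version):
    """Step 3: the local script sends one message per shard."""
    for shard_id in shard_ids:
        shard_queue.append({"shardId": shard_id, "desiredVersion": desired_version})


def run_job_creator():
    """Step 4: find the stale records in each shard, one message per record."""
    while shard_queue:
        job = shard_queue.popleft()
        for record_id, row in table.items():
            stale = row["reindexVersion"] < job["desiredVersion"]
            if row["shard"] == job["shardId"] and stale:
                record_queue.append(
                    {"id": record_id, "desiredVersion": job["desiredVersion"]}
                )


def run_worker():
    """Step 5: re-check the row, update it if needed, then drop the message."""
    updated = []
    while record_queue:
        message = record_queue.popleft()  # stands in for receive + delete
        row = table[message["id"]]
        if row["reindexVersion"] < message["desiredVersion"]:
            row["reindexVersion"] = message["desiredVersion"]  # the single PutItem
            updated.append(message["id"])
    return updated


trigger_reindex(["miro/123", "miro/124"], desired_version=456)
run_job_creator()
updated_ids = run_worker()
```

Because the worker re-reads the row before writing, a message that is delivered twice is harmless: the second delivery finds the row already at the desired version and simply drops the message.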

We can use the reindex worker's DLQ to detect any records which consistently fail to reindex.
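One way this could be wired up is with an SQS redrive policy on the worker's queue. The snippet below only builds the attribute value; the ARN and the retry count are hypothetical and do not come from this RFC.

```python
import json

# Hypothetical ARN and retry count; neither comes from the RFC.
DLQ_ARN = "arn:aws:sqs:eu-west-1:123456789012:reindex-worker-dlq"

redrive_policy = json.dumps({
    "deadLetterTargetArn": DLQ_ARN,
    # A message received this many times without being deleted is moved to the DLQ.
    "maxReceiveCount": 3,
})

# Would be set as the queue's RedrivePolicy attribute, for example with
# boto3's sqs.set_queue_attributes(QueueUrl=..., Attributes=queue_attributes).
queue_attributes = {"RedrivePolicy": redrive_policy}
```

With one record per message, anything that lands on the DLQ identifies the specific record that keeps failing, rather than a whole shard.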
