
RFC 006: Reindexer architecture

Last updated: 02 November 2018.

Problem statement

We store records in tables in DynamoDB. Downstream applications consume an event stream from the table.

If we want to reprocess the records in a table, we need to trigger an event for every row in the table. The only way to trigger an event is to modify a row, so we have a pipeline that edits the rows of a table, called the reindexer.

We add a new field to the rows in our table, called reindexVersion. This is an integer that we increment on every reindex; the write that performs the increment is what triggers the new event. The reindexer is the only application which edits this field.
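The version bump can be sketched as follows. This is an illustrative Python sketch against an in-memory table standing in for DynamoDB; the only detail taken from the text above is the reindexVersion field, and the conditional-failure behaviour mirrors a DynamoDB conditional write.

```python
# Illustrative sketch of the reindexer's version bump.  Records live in
# DynamoDB in the real system; a plain dict stands in for it here.

def bump_reindex_version(table, record_id, expected_version):
    """Increment reindexVersion only if nobody else has changed it.

    Mirrors a conditional PutItem: the write fails if another writer
    has already moved the version on.
    """
    record = table[record_id]
    if record["reindexVersion"] != expected_version:
        raise RuntimeError("conditional update failed: version has moved on")
    # This write is what appears on the table's event stream.
    record["reindexVersion"] = expected_version + 1
    return record

table = {"miro/123": {"id": "miro/123", "reindexVersion": 4}}
bump_reindex_version(table, "miro/123", expected_version=4)
assert table["miro/123"]["reindexVersion"] == 5
```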

Prior approaches

We've tried a couple of approaches to the reindexer already:

  • A standalone script which can be run locally, e.g. written in Python.

    This is just too slow, as most scripts will only process the table serially. It also requires duplicating our conditional update logic in another language.

    Any new solution needs to work in parallel, which means it's probably a Scala app.

  • A "reindex_worker" that does most of the work, with Lambdas at the edges. The exact process is as follows:

    1. The table is divided into "reindex shards". Each row is in a single shard, and each shard contains ~1500 records.

    2. The user triggers a reindex with a local script.

    3. The script updates a reindex shard tracker table, which records the desired and current versions for each shard. Specifically, it increments the desired version on every row of the tracker table.

    4. A Lambda (the "reindex job creator") gets the event stream from this table, and sends any rows where (current version) < (desired version) to an SQS queue.

      The SQS message is of the form:

      {
          "shardId": "miro/123",
          "desiredVersion": 456
      }
    5. An ECS service (the "reindex worker") reads messages from this SQS queue. It queries a secondary index on the DynamoDB table to find all records in the shard which have a reindexVersion lower than the desiredVersion from the SQS message.

      This returns a list of up to 1500 records in the shard.

    6. It proceeds to update every record in the shard. It has to make an individual PutItem call for each record, as we do conditional updates (locking around the version field) to avoid conflicts.

    7. When it's finished updating the shard, it sends a completion message to an SNS topic.

    8. Another Lambda ("complete_reindex") receives the completion message, and updates the current version in the reindex shard tracker table.

    We've seen issues in step 6 -- making individual updates to the table. If we hit any sort of DynamoDB error (e.g. throughput limit exceeded, conditional update failure), the batch fails and has to be restarted from scratch. In practice, this means a lot of reindex jobs end up on a DLQ, incomplete.

    Any new solution needs to reduce the number of DynamoDB PutItem calls required to process a single SQS message.
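The filtering done by the old "reindex job creator" Lambda (step 4) can be sketched like this. Field names such as currentVersion are illustrative; the message shape matches the example above.

```python
# Sketch of the old "reindex job creator" Lambda: given rows from the
# shard tracker table's event stream, emit one SQS message body per
# shard whose current version lags its desired version.

import json

def messages_for(tracker_rows):
    return [
        json.dumps({
            "shardId": row["shardId"],
            "desiredVersion": row["desiredVersion"],
        })
        for row in tracker_rows
        if row["currentVersion"] < row["desiredVersion"]
    ]

rows = [
    {"shardId": "miro/123", "currentVersion": 455, "desiredVersion": 456},
    {"shardId": "miro/124", "currentVersion": 456, "desiredVersion": 456},  # already done
]
assert [json.loads(m)["shardId"] for m in messages_for(rows)] == ["miro/123"]
```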

Proposed solution

We split the reindex worker into two tasks:

  1. Querying DynamoDB to find out which records need reindexing

  2. Performing the PutItem call to do the update

The new flow is as follows:

  1. The table is still divided into "reindex shards", of roughly the same size as before.

  2. The user triggers a reindex with a local script, specifying the exact version they want to reindex to.

  3. The script reads the reindex shard tracker table, but only to get a list of shards.

    For every shard in the table, it sends an SQS message of the form:

    {
        "shardId": "miro/123",
        "desiredVersion": 456
    }

    (We do this in a local script rather than a Lambda to avoid hitting the Lambda timeout. With typically a few hundred shards, the work takes long enough to risk hitting Lambda limits, but not so long that running it locally is onerous.)

  4. A new ECS service (the "reindex job creator") receives this message. It queries DynamoDB to find all records in the shard which need reindexing, then sends a message to a new SQS queue. This message is of the form:

    {
        "id": "miro/123",
        "desiredVersion": 456
    }
  5. A new ECS service (the "reindex worker") receives this message. It queries DynamoDB to get the current row with this ID, and if it still needs reindexing, it makes the PutItem call to update the row.

    When it's updated the row (or decided it doesn't need updating), it deletes the message from the SQS queue.

We can use the reindex worker's DLQ to detect any records which consistently fail to reindex.
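The proposed two-stage flow can be sketched end to end. This is an in-memory simulation with stand-ins for DynamoDB and SQS; names like job_creator and the shard IDs are hypothetical. The key property it demonstrates is that one message now drives at most one PutItem, so a failure re-queues a single record rather than a whole shard.

```python
# In-memory sketch of the proposed reindexer flow.

records = {
    "miro/1": {"shardId": "miro-shard-1", "reindexVersion": 3},
    "miro/2": {"shardId": "miro-shard-1", "reindexVersion": 5},
}

def job_creator(shard_id, desired_version):
    """Stage 1: find the records in the shard that still need reindexing,
    and produce one message per record."""
    return [
        {"id": rid, "desiredVersion": desired_version}
        for rid, rec in records.items()
        if rec["shardId"] == shard_id and rec["reindexVersion"] < desired_version
    ]

def reindex_worker(message):
    """Stage 2: re-check the version, then make a single update."""
    rec = records[message["id"]]
    if rec["reindexVersion"] < message["desiredVersion"]:
        rec["reindexVersion"] = message["desiredVersion"]

for msg in job_creator("miro-shard-1", desired_version=5):
    reindex_worker(msg)

assert records["miro/1"]["reindexVersion"] == 5
assert records["miro/2"]["reindexVersion"] == 5  # was already current; never queued
```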
