
RFC 063: Catalogue Pipeline services from ECS to Lambda


Discuss the potential benefits and challenges of moving the catalogue pipeline services from AWS Elastic Container Service (ECS) to AWS Lambda.

Last modified: 2024-10-25T10:16:40+01:00

Context

Catalogue pipeline services run on Elastic Container Service (ECS). The infrastructure includes scaling to account for resource needs during a reindex (scale up) or day-to-day running of the pipeline (scale down). Scaling is achieved using ECS auto-scaling rules driven by CloudWatch metrics on queue depth. Other, newer projects leverage AWS Lambda to run similar tasks, and there is an opportunity to remove complexity in the catalogue pipeline and align with those projects.

The following points cover in more detail the reasoning behind moving from ECS-based compute to AWS Lambda:

  • Reduction in infra complexity by removing ECS auto-scaling logic:

    Scaling the pipeline up or down is achieved in several ways:

    • A manual step is performed to terraform the pipeline into "reindexing_state" by setting a number of variables to true. Search for reindexing_state in the catalogue-pipeline repo to find all the ways these variables are used.

    • Every service also has a high/low message count alarm on its input queue. By default, services scale up to 12 instances as long as there are messages on the queue, and scale down when the queue is clear. However, some services (eg. ingestors) have a specially configured max_capacity, usually to account for data stores' read/write limits.

    This change aims to reduce the need for this scaling logic by moving more of the complexity into a managed service (AWS Lambda).

  • Reduction in app complexity by removing Akka/Pekko batching:

    Several (SQS-driven) pipeline services use SQSStream (see https://github.com/wellcomecollection/scala-libs/blob/main/messaging/src/main/scala/weco/messaging/sqs/SQSStream.scala), which batches SQS messages so that they can be processed in bulk. This logic lives inside the service and makes it difficult (impossible?) to run locally.

    This model means the messaging and business/processing logic parts of the services are entangled in sometimes confusing ways, and we lose queue-management features of AWS SQS (eg. we have to keep track of success and failure ourselves in order to delete or retry messages accordingly).

    This change is also an opportunity to reassess and possibly reduce incidental complexity as we update the services to use AWS Lambda.

  • Potential reductions in cost:

    ECS services are configured to scale down to 0 when the input is low, which means we are not paying for continuously-running tasks, but this results in starting and stopping tasks repeatedly as messages trickle down the pipeline.

  • Potential increase in speed (deployment and processing):

    ECS services take a few minutes to start tasks, as the auto-scaling rules rely on reported CloudWatch metrics, whereas Lambda invocations are much faster. Using provisioned concurrency may further increase processing speed, but is likely not necessary, at least for most services.

    At present service deployment relies on the ECS deployment APIs which take some time to determine that a service is "stable"; there is no such process for AWS Lambda and deployments are almost instant.

    Another potential speed improvement: we currently try to compromise between normal running and reindexing in the way we gather messages. The queue-polling logic in our services is intended to process messages in large batches, which means we see delays during normal running, and towards the end of a reindex, when the batch threshold is not met.

  • Align with concepts pipeline deployment:

    The concepts-pipeline was designed and built around Lambdas. It packages two different versions of the aggregator and recorder services, one for each use case for the pipeline: bulk, for a complete concepts reindex, and SQS, which handles low-volume works updates published to the <live_catalogue_pipeline>_ingestor_works_output SNS topic. The ingestor service only runs when needed, eg. when we want to refresh the authoritative-concepts index.

    This allows for the most efficient use of infrastructure/resources and avoids one of the oft-cited downsides of the catalogue pipeline, namely that the application tries to handle two quite different use cases.

    The different versions of the aggregator and recorder are also configured for their purpose, thus making the infrastructure code easier to read and understand.

Suggested solution

We could incrementally refactor the catalogue-pipeline services to use lambdas instead of ECS. Some considerations are:

  • We currently build container images to package our code for deployment, and we will probably continue doing so for the Lambdas (see "Deploy Java Lambda functions with container images" and "Optimizing Lambda functions packaged as container images"). This will allow us to keep the same build and deployment process, and would allow us to run the services locally using a Lambda runtime emulator.

  • We can leverage SQS event source and Lambda features to batch up to 10,000 messages before invoking the function (useful in the context of the batcher service). This will make it possible to remove the batching mechanism that exists inside the services, making it easier to run them locally using a Lambda runtime emulator (see the sketch after this list).

  • Lambda has built-in scaling capabilities which will allow us to simplify the infrastructure by removing scaling configurations and CloudWatch alarms on the queues. We would still use the reindexing_state to adjust data store provisioning. We could also leverage SQS Lambda features to protect the data stores (eg. the Elasticsearch index at the ingestor stage) from overloading by setting a maximum concurrency on the event source.

  • Error handling could use partial batch responses.

  • Lambda is generally considered to be cost-effective for workloads with intermittent or unpredictable traffic patterns (1ms for 1024MB: $0.0000000167).

    • We may be able to do away with Akka/Pekko in some places, which would reduce both the package size and the internal startup time.

  • Processing speed could be slowed down by AWS Lambda cold starts, but this can be addressed in various ways:

    • Use of provisioned concurrency to keep functions warm.

    • Sharing data between Lambda invocations, by taking advantage of persistence in the execution environment / optimising static initialisation.

  • The concepts-pipeline has demonstrated the value of using AWS Lambdas in a data pipeline.
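As a rough illustration of what removing the in-service batching could look like, here is a minimal sketch of an SQS-triggered Lambda handler written in Scala against the aws-lambda-java-core and aws-lambda-java-events libraries. It assumes "report batch item failures" is enabled on the event source mapping; the class name and the processMessage placeholder are hypothetical, not an existing pipeline service.

```scala
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.amazonaws.services.lambda.runtime.events.{SQSBatchResponse, SQSEvent}

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

// Hypothetical service: the real business logic would live in processMessage.
class ExampleWorksTransformer extends RequestHandler[SQSEvent, SQSBatchResponse] {

  override def handleRequest(event: SQSEvent, context: Context): SQSBatchResponse = {
    // Lambda hands us a ready-made batch of messages, so there is no
    // in-service SQSStream-style batching or queue polling to manage.
    val failures = event.getRecords.asScala.toList.flatMap { message =>
      Try(processMessage(message.getBody)) match {
        case Success(_) => None
        case Failure(e) =>
          context.getLogger.log(
            s"Failed to process message ${message.getMessageId}: ${e.getMessage}")
          // Only the failed message IDs are reported back; SQS retries those
          // messages and deletes the rest of the batch.
          Some(new SQSBatchResponse.BatchItemFailure(message.getMessageId))
      }
    }

    new SQSBatchResponse(failures.asJava)
  }

  // Placeholder for the real transformation/indexing work.
  private def processMessage(body: String): Unit = ???
}
```

Because Lambda owns the polling, deletion and retry of messages, the handler only has to report which message IDs failed, rather than tracking success and failure itself.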

Transition from ECS to Lambda

To prevent downtime and allow a smooth transition from ECS to Lambdas:

  • Update the deployment process to allow deployment of existing container images to both ECS and Lambda runtime environments.

    • Lambdas can have their own queues subscribed to upstream service topics.

    • Where services are stateful we will need to decide whether we want shared or duplicated state between the ECS and Lambda services. This may give us the opportunity to change the data store to one more suited to the Lambda execution model (e.g. S3 over Elasticsearch).

  • Move services over one at a time, testing that a new Lambda service works by dual-running it alongside its ECS counterpart without publishing messages downstream.

  • When we are satisfied that the AWS Lambda behaviour for a service has reached parity, we can remove the ECS service and its infrastructure.

Potential issues

AWS Lambda has a different execution model from ECS: SQS messages cause Lambda invocations via an AWS Lambda trigger (managed invocations), with configuration options that dictate how many messages a Lambda invocation handles, how many Lambdas can run at once, and how long any single Lambda can run for. In ECS, tasks are launched by auto-scaling rules; those tasks then poll SQS for messages, processing and deleting them as they are received, and when there is no more work to do the tasks are terminated, again based on auto-scaling rules.

Potential issues may arise around this change in execution model, specifically:

Concurrency

With ECS, concurrency can be limited by the number of running tasks, which provides a rough restriction on the number of messages processed at once. With AWS Lambda, concurrency can be specified exactly with "reserved concurrency", and in combination with batch size this can tightly control the number of messages a Lambda processes.

As we are moving from one model to another, we will need to calibrate concurrency for each service by estimation and experiment.

Concurrency is of particular concern where we are making network requests to external services that may not be configured to meet demand, for example Elasticsearch or RDS databases.

Batching

Batching describes the number of messages handed to a Lambda to process in one invocation. For the catalogue pipeline we will be invoking Lambdas from non-FIFO SQS messages, and this has specific restrictions around batch size.

Batch size is dictated by a combination of:

  • Configured maximum batch size, up to a maximum of 10,000 for non-FIFO queues.

  • Maximum payload size, up to 128kb for asynchronous invocation.

  • Length of the batch window (how long to wait before passing on a batch), up to 5 minutes.

Along with concurrency, batching dictates how much work is processed at one time; the sketch at the end of this section shows where these knobs sit in the event source configuration.

Some specific concerns around batch size:

  • The maximum number of records in each batch that Lambda pulls from the queue and sends to the function is 10,000; Lambda passes all of the records in the batch to the function in a single call, up to the payload limit for synchronous invocation (6 MB). This is far less than the batch size the batcher is currently able to process (up to 120,000 collectionPaths), which could mean fewer nodes are matched in a batch, reducing the beneficial effect of the batcher on the relation_embedder load. As of 2024-10-23 there are 271,791 documents with a collectionPath in the merged index, ie. that is how many messages the batcher needs to process as part of a full reindex.

  • FIFO queues allow content-based deduplication, which could be an improvement on the current setup because of the explosion of messages downstream of the relation embedder; but if they can't provide enough messages for efficient bulk updates in Elasticsearch, then we won't be able to take advantage of that feature.

  • When deciding on batching configuration, maximum execution time must be considered, so that we do not attempt to process work that exceeds that limit.

  • While running on ECS, batch size was less of a concern, as there is no upper limit on execution time and messages are received by polling SQS for more messages while a task is executing.
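In practice we would configure this in Terraform, but to make the knobs concrete, here is a sketch of the same configuration expressed through the AWS SDK for Java (v2) from Scala. The function name, queue ARN and the numbers used are placeholders for illustration, not proposed values.

```scala
import software.amazon.awssdk.services.lambda.LambdaClient
import software.amazon.awssdk.services.lambda.model.{
  CreateEventSourceMappingRequest,
  FunctionResponseType,
  ScalingConfig
}

object ExampleEventSourceMapping extends App {
  val lambda = LambdaClient.create()

  // Placeholder names and values, for illustration only.
  lambda.createEventSourceMapping(
    CreateEventSourceMappingRequest
      .builder()
      .functionName("example-relation-embedder")
      .eventSourceArn("arn:aws:sqs:eu-west-1:000000000000:example-input-queue")
      .batchSize(10000)                    // up to 10,000 for non-FIFO SQS queues
      .maximumBatchingWindowInSeconds(300) // wait up to 5 minutes to fill a batch
      .scalingConfig(
        // Maximum concurrency on the event source protects downstream data stores.
        ScalingConfig.builder().maximumConcurrency(10).build()
      )
      // Needed for partial batch responses (only failed messages are retried).
      .functionResponseTypes(FunctionResponseType.REPORT_BATCH_ITEM_FAILURES)
      .build()
  )
}
```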

Execution time

AWS Lambda has a maximum execution time of 15 minutes, which cannot be extended; if a function execution exceeds the maximum execution time it will be terminated.

We must ensure that in the usual case we do not attempt to perform work that will take longer than the maximum execution time in a single invocation. The amount of work a function needs to perform should be tunable by changing the batching configuration to limit the number of messages processed in an invocation. If longer execution time is needed, computation should be split into further distinct steps handled by other Lambdas, or by services using a different paradigm.

It is very unlikely that a single message in the current implementation would ever take longer than 15 minutes to process.
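As a defensive measure on top of batch tuning (not something this RFC requires, just an illustration of what the Lambda runtime API allows), a handler can check Context.getRemainingTimeInMillis() and hand any unprocessed messages back to SQS via the same partial-batch-response mechanism sketched above. The class name and safety margin below are hypothetical.

```scala
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.amazonaws.services.lambda.runtime.events.{SQSBatchResponse, SQSEvent}

import scala.jdk.CollectionConverters._

// Hypothetical guard: stop processing when we get close to the timeout and
// report the remaining messages as batch item failures so SQS redelivers them.
class TimeBoundedHandler extends RequestHandler[SQSEvent, SQSBatchResponse] {
  private val safetyMarginMillis = 30000

  override def handleRequest(event: SQSEvent, context: Context): SQSBatchResponse = {
    val unprocessed = event.getRecords.asScala.toList.filterNot { message =>
      val haveTime = context.getRemainingTimeInMillis > safetyMarginMillis
      if (haveTime) processMessage(message.getBody) // placeholder for the real work
      haveTime // messages we had time to process are excluded from the failure list
    }

    new SQSBatchResponse(
      unprocessed.map(m => new SQSBatchResponse.BatchItemFailure(m.getMessageId)).asJava
    )
  }

  private def processMessage(body: String): Unit = ???
}
```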

Future improvements

Some suggestions for future improvements that could be looked at as part of this project; topics that need more discussion may warrant further RFCs:

  • Can we avoid using the router service in future by moving its functionality into another service? Could the relation_embedder itself decide which works actually need their relations embedded (ie. works that have a collection path)?

  • Can we simplify batcher service processing logic?

  • Given that the path_concatenator only processes Sierra record trees, would it be better for this to happen earlier in the pipeline? It could run just after the Sierra transformer, which would remove the need for the router to route the relevant works to have their paths concatenated.

  • If we were to wait at the matcher/merger stage for every work to be processed, could we then do without the batcher? We would be able to scan the store populated by the matcher/merger and send all members of a tree together to have their relations embedded, removing the need for a mechanism that limits the “explosion” we have now, which is caused by only sending partial trees to the relation_embedder.

  • Should we look into the SNS features that allow conditional subscription based on the message body as a replacement for the router?

  • Can we consider replacing intermediate indices with S3? We don't search them; they are acting as simple JSON stores (see the sketch after this list).

  • Can we remove the use of Pekko Streams in some pipeline services? The logic can be complex and difficult to understand.
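If we did replace an intermediate index with S3, the access pattern we need is small enough to sketch. The following is purely illustrative (the bucket name, key scheme and class name are all hypothetical), using the AWS SDK for Java (v2) from Scala:

```scala
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, PutObjectRequest}

import java.nio.charset.StandardCharsets

// Hypothetical store: keeps one JSON document per work, keyed by work id.
class S3JsonStore(s3: S3Client, bucket: String) {

  def put(workId: String, json: String): Unit =
    s3.putObject(
      PutObjectRequest.builder().bucket(bucket).key(s"works/$workId.json").build(),
      RequestBody.fromString(json)
    )

  def get(workId: String): String =
    new String(
      s3.getObjectAsBytes(
        GetObjectRequest.builder().bucket(bucket).key(s"works/$workId.json").build()
      ).asByteArray(),
      StandardCharsets.UTF_8
    )
}
```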

