Railyard: how we rapidly train machine learning models with Kubernetes

Rob Story on May 7, 2019 in Engineering

Stripe uses machine learning to respond to our users’ complex, real-world problems. Machine learning powers Radar to block fraud, and Billing to retry failed charges on the network. Stripe serves millions of businesses around the world, and our machine learning infrastructure scores hundreds of millions of predictions across many machine learning models. These models are powered by billions of data points, with hundreds of new models being trained each day. Over time, the volume, quality of data, and number of signals have grown enormously as our models continuously improve in performance.

Running infrastructure at this scale poses a very practical data science and ML problem: how do we give every team the tools they need to train their models without requiring them to operate their own infrastructure? Our teams also need a stable and fast ML pipeline to continuously update and train new models as they respond to a rapidly changing world. To solve this, we built Railyard, an API and job manager for training these models in a scalable and maintainable way. It’s powered by Kubernetes, a platform we’ve been working with since late 2017. Railyard enables our teams to independently train their models on a daily basis with a centrally managed ML service.

In many ways, we’ve built Railyard to mirror our approach to products for Stripe’s users: we want teams to focus on their core work of training and developing machine learning models rather than operating infrastructure. In this post, we’ll discuss Railyard and the best practices for operating machine learning infrastructure that we’ve discovered while building this system.

Effective machine learning infrastructure for organizations

We’ve been running Railyard in production for a year and a half, and our ML teams have converged on it as their common training environment. After training tens of thousands of models on this architecture over that period, here are our biggest takeaways:

  • Build a generic API, not tied to any single machine learning framework. Teams have extended Railyard in ways we did not anticipate. We first focused on classifiers, but teams have since adopted the system for applications such as time series forecasting and word2vec style embeddings.
  • A fully managed Kubernetes cluster reduces operational burden across an organization. Railyard interacts directly with the Kubernetes API (as opposed to a higher level abstraction), but the cluster is operated entirely by another team. We’re able to learn from their domain knowledge to keep the cluster running reliably so we can focus on ML infrastructure.
  • Our Kubernetes cluster gives us great flexibility to scale up and out. We can easily scale our cluster volume when we need to train more models, or quickly add new instance types when we need additional compute resources.
  • Centrally tracking model state and ownership allows us to easily observe and debug training jobs. We’ve moved from asking, “Did you save the output of your job anywhere so we can look at it?” to “What’s your job ID? We’ll figure the rest out.” We observe aggregate metrics and track the overall performance of training jobs across the cluster.
  • Building an API for model training enables us to use it everywhere. Teams can call our API from any service, scheduler, or task runner. We now use Railyard to train models using an Airflow task definition as part of a larger graph of data jobs.

The Railyard architecture

In the early days of model training at Stripe, an engineer or data scientist would SSH into an EC2 instance and manually launch a Python process to train a model. This served Stripe’s needs at the time, but had a number of challenges and open questions for our Machine Learning Infrastructure team to address as the company grew:

  • How do we scale model training from ad-hoc Python processes on shared EC2 instances to automatically training hundreds of models a day?
  • How do we build an interface that is generic enough to support multiple training libraries, frameworks, and paradigms while remaining expressive and concise?
  • What metrics and metadata do we want to track for each model run?
  • Where should training jobs be executed?
  • How do we scale different compute resource needs (CPU, GPU, memory) for different model types?

Our goal when designing this system was to enable our data scientists to think less about how their machine learning jobs are run on our infrastructure, and instead focus on their core inquiry. Machine learning workflows typically involve multiple steps that include loading data, training models, serializing models, and persisting evaluation data. Because Stripe runs its infrastructure in the cloud, we can manage these processes behind an API: this reduces cognitive burden for our data science and engineering teams and moves local processes to a collaborative, shared environment. After a year and a half of iteration and collaboration with teams across Stripe, we’ve converged on the following system architecture for Railyard. Here’s a high-level overview:

Railyard runs on a Kubernetes cluster and pairs jobs with the right instance type.

Railyard is a Scala service that provides a JSON API and manages job history, state, and provenance in a Postgres database. Jobs are executed and coordinated using the Kubernetes API, and our Kubernetes cluster provides multiple instance types with different compute resources. The cluster can pair jobs with the right instance type: for example, most jobs default to our high-CPU instances, data-intensive jobs run on high-memory instances, and specialized training jobs like deep learning run on GPU instances.

We package the Python code for model training using Subpar, a Google library that creates a standalone executable that includes all dependencies in one package. This is included in a Docker container, deployed to the AWS Elastic Container Registry, and executed as a Kubernetes job. When Railyard receives an API request, it runs the matching training job and logs are streamed to S3 for inspection. A given job will run through multiple steps, including fetching training and holdout data, training the model, and serializing the trained model and evaluation data to S3. These training results are persisted in Postgres and exposed in the Railyard API.

Railyard’s API design

The Railyard API allows you to specify everything you need to train a machine learning model, including data sources and model parameters. In designing this API we needed to answer the following question: how do we provide a generic interface for multiple training frameworks while remaining expressive and concise for users?

We iterated on a few designs with multiple internal customers to understand each use case. Some teams only needed ad-hoc model training and could simply use SQL to fetch features, while others needed to call an API programmatically hundreds of times a day using features stored in S3. We explored a number of different API concepts, arriving at two extremes on either end of the design spectrum.

On one end, we explored designing a custom DSL to specify the entire training job by encoding scikit-learn components directly in the API itself. Users could include scikit-learn pipeline components in the API specification and would not need to write any Python code themselves.

On the other end of the spectrum we reviewed designs to allow users to write their own Python classes for their training code with clearly defined input and output interfaces. Our library would be responsible for both the necessary inputs to train models (fetching, filtering, and splitting training and test data) and the outputs of the training pipeline (serializing the model, and writing evaluation and label data). The user would otherwise be responsible for writing all training logic.

In the end, any DSL-based approach ended up being too inflexible: it either tied us to a given machine learning framework or required that we continuously update the API to keep pace with changing frameworks or libraries. We converged on the following split: our API exposes fields for changing data sources, data filters, feature names, labels, and training parameters, but the core logic for a given training job lives entirely in Python.

Here’s an example of an API request to the Railyard service:

{
  // What does this model do?
  "model_description": "A model to predict fraud",
  // What is this model called?
  "model_name": "fraud_prediction_model",
  // What team owns this model?
  "owner": "machine-learning-infrastructure",
  // What project is this model for?
  "project": "railyard-api-blog-post", 
  // Which team member is training this model?
  "trainer": "robstory",
  "data": {
    "features": [
      {
        // Columns we’re fetching from Hadoop Parquet files
        "names": ["created_at", "charge_type", "charge_amount",
                  "charge_country", "has_fraud_dispute"],
        // Our data source is S3
        "source": "s3",
        // The path to our Parquet data
        "path": "s3://path/to/parquet/fraud_data.parq"
      }
    ],
    // The canonical date column in our dataset
    "date_column": "created_at",
    // Data can be filtered multiple times
    "filters": [
      // Filter out data before 2018-01-01
      {
        "feature_name": "created_at",
        "predicate": "GtEq",
        "feature_value": {
          "string_val": "2018-01-01"
        }
      },
      // Filter out data after 2019-01-01
      {
        "feature_name": "created_at",
        "predicate": "LtEq",
        "feature_value": {
          "string_val": "2019-01-01"
        }
      },
      // Filter for charges greater than $10.00
      {
        "feature_name": "charge_amount",
        "predicate": "Gt",
        "feature_value": {
          "float_val": 10.00
        }
      },
      // Filter for charges in the US or Canada
      {
        "feature_name": "charge_country",
        "predicate": "IsIn",
        "feature_value": {
          "string_vals": ["US", "CA"] 
        }
      }
    ],
    // We can specify how to treat holdout data
    "holdout_sampling": {
      "sampling_function": "DATE_RANGE",
      // Split holdout data from 2018-10-01 to 2019-01-01
      // into a new dataset
      "date_range_sampling": {
        "date_column": "created_at",
        "start_date": "2018-10-01",
        "end_date": "2019-01-01"
      }
    }
  },
  "train": {
    // The name of the Python workflow we're training
    "workflow_name": "StripeFraudModel",
    // The list of features we're using in our classifier
    "classifier_features": [
      "charge_type", "charge_amount", "charge_country"
    ],
    "label": "is_fraudulent",
    // We can include hyperparameters in our model
    "custom_params": {
      "objective": "reg:linear",
      "max_depth": 6,
      "n_estimators": 500,
      "min_child_weight": 50,
      "learning_rate": 0.02
    }
  }
}
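
To make the `filters` and `holdout_sampling` fields concrete, here is a minimal sketch of how such a request could be interpreted. It is not Railyard's implementation: the mapping from predicate names to Python operators, the inclusive end date, and the helper names `apply_filters` and `split_holdout` are all assumptions, and rows are plain dictionaries rather than Parquet data.

```python
import operator

# Predicate names come from the example request; mapping them to these
# Python operators is an assumption about their semantics.
PREDICATES = {
    "GtEq": operator.ge,
    "LtEq": operator.le,
    "Gt": operator.gt,
    "IsIn": lambda value, allowed: value in allowed,
}


def filter_value(feature_value):
    # Each "feature_value" object carries exactly one typed key, such as
    # "string_val", "float_val", or "string_vals".
    (value,) = feature_value.values()
    return value


def apply_filters(rows, filters):
    # Keep only the rows that satisfy every filter, applied in order.
    for f in filters:
        test = PREDICATES[f["predicate"]]
        expected = filter_value(f["feature_value"])
        rows = [r for r in rows if test(r[f["feature_name"]], expected)]
    return rows


def split_holdout(rows, date_column, start_date, end_date):
    # Rows dated within [start_date, end_date] become holdout data. Treating
    # the end date as inclusive is an assumption. ISO-8601 date strings
    # compare correctly as plain strings.
    def in_range(row):
        return start_date <= row[date_column] <= end_date

    training = [r for r in rows if not in_range(r)]
    holdout = [r for r in rows if in_range(r)]
    return training, holdout


rows = [
    {"created_at": "2017-12-31", "charge_amount": 25.0, "charge_country": "US"},
    {"created_at": "2018-03-15", "charge_amount": 25.0, "charge_country": "US"},
    {"created_at": "2018-06-01", "charge_amount": 5.0, "charge_country": "CA"},
    {"created_at": "2018-11-20", "charge_amount": 40.0, "charge_country": "CA"},
    {"created_at": "2018-12-01", "charge_amount": 40.0, "charge_country": "FR"},
]
filters = [
    {"feature_name": "created_at", "predicate": "GtEq",
     "feature_value": {"string_val": "2018-01-01"}},
    {"feature_name": "created_at", "predicate": "LtEq",
     "feature_value": {"string_val": "2019-01-01"}},
    {"feature_name": "charge_amount", "predicate": "Gt",
     "feature_value": {"float_val": 10.00}},
    {"feature_name": "charge_country", "predicate": "IsIn",
     "feature_value": {"string_vals": ["US", "CA"]}},
]

kept = apply_filters(rows, filters)
training, holdout = split_holdout(kept, "created_at", "2018-10-01", "2019-01-01")
print(len(training), len(holdout))
```

Keeping each filter as a small declarative record means a retraining run only has to change dates or values in the request, not the Python workflow.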

We learned a few lessons while designing this API:

  • Be flexible with model parameters. Providing a free-form custom_params field that accepts any valid JSON was very important for our users. We validate most of the API request, but you can’t anticipate every parameter a machine learning engineer or data scientist needs for all of the model types they want to use. This field is most frequently used to include a model’s hyperparameters.
  • Not providing a DSL was the right choice (for us). Finding the sweet spot for expressiveness in an API for machine learning is difficult, but so far the approach outlined above has worked out well for our users. Many users only need to change dates, data sources, or hyperparameters when retraining. We haven’t gotten any requests to add more DSL-like features to the API itself.
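
The first lesson can be sketched as a validator that checks the structured parts of a request but passes `custom_params` through untouched. This is only an illustration: the set of required fields, the error messages, the requirement that `custom_params` be a JSON object, and the function name `validate_request` are assumptions, not Railyard's actual validation rules.

```python
# Field names come from the example request; which of them are actually
# required is an assumption made for this sketch.
REQUIRED_TOP_LEVEL = ("model_name", "owner", "project", "data", "train")
REQUIRED_TRAIN = ("workflow_name", "label")


def validate_request(request):
    """Return a list of validation errors; an empty list means valid."""
    errors = [
        f"missing field: {name}"
        for name in REQUIRED_TOP_LEVEL
        if name not in request
    ]
    train = request.get("train", {})
    errors += [
        f"missing field: train.{name}"
        for name in REQUIRED_TRAIN
        if name not in train
    ]

    # custom_params is deliberately not checked key by key: any JSON object
    # is accepted and handed to the workflow as-is.
    custom_params = train.get("custom_params", {})
    if not isinstance(custom_params, dict):
        errors.append("train.custom_params must be a JSON object")
    return errors


request = {
    "model_name": "fraud_prediction_model",
    "owner": "machine-learning-infrastructure",
    "project": "railyard-api-blog-post",
    "data": {"features": []},
    "train": {
        "workflow_name": "StripeFraudModel",
        "label": "is_fraudulent",
        # Unknown keys and nested values are accepted without complaint.
        "custom_params": {"max_depth": 6, "anything_else": {"nested": True}},
    },
}
errors = validate_request(request)
print(errors)
```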

The Python workflow

Stripe uses Python for all ML model training because of its support for many best-in-class ML libraries and frameworks. When the Railyard project started we only had support for scikit-learn, but have since added XGBoost, PyTorch, and FastText. The ML landscape changes very quickly and we needed a design that didn’t pick winners or constrain users to specific libraries. To enable this extensibility, we defined a framework-agnostic workflow that presents an API contract with users: we pass data in, you pass a trained model back out, and we’ll score and serialize the model for you. Here’s what a minimal Python workflow looks like:

import xgboost  # stripe_ml and StripeMLWorkflow are Stripe-internal

class StripeFraudModel(StripeMLWorkflow):
  # A basic model training workflow: all workflows inherit
  # Railyard’s StripeMLWorkflow class
  def train(self, training_dataframe, holdout_dataframe):
    # Construct an estimator using specified hyperparameters
    estimator = xgboost.XGBRegressor(**self.custom_params)

    # Wrap the estimator with our in-house serialization library so
    # the model can be serialized once training is finished.
    serializable_estimator = stripe_ml.make_serializable(estimator)

    # Train our model
    fitted_model = serializable_estimator.fit(
      training_dataframe[self.classifier_features],
      training_dataframe[self.classifier_label]
    )

    # Hand our fitted model back to Railyard to serialize
    return fitted_model

Teams start adopting Railyard with an API specification and a workflow that defines a train method to train a classifier with the data fetched from the API request. The StripeMLWorkflow interface supports extensive customization to adapt to different training approaches and model types. You can preprocess your data before it gets passed into the train function, define your own data fetching implementation, specify how you want training/holdout data to be scored, and run any other Python code you need. For example, some of our deep learning models have custom data fetching code to stream batches of training data for model training. When your training job finishes you’ll end up with two outputs: a model identifier for your serialized model that can be put into production, and your evaluation data in S3.

If you build a machine learning API specification, here are a few things to keep in mind:

  • Interfaces are important. Users will want to load and transform data in ways you didn’t anticipate, train models using unsupported patterns, and write out unfamiliar types of evaluation data. It’s important to provide standard API interfaces like fetch_data, preprocess, train, and write_evaluation_data that specify some standard data containers (e.g., Pandas DataFrame and Torch Dataset) but are flexible in how they are generated and used.
  • Users should not need to think about model serialization or persistence. Reducing their cognitive burden makes their lives easier and gives them more time to be creative and focus on modeling and feature engineering. Data scientists and ML engineers already have enough to think about between feature engineering, modeling, evaluation, and more. They should be able to train and hand over their model to your scoring infrastructure without ever needing to think about how it gets serialized or persisted.
  • Define metrics for each step of the training workflow. Make sure you’re gathering fine-grained metrics for each training step: data loading, model training, model serialization, evaluation data persistence, etc. We store high-level success and failure metrics that can be examined by team, project, or the individual machine performing the training. On a functional level, our team uses these metrics to debug and profile long-running or failed jobs, and provide feedback to the appropriate team when there’s a problem with a given training job. And on a collaborative level, these metrics have changed how our team operates. Moving from a reactive stance (“My model didn’t train, can you help?”) to a proactive one (“Hey, I notice your model didn’t train, here’s what happened”) has helped us be better partners to the many teams we work with.
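
The first and third points can be combined in a small sketch: a base class with overridable hooks, and a runner that records duration and success for every step. The hook names follow the interfaces listed above, but `SketchWorkflow`, `_timed`, `run`, and `step_metrics` are hypothetical names for illustration and are not part of StripeMLWorkflow.

```python
import time
from abc import ABC, abstractmethod


class SketchWorkflow(ABC):
    """Hypothetical stand-in for a workflow base class."""

    def __init__(self, custom_params=None):
        self.custom_params = custom_params or {}
        self.step_metrics = {}  # step name -> {"seconds": float, "ok": bool}

    # Hooks with overridable defaults.
    def fetch_data(self):
        return [], []

    def preprocess(self, training, holdout):
        return training, holdout

    @abstractmethod
    def train(self, training, holdout):
        """Return a fitted model."""

    def write_evaluation_data(self, model, holdout):
        return None

    def _timed(self, step, fn, *args):
        # Record duration and success for one step, even when it raises.
        start = time.perf_counter()
        ok = False
        try:
            result = fn(*args)
            ok = True
            return result
        finally:
            self.step_metrics[step] = {
                "seconds": time.perf_counter() - start,
                "ok": ok,
            }

    def run(self):
        training, holdout = self._timed("fetch_data", self.fetch_data)
        training, holdout = self._timed(
            "preprocess", self.preprocess, training, holdout
        )
        model = self._timed("train", self.train, training, holdout)
        self._timed(
            "write_evaluation_data", self.write_evaluation_data, model, holdout
        )
        return model


class MeanWorkflow(SketchWorkflow):
    # Toy workflow: the "model" is just the mean of the training values.
    def fetch_data(self):
        return [1.0, 2.0, 3.0], [4.0]

    def train(self, training, holdout):
        return sum(training) / len(training)


workflow = MeanWorkflow()
model = workflow.run()
for step, metric in workflow.step_metrics.items():
    print(step, metric["ok"], round(metric["seconds"], 6))
```

Because a failed step is still recorded before the exception propagates, the metrics show which step a job died in without anyone having to read its logs first.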

Scaling Kubernetes

Railyard coordinates hundreds of machine learning jobs across our cluster, so effective resource management across our instances is crucial. The first version of Railyard simply ran individual subprocesses from the Scala service that manages all jobs across our cluster. We would get a request, start Java’s ProcessBuilder, and kick off a subprocess to build a Python virtualenv and train the model. This basic implementation allowed us to quickly iterate on our API in our early days, but managing subprocesses wasn’t going to scale very well. We needed a proper job management system that met a few requirements:

  • Scaling the cluster quickly for different resource/instance types
  • Routing models to specific instances based on their resource needs
  • Job queueing to prioritize resources for pending work

Luckily, our Orchestration team had been working hard to build a reliable Kubernetes cluster and suggested this new cluster would be a good platform for Railyard’s needs. It was a great fit; a fully managed Kubernetes cluster provides all of the pieces we needed to meet our system’s requirements.

Containerizing Railyard

To run Railyard jobs on Kubernetes, we needed a way to reliably package our Python code into a fully executable binary. We use Google’s Subpar library which allows us to package all of our Python requirements and source code into a single .par file for execution. The library also includes support for the Bazel build system out of the box. Over the past few years, Stripe has been moving many of its builds to Bazel; we appreciate its speed, correctness, and flexibility in a multi-language environment.

With Subpar you can define an entrypoint to your Python executable and Bazel will build your .par executable to bundle into a Docker image:

par_binary(
    name = "railyard_train",
    srcs = ["@.../ml:railyard_srcs"],
    data = ["@.../ml:railyard_data"],
    main = "@.../ml:railyard/train.py",
    deps = all_requirements,
)

With the Subpar package built, the Kubernetes command only needs to execute it with Python:

command: ["sh"]
args: ["-c", "python /railyard_train.par"]

Within the Dockerfile we package up any other third-party dependencies that we need for model training, such as the CUDA runtime to provide GPU support for our PyTorch models. After our Docker image is built, we deploy it to AWS’s Elastic Container Registry so our Kubernetes cluster can fetch and run the image.

Running diverse workloads

Some machine learning tasks can benefit from a specific instance type with resources optimized for a given workload. For example, a deep learning task may be best suited for a GPU instance while fraud models that employ huge datasets should be paired with high-memory instances. To support these mixed workloads we added a new top-level field to the Railyard API request to specify the compute resource for jobs running on Kubernetes:

{
    "compute_resource": "GPU"
}

Railyard supports training models on CPU, GPU, or memory-optimized instances. Models for our largest datasets can require hundreds of gigabytes of memory to train, while our smaller models can train quickly on smaller (and less expensive) instance types.

Scheduling and distributing jobs

Railyard exerts a fine-grained level of control on how Kubernetes distributes jobs across the cluster. For each request, we look at the requested compute resource and set both a Kubernetes Toleration and an Affinity to specify the type of node that we would like to run on. These parameters effectively tell the Kubernetes cluster:

  • the affinity, or which nodes the job should run on
  • the toleration, or which reserved (tainted) nodes the job is allowed to run on

Kubernetes will use the affinity and toleration properties for a given Kubernetes pod to compute how jobs should be best distributed across or within each node.

Kubernetes supports per-job CPU and memory requirements to ensure that workloads don’t experience resource starvation due to neighboring jobs on the same host. In Railyard, we determine limits for all jobs based on their historical and expected future resource usage. In the case of high-memory or GPU training jobs, these limits are set so that each job gets an entire node to itself; if all nodes are occupied, then the scheduler will place the job in a queue. Jobs with less intensive resource requirements are scheduled on nodes to run in parallel.

With these parameters in place, we can lean on the Kubernetes resource scheduler to balance our jobs across available nodes. Given a set of job and resource requests, the scheduler will intelligently distribute those jobs to nodes across the cluster.
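
As an illustration of how a `compute_resource` value could translate into scheduling parameters, the sketch below builds a Kubernetes Job manifest as a plain Python dictionary. The manifest structure follows the Kubernetes Job and Pod specs, and the command matches the one shown earlier. Everything else is hypothetical: the `"CPU"` and `"MEMORY"` resource names, the node label and taint keys, the image name, and the resource sizes.

```python
import json

# Hypothetical node pools and sizes; only "GPU" appears as a value in the
# example request above.
NODE_POOLS = {
    "CPU": {"pool": "high-cpu", "resources": {"cpu": "4", "memory": "8Gi"}},
    "MEMORY": {"pool": "high-memory", "resources": {"cpu": "8", "memory": "200Gi"}},
    "GPU": {
        "pool": "gpu",
        "resources": {"cpu": "8", "memory": "32Gi", "nvidia.com/gpu": "1"},
    },
}


def build_training_job(job_id, compute_resource):
    pool = NODE_POOLS[compute_resource]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"railyard-train-{job_id}"},
        "spec": {
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    # Affinity: only schedule onto nodes in the matching pool.
                    "affinity": {
                        "nodeAffinity": {
                            "requiredDuringSchedulingIgnoredDuringExecution": {
                                "nodeSelectorTerms": [{
                                    "matchExpressions": [{
                                        "key": "example.com/node-pool",
                                        "operator": "In",
                                        "values": [pool["pool"]],
                                    }]
                                }]
                            }
                        }
                    },
                    # Toleration: allow the pod onto nodes tainted for this pool.
                    "tolerations": [{
                        "key": "example.com/dedicated",
                        "operator": "Equal",
                        "value": pool["pool"],
                        "effect": "NoSchedule",
                    }],
                    "containers": [{
                        "name": "train",
                        "image": "example.com/railyard-train:latest",
                        "command": ["sh"],
                        "args": ["-c", "python /railyard_train.par"],
                        # Equal requests and limits give the job a fixed share.
                        "resources": {
                            "requests": dict(pool["resources"]),
                            "limits": dict(pool["resources"]),
                        },
                    }],
                }
            }
        },
    }


job = build_training_job("42", "GPU")
print(json.dumps(job, indent=2))
```

Sizing the requests close to a node's full capacity is one way to get the "entire node to itself" behavior described above, since the scheduler cannot fit a second such pod on the same node.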

One year later: running at scale

Moving our training jobs to a Kubernetes cluster has enabled us to rapidly spin up new resources for different models and expand the cluster to support more training jobs. We can use a single command to expand the cluster and new instance types only require a small configuration change. When the memory requirements of running jobs outgrew our CPU-optimized instance types, we started training on memory-optimized instances the very next day; when we observe a backlog of jobs, we can immediately expand the cluster to process the queue. Model training on Kubernetes is available to any data scientist or engineer at Stripe: all that’s needed is a Python workflow and an API request and they can start training models on any resource type in the cluster.

To date, we’ve trained almost 100,000 models on Kubernetes, with new models trained each day. Our fraud models automatically retrain on a regular basis using Railyard and Kubernetes, and we’re steadily moving more of Stripe’s models onto an automated retraining cycle. Radar’s fraud model is built on hundreds of distinct ML models and has a dedicated service that trains and deploys all of those models on a daily cadence. Other models retrain regularly using an Airflow task that uses the Railyard API.

We’ve learned a few key considerations for scaling Kubernetes and effectively managing instances:

  • Instance flexibility is really important. Teams can have very different machine learning workloads. In any given day we might train thousands of time series forecasts, a long-running word embedding model, or a fraud model with hundreds of gigabytes of data. The ability to quickly add new instance types and the ability to expand the cluster are equally important for scalability.
  • Managing memory-intensive workflows is hard. Even using various instance sizes and a managed cluster, we still sometimes have jobs that run out of memory and are killed. This is a downside to providing so much flexibility in the Python workflow: modelers are free to write memory-intensive workflows. Kubernetes allows us to proactively kill jobs that are consuming too many resources, but it still results in a failed training job for the modeler. We’re thinking about ways to better manage this, including smart retry behavior to automatically reschedule failed jobs on higher-capacity instances and moving to distributed libraries like dask-ml.
  • Subpar is an excellent solution for packaging Python code. Managing Python dependencies can be tricky, particularly when you’d like to bundle them as an executable that can be shipped to different instances. If we were to build this from scratch again we would probably take a look at Facebook’s XARs, but Subpar is very compatible with Bazel and it’s been running well in production for over a year.
  • Having a good Kubernetes team is a force multiplier. Railyard could not have been a success without the support of our Orchestration team, which manages our Kubernetes cluster and pushes the platform forward for the whole organization. If we had to manage and operate the cluster in addition to building our services, we would have needed more engineers and taken significantly longer to ship.
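
The smart retry idea from the second point could look roughly like the sketch below, which picks the next larger instance tier after an out-of-memory failure. This is a guess at one possible design, not something Railyard is described as doing. The tier names and the function `next_compute_resource` are hypothetical; `"OOMKilled"` is the termination reason Kubernetes reports for a container killed for exceeding its memory limit.

```python
# Ordered from smallest to largest; the tier names are hypothetical.
MEMORY_TIERS = ["CPU", "MEMORY_LARGE", "MEMORY_XLARGE"]


def next_compute_resource(current, failure_reason):
    """Return the tier to retry on, or None if the job should not be retried."""
    # Only out-of-memory failures are worth retrying on a bigger instance.
    if failure_reason != "OOMKilled":
        return None
    index = MEMORY_TIERS.index(current)
    if index + 1 >= len(MEMORY_TIERS):
        return None  # already on the largest tier
    return MEMORY_TIERS[index + 1]


print(next_compute_resource("CPU", "OOMKilled"))
```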

Building ML infrastructure

We’ve learned that building common machine learning infrastructure enables teams across Stripe to operate independently and focus on their local ML modeling goals. Over the last year we’ve used Railyard to train thousands of models spanning use cases from forecasting to deep learning. This system has enabled us to build rich functionality for model evaluation and design services to optimize hyperparameters for our models at scale.

While there is a wealth of information available on data science and machine learning from the modeling perspective, there isn’t nearly as much published about how companies build and operate their production machine learning infrastructure. Uber, Airbnb, and Lyft have all discussed how their infrastructure operates, and we’re following their lead in introducing the design patterns that have worked for us. We plan to share more lessons from our ML architecture in the months ahead. In the meantime, we’d love to hear from you: please let us know which lessons are most useful and if there are any specific topics about which you’d like to hear more.
