Implementing an end-to-end ML system using batch-serving architecture
Summary
In this blog post, I will present the development process of an end-to-end machine learning (ML) platform designed to accommodate a batch-serving architecture. This initiative is part of my 2023 goal plan, which aims to expand my engineering capabilities into the realm of AI/ML deployments. It draws inspiration and insights from Paul Iusztin’s comprehensive Full Stack MLOps Guide. Rather than merely duplicating his project, I extended it by incorporating a distinct dataset. Capitalizing on the geographical context of Singapore, I utilized the Open Government Application Programming Interface (API) to extract PM2.5 data. Consequently, although the infrastructure stack and logic align closely with the reference guide, notable distinctions arise in the components responsible for preprocessing, prediction, and inference. The source code can be found in this GitHub repository.
Overall Architecture
1. Feature Pipelines
The first component of the ML system extracts the data and performs feature engineering on it before loading the transformed data into a feature store.
1.1. Data
I decided to use a real-time API from Data Gov as the data source. The API allows us to query hourly recorded PSI data for various regions in Singapore.
An extraction script pulls the data using an HTTP GET request.
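As a rough illustration of the extraction step, here is a minimal sketch using only the Python standard library. The endpoint URL, the `date` query parameter, and the function names `build_psi_url` and `fetch_psi` are assumptions made for this example, not taken from the project, and should be checked against the current Data Gov documentation.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

# Assumed endpoint; verify against the current Data Gov documentation.
PSI_URL = "https://api.data.gov.sg/v1/environment/psi"


def build_psi_url(date: str, base_url: str = PSI_URL) -> str:
    """Return the request URL for the readings of one day (YYYY-MM-DD)."""
    return f"{base_url}?{urlencode({'date': date})}"


def fetch_psi(date: str, opener=urlopen, timeout: float = 30.0) -> dict:
    """Send an HTTP GET request and decode the JSON payload.

    `opener` is injectable (hypothetical design choice) so the function
    can be exercised without network access.
    """
    with opener(build_psi_url(date), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `fetch_psi("2023-08-01")` would issue a single GET request for that day; passing a stub as `opener` keeps the function testable offline.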
1.2. Feature Engineering
A fair amount of preprocessing is required to prepare the data as features. The payload schema needs to be flattened and transformed to obtain the relevant fields: timestamp, update_timestamp, and readings_<region>, where the regions are north, south, east, west and central. A target variable, reading_average, is then created by averaging the hourly PSI across the regions.
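The flattening step can be sketched roughly as follows. The payload shape (an `items` list whose entries carry a nested `readings` dictionary keyed by metric and then by region) and the default metric key are assumptions for illustration; the output column names follow the ones described above.

```python
from statistics import mean

REGIONS = ("north", "south", "east", "west", "central")


def flatten_items(payload: dict, metric: str = "psi_twenty_four_hourly") -> list[dict]:
    """Flatten the nested payload into one flat record per timestamp.

    The payload layout and the default `metric` key are assumptions;
    adjust them to the schema the API actually returns.
    """
    rows = []
    for item in payload.get("items", []):
        readings = item["readings"][metric]
        row = {
            "timestamp": item["timestamp"],
            "update_timestamp": item["update_timestamp"],
        }
        # One column per region, named readings_<region>.
        for region in REGIONS:
            row[f"readings_{region}"] = readings[region]
        # Target variable: average of the regional readings.
        row["reading_average"] = mean(row[f"readings_{r}"] for r in REGIONS)
        rows.append(row)
    return rows
```

Any extra keys in the payload, such as a national aggregate, are simply ignored because only the five listed regions are read.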
1.3. Hopsworks Feature Store
Hopsworks is a flexible and modular feature store that provides seamless integration for existing pipelines, superior performance for any SLA, and increased productivity for data and AI teams.
In short, the feature pipeline extracts data through the API, applies feature engineering, and loads the result into the Hopsworks feature store.
2. Training Pipelines
The second component is a series of training pipelines that handle the heavy lifting of model training. Data is first pulled from the feature store, and its metadata is logged to wandb. The models are then trained, and the output artifacts are uploaded to wandb.
2.1. Model Training
A baseline model using naive Bayes serves as a benchmark. Next, a more sophisticated model combining sktime and LightGBM is tuned, then trained using the best configuration.
The best model is also saved to the Hopsworks model registry.
2.2. Weights & Biases (wandb)
Weights & Biases helps AI developers build better models faster. Quickly track experiments, version and iterate on datasets, evaluate model performance, reproduce models, and manage your ML workflows end-to-end.
For each of the runs, we can track the experimental output and performance, as well as the various model metrics.
3. Batch Prediction Pipelines
The third component, batch prediction, is relatively straightforward. Data is retrieved in batches from the Hopsworks feature store and passed through model inference to produce predictions. The outputs are then written to cloud storage, which acts as a cache.
3.1. Google Cloud Storage (GCS)
Google Cloud Storage (GCS) serves as the repository for the data files, stored in Parquet format: the X and y features, predictions, and monitoring data. Although tools such as Redis are well suited to caching predictions, adding one would have introduced extra complexity, which falls outside the primary scope of this project.
To connect to a GCS bucket from the Python scripts, I create a GCP service account with the appropriate access credentials.
3.2. Batch Prediction
Each run involves extracting a batch of data within a specified datetime range, streamlining the batch inference process. The most recent and optimal model is loaded into memory by downloading the artifact from the model registry. Subsequently, the model predicts PSI values for the upcoming 24 hours, and these predictions are then stored in the Google Cloud Storage (GCS) bucket.
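The windowing logic of a batch run can be sketched as below. This is only an illustration of the datetime handling: the helper names `select_batch` and `forecast_timestamps` are hypothetical, rows are assumed to be dictionaries with a `timestamp` key holding a `datetime`, and the model call itself is left out.

```python
from datetime import datetime, timedelta


def select_batch(rows: list[dict], start: datetime, end: datetime) -> list[dict]:
    """Keep rows whose timestamp falls in the half-open window [start, end)."""
    return [row for row in rows if start <= row["timestamp"] < end]


def forecast_timestamps(last_observed: datetime, horizon_hours: int = 24) -> list[datetime]:
    """Return the hourly timestamps the model would predict for.

    The first forecast step is one hour after the last observation.
    """
    return [last_observed + timedelta(hours=h) for h in range(1, horizon_hours + 1)]
```

A half-open window avoids scoring the same hour twice when consecutive runs use adjacent ranges.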
4. Scheduling and Orchestration using Airflow
4.1. PyPI Server
A PyPI server is a registry that hosts Python packages. Only people with access to a private PyPI server can install packages from it. Here, a private PyPI server is configured to host the feature, training and batch prediction pipelines.
Poetry is used to package the feature, training and batch prediction pipelines as individual packages before uploading to the server.
4.2. Airflow
Airflow is used to schedule and orchestrate the pipelines using DAGs. Here’s an overview of how the flow and branching of DAGs are configured in Airflow.
5. Continuous Monitoring for Model Performance
5.1. Great Expectations (GE) Suite
A GE suite is a collection of verifiable assertions about data integrity. Hopsworks integrates GE support, so a GE validation suite can be added to Hopsworks to define the expected behavior of new data.
An expectation is a verifiable assertion about data.
The suite includes expectations such as:
- Ensuring that table columns align with a predefined ordered list.
- Verifying that the total number of columns is 7.
- Affirming that timestamp columns cannot be null.
- Specifying that readings columns are of type int32 and possess a minimum and maximum value of 0 and 500, respectively.
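To make the checks above concrete, here is a minimal sketch that expresses the same four assertions in plain Python rather than through the Great Expectations API. The column names in `EXPECTED_COLUMNS` and the function `validate_row` are assumptions for illustration, and a plain `int` check stands in for the int32 type check.

```python
# Assumed column order; the real feature group may differ.
EXPECTED_COLUMNS = [
    "timestamp",
    "update_timestamp",
    "readings_north",
    "readings_south",
    "readings_east",
    "readings_west",
    "readings_central",
]
READING_COLUMNS = EXPECTED_COLUMNS[2:]


def validate_row(row: dict) -> list[str]:
    """Return a list of violated expectations (empty when the row is valid)."""
    errors = []
    # Columns must match the predefined ordered list.
    if list(row.keys()) != EXPECTED_COLUMNS:
        errors.append("columns do not match the expected ordered list")
    # The total number of columns must be 7.
    if len(row) != 7:
        errors.append(f"expected 7 columns, found {len(row)}")
    # Timestamp columns cannot be null.
    for col in ("timestamp", "update_timestamp"):
        if row.get(col) is None:
            errors.append(f"{col} is null")
    # Readings must be integers between 0 and 500 inclusive.
    for col in READING_COLUMNS:
        value = row.get(col)
        if not isinstance(value, int) or isinstance(value, bool) or not 0 <= value <= 500:
            errors.append(f"{col} must be an integer in [0, 500]")
    return errors
```

In the actual pipeline these rules would live in the GE suite attached to Hopsworks; the sketch only shows what each rule asserts.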
5.2. ML Monitoring
Ensuring the consistent and expected performance of the production system over time is crucial. Implementing a machine learning monitoring process establishes a mechanism to address any issues that may arise, facilitating the adaptation of the system and retraining the model in response to changes in the environment.
For instance, the Mean Absolute Percentage Error (MAPE) metric is continuously computed. A spike in this metric serves as an alarm, prompting actions such as fine-tuning the model or adjusting model configurations as necessary.
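As a small worked example of the metric, the sketch below computes MAPE as the mean of |actual - predicted| / |actual| and compares it with a threshold. The threshold value and the names `mape` and `should_alert` are hypothetical; the project may compute and aggregate the metric differently.

```python
# Hypothetical alert threshold (20% error); tune to the use case.
ALERT_THRESHOLD = 0.2


def mape(y_true: list[float], y_pred: list[float]) -> float:
    """Mean Absolute Percentage Error, returned as a fraction (0.1 == 10%)."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and of equal length")
    if any(actual == 0 for actual in y_true):
        # MAPE is undefined when an actual value is zero.
        raise ValueError("MAPE is undefined for actual values of zero")
    total = sum(abs((actual - pred) / actual) for actual, pred in zip(y_true, y_pred))
    return total / len(y_true)


def should_alert(y_true: list[float], y_pred: list[float],
                 threshold: float = ALERT_THRESHOLD) -> bool:
    """Flag the batch when its MAPE exceeds the threshold."""
    return mape(y_true, y_pred) > threshold
```

Because MAPE divides by the actual value, hours with a reading of zero need separate handling, which is why the sketch raises an error instead of silently skipping them.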
6. FastAPI and Streamlit
FastAPI and Streamlit serve as the backend and frontend for retrieving model outputs (predictions and monitoring metrics) and rendering them as a dashboard. Both applications are dockerised and deployed.
6.1. FastAPI
FastAPI is used as the backend to read predictions and monitoring metrics from GCS and expose them through a RESTful API. A variety of endpoints are defined to GET the predictions and monitoring metrics.
Endpoints:
- /health: Health check
- /predictions: GET prediction values
- /monitoring/metrics: GET aggregated monitoring metrics
Upon receiving a request, the API reads the data from storage and encodes it according to the preconfigured Pydantic schema. The response is then returned as JSON.
6.2. Streamlit
Streamlit is the frontend application that renders the data as two dashboards:
- predictions
- monitoring metrics
7. System Deployment using GCP
Due to cost considerations, I have opted to exclude this section, as it falls outside the project’s defined scope.
In a production environment, the preferred approach involves deploying all machine learning components to a cloud provider (e.g., AWS, GCP, Azure) and establishing a Continuous Integration/Continuous Deployment (CI/CD) pipeline utilizing tools such as GitHub Actions or Azure Pipelines, among others.