MLOps Fundamentals with Python

Automating ML Workflows


Automating your machine learning (ML) workflows can significantly boost both efficiency and reliability throughout the ML lifecycle. Instead of performing repetitive tasks manually, workflow automation ensures consistency, reduces human error, and accelerates the pace at which you can iterate on experiments and deploy models. Automation is especially valuable in collaborative environments, where standardized procedures help teams maintain high quality and reproducibility across projects.

Common tools for workflow automation include:

  • Orchestration frameworks like Apache Airflow or Prefect, which define and manage complex multi-step pipelines as code;
  • Workflow schedulers such as cron (Linux/macOS) or Task Scheduler (Windows), which trigger pipeline runs at defined time intervals;
  • Scripting languages like Python, which glue these components together.

These tools allow you to define, execute, and monitor a sequence of tasks — such as data preprocessing, model training, evaluation, and deployment — with minimal manual intervention.

The example below shows a production-style automated pipeline using the Iris dataset. It uses LogisticRegression, a classification algorithm that predicts the probability of each class label — well suited here because the Iris dataset has clearly separable classes and a small number of features. The pipeline includes error handling, logging, and data validation to reflect real-world reliability requirements.

```python
import logging
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import joblib

# Configuring the logger to record pipeline events with timestamps
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

def load_data():
    iris = load_iris(as_frame=True)
    X = iris.data
    y = iris.target
    logging.info(f"Data loaded: {X.shape[0]} rows, {X.shape[1]} features.")
    return X, y

def validate_data(X, y):
    # Checking for missing values and label alignment before training
    if X.isnull().any().any():
        raise ValueError("Input data contains missing values.")
    if len(X) != len(y):
        raise ValueError("Feature matrix and labels have different lengths.")
    logging.info("Data validation passed.")

def preprocess_data(X_train, X_test):
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)  # Fitting scaler on training data only
    X_test_scaled = scaler.transform(X_test)        # Applying the same scale to test data
    return X_train_scaled, X_test_scaled, scaler

def train_model(X_train, y_train):
    model = LogisticRegression(max_iter=200)
    model.fit(X_train, y_train)
    logging.info("Model training complete.")
    return model

def evaluate_model(model, X_train, y_train, X_test, y_test):
    train_acc = accuracy_score(y_train, model.predict(X_train))
    test_acc = accuracy_score(y_test, model.predict(X_test))
    logging.info(f"Train accuracy: {train_acc:.2f} | Test accuracy: {test_acc:.2f}")
    return {"train_accuracy": train_acc, "test_accuracy": test_acc}

def run_pipeline():
    try:
        X, y = load_data()
        validate_data(X, y)
        # Splitting before preprocessing prevents data leakage from the test set.
        # random_state=42 fixes the shuffle so every run produces the same split,
        # which is essential for reproducibility when comparing experiments.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        X_train_scaled, X_test_scaled, scaler = preprocess_data(X_train, X_test)
        model = train_model(X_train_scaled, y_train)
        metrics = evaluate_model(model, X_train_scaled, y_train, X_test_scaled, y_test)
        # Saving artifacts so the trained model and scaler can be loaded for deployment
        joblib.dump(model, "model.pkl")
        joblib.dump(scaler, "scaler.pkl")
        logging.info("Pipeline completed successfully.")
        return metrics
    except Exception as e:
        logging.error(f"Pipeline failed: {e}")
        raise

run_pipeline()
```
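Once the pipeline has saved model.pkl and scaler.pkl, a separate serving script can reload and use them. The sketch below recreates both artifacts inline so it runs standalone; the sample measurement and file names mirror the pipeline above, but the serving code itself is illustrative, not part of the lesson's pipeline.

```python
import joblib
import numpy as np
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# Recreate the pipeline's artifacts so this example is self-contained.
X, y = load_iris(return_X_y=True)
scaler = StandardScaler().fit(X)
model = LogisticRegression(max_iter=200).fit(scaler.transform(X), y)
joblib.dump(model, "model.pkl")
joblib.dump(scaler, "scaler.pkl")

# At serving time, load both artifacts and apply them in the same order
# as training: scale first, then predict.
model = joblib.load("model.pkl")
scaler = joblib.load("scaler.pkl")

sample = np.array([[5.1, 3.5, 1.4, 0.2]])  # one iris measurement
pred = model.predict(scaler.transform(sample))
print(pred)  # class index of the predicted species
```

Persisting the scaler alongside the model matters: predictions are only valid if inference data passes through the exact preprocessing fitted during training.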

Once you have a reliable pipeline function like run_pipeline(), scheduling it is straightforward. On Linux or macOS, a cron job can trigger it at a fixed interval — for example, every night at 2 AM:
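A crontab entry for that schedule might look like the following; the interpreter and script paths are placeholders for your environment:

```
0 2 * * * /usr/bin/python3 /path/to/pipeline.py >> /var/log/ml_pipeline.log 2>&1
```

The first two fields (minute 0, hour 2) fire the job at 2:00 AM daily, and redirecting output to a log file preserves the pipeline's logging between runs.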

On cloud platforms, the same script can be triggered by Cloud Scheduler (GCP), EventBridge (AWS), or equivalent services. For event-based triggering — such as retraining when new data arrives — orchestration frameworks like Apache Airflow let you define a DAG (Directed Acyclic Graph) where each pipeline stage is a separate task with its own retry logic and monitoring.
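The core idea behind per-task retry logic can be shown without Airflow itself: each stage is an independent callable, and a transient failure is retried a bounded number of times before the run is abandoned. The names below are illustrative and not Airflow's API; this is a minimal framework-free sketch.

```python
def run_with_retries(task, retries=2):
    """Run a task callable, retrying up to `retries` extra times on failure."""
    for attempt in range(retries + 1):
        try:
            return task()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the failure

# Each stage is a separate task; downstream stages consume upstream results.
attempts = {"count": 0}

def flaky_extract():
    # Simulate a transient failure on the first attempt only.
    attempts["count"] += 1
    if attempts["count"] < 2:
        raise RuntimeError("transient failure")
    return [1, 2, 3]

data = run_with_retries(flaky_extract, retries=2)
total = run_with_retries(lambda: sum(data))
print(total)  # → 6
```

In a real DAG the framework also records each task's state, so a failed stage can be retried or resumed without rerunning the stages that already succeeded.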

By combining a well-structured run_pipeline() function with a scheduler or orchestration framework, you ensure that your ML systems remain up-to-date and responsive to changes without requiring constant manual oversight.


Section 1. Chapter 9
