Automating Model Retraining | Orchestrating ML Pipelines
MLOps for Machine Learning Engineers

Automating Model Retraining

As new data becomes available, retraining your ML models helps keep predictions accurate and relevant, a key step in maintaining model performance over time. With Apache Airflow, you can schedule and orchestrate this retraining automatically, removing manual steps and making each run repeatable.

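In this setup, an Airflow DAG (directed acyclic graph) defines the retraining steps: loading data, preprocessing, training, evaluation, and logging. Airflow runs these steps in dependency order on a schedule and retries failed tasks according to their retry settings, so your model stays up to date with minimal manual effort. The example below groups preprocessing, training, evaluation, and logging into a single task for brevity.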
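To make "Airflow runs these steps in dependency order" concrete, here is a small illustration using Python's standard-library `graphlib` module (Python 3.9+). It is not how Airflow's scheduler is implemented; it only shows how a set of upstream dependencies determines a valid run order. The task names are hypothetical and mirror the five steps listed above.

```python
from graphlib import TopologicalSorter

# Hypothetical task names mirroring the five retraining steps.
# Each key maps a task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "preprocess": {"load_data"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "log_metrics": {"evaluate"},
}


def execution_order(deps):
    """Return one valid run order in which every task comes after its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))
```

For a linear chain like this one the order is fully determined. When tasks do not depend on each other, several orders are valid, and Airflow may run such tasks in parallel.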

Example: Automated Retraining DAG

from datetime import datetime, timedelta
import logging

import joblib
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

log = logging.getLogger(__name__)

# Intermediate files live on local disk, so both tasks must run on the
# same machine (or these paths must point to shared storage).
DATA_PATH = '/tmp/iris.csv'
MODEL_PATH = '/tmp/iris_model.joblib'


def load_data():
    # Stand-in for pulling fresh data; Iris is a static demo dataset.
    df = load_iris(as_frame=True).frame
    df.to_csv(DATA_PATH, index=False)


def train_and_evaluate():
    df = pd.read_csv(DATA_PATH)
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # Preprocessing (scaling) and the classifier are fitted together.
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('clf', RandomForestClassifier(n_estimators=50, random_state=42)),
    ])
    pipeline.fit(X_train, y_train)
    acc = accuracy_score(y_test, pipeline.predict(X_test))
    log.info("Retrained model accuracy: %.4f", acc)
    # Persist the retrained model so later steps (e.g. deployment) can load it.
    joblib.dump(pipeline, MODEL_PATH)
    return float(acc)  # the return value is pushed to XCom


default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='retrain_ml_model',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # Airflow 2.4+; older versions use schedule_interval
    catchup=False,
    tags=['ml', 'retraining'],
) as dag:
    load_data_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
    )
    train_and_evaluate_task = PythonOperator(
        task_id='train_and_evaluate',
        python_callable=train_and_evaluate,
    )

    load_data_task >> train_and_evaluate_task
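The DAG above reports accuracy but accepts every retrained model. A common safeguard is a promotion gate that keeps the new model only if it clears a minimum score and is at least as good as the model currently in production. The function below is a hypothetical sketch; its name, the 0.9 floor, and the `min_gain` margin are illustrative assumptions, not values from this lesson.

```python
def should_promote(new_score, current_score=None, min_score=0.9, min_gain=0.0):
    """Return True if a retrained model should replace the current one.

    Hypothetical rule: the new score must reach min_score, and, if a current
    production score is known, must be at least current_score + min_gain.
    """
    if new_score < min_score:
        return False  # below the absolute quality floor
    if current_score is None:
        return True  # nothing deployed yet, so any model above the floor qualifies
    return new_score >= current_score + min_gain


print(should_promote(0.95))        # True: no current model to beat
print(should_promote(0.95, 0.97))  # False: worse than production
```

In a DAG, one option is to call this from a callable wrapped in Airflow's `ShortCircuitOperator`, which skips downstream tasks (such as deployment) when the callable returns a falsy value.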
Note

Automating retraining helps mitigate model drift, the gradual decline in performance as data patterns change. With Airflow, retraining pipelines can run daily, weekly, or when new data arrives, which helps keep your deployed model accurate and reliable, provided each retrained model is evaluated before it replaces the current one.
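Retraining "when new data arrives" does not have to rely on a fixed schedule. Recent Airflow versions offer data-aware scheduling (Datasets, renamed Assets in Airflow 3); a simpler, framework-independent alternative is to fingerprint the input file and retrain only when it has changed. This sketch assumes the data is a single local file and keeps the last fingerprint in a small JSON state file; both paths and all function names are hypothetical.

```python
import hashlib
import json
from pathlib import Path


def file_fingerprint(path, chunk_size=65536):
    """Return the SHA-256 hex digest of a file, reading it in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def last_fingerprint(state_path):
    """Return the fingerprint saved by the last successful run, or None if there is none."""
    state = Path(state_path)
    if not state.exists():
        return None
    return json.loads(state.read_text())["sha256"]


def data_changed(data_path, state_path):
    """Return True if the data file differs from the one used in the last successful run."""
    return file_fingerprint(data_path) != last_fingerprint(state_path)


def record_fingerprint(data_path, state_path):
    """Save the current data fingerprint; call this only after retraining succeeds."""
    Path(state_path).write_text(json.dumps({"sha256": file_fingerprint(data_path)}))
```

Recording the fingerprint only after a successful retraining run means a failed run is retried on the next check instead of being silently skipped.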


What is the main benefit of automating model retraining with Airflow?

Section 4, Chapter 3
