Logging and Monitoring Basics
```python
import logging
import pandas as pd

# Configure logging
logging.basicConfig(
    filename="pipeline.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s:%(message)s"
)

# URL to Titanic dataset
DATA_URL = (
    "https://content-media-cdn.codefinity.com/"
    "courses/68740680-eb0c-4ee2-91a8-54871b7c1823/titanic.csv"
)

def extract_data(url):
    try:
        logging.info("Starting data extraction from URL: %s", url)
        df = pd.read_csv(url)
        logging.info("Extraction successful: %d records", len(df))
        return df
    except Exception as e:
        logging.error("Error during extraction: %s", e)
        raise

def transform_data(df):
    try:
        logging.info("Starting data transformation")
        # Simple cleaning: drop rows with missing Age
        df_clean = df.dropna(subset=["Age"])
        logging.info("Transformation successful: %d records after cleaning", len(df_clean))
        return df_clean
    except Exception as e:
        logging.error("Transformation error: %s", e)
        raise

def load_data(df):
    """
    Simulated load step. In Codefinity, writing to disk is not possible,
    so instead we log success and return the df.
    """
    try:
        logging.info("Simulating load step (no file output in this environment)")
        logging.info("Load successful: %d final records", len(df))
        return df
    except Exception as e:
        logging.error("Load error: %s", e)
        raise

# Example pipeline execution
if __name__ == "__main__":
    raw = extract_data(DATA_URL)
    cleaned = transform_data(raw)
    final_df = load_data(cleaned)
    print("Pipeline executed successfully!")
    print("Final row count:", len(final_df))
```
Adding logging to your data pipelines, as in the example above, is essential for production readiness. Logging allows you to record events, track errors, and understand the flow of your pipeline. With structured logs, you can trace problems, audit data movement, and validate that each pipeline stage executes as expected.
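The pipeline above writes plain-text lines to `pipeline.log`. If you want logs that are easier to parse automatically, one option is to emit each record as a JSON object instead. The sketch below uses only the standard library; the `JsonFormatter` class, the `pipeline.json.log` filename, and the field names are illustrative choices, not part of the lesson's pipeline.

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""
    def format(self, record):
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        return json.dumps(payload)

# Attach the JSON formatter to a dedicated handler and logger
handler = logging.FileHandler("pipeline.json.log")
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("Extraction successful: %d records", 712)
```

A JSON-per-line format like this makes later log analysis (filtering by level, counting events per stage) straightforward with standard tools.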
Monitoring builds on logging by providing visibility into pipeline health and performance. Important monitoring metrics for data pipelines include the following (a short sketch of how to capture record counts and stage duration appears after the list):
- Number of processed records;
- Error rates and types;
- Execution duration for each pipeline stage;
- Resource usage such as memory and CPU.
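As a minimal illustration of the first and third metrics, you can time each stage and log its output record count with a small decorator. This is a sketch built on the standard library; the decorator name `log_stage` is an assumption for illustration and is not part of the pipeline above.

```python
import functools
import logging
import time

def log_stage(func):
    """Log how long a pipeline stage takes and how many records it returns.

    Assumes the wrapped function returns an object that supports len(),
    such as a pandas DataFrame.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        logging.info("%s finished in %.2fs with %d records",
                     func.__name__, elapsed, len(result))
        return result
    return wrapper

# Usage: decorate the existing stages, for example
# @log_stage
# def transform_data(df): ...
```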
Alerting basics involve setting up notifications when metrics exceed safe thresholds. For example, you might trigger an alert if error rates spike, processing times exceed expected limits, or if no data is processed for a scheduled run. Alerts can be sent via email, messaging platforms, or integrated monitoring dashboards.
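Here is one way such a threshold check might look. A real setup would call an email or chat webhook; this sketch just writes a CRITICAL log entry. The threshold value and the helper name `check_error_rate` are assumptions for illustration.

```python
import logging

ERROR_RATE_THRESHOLD = 0.05  # assumed threshold: alert above 5% failed records

def check_error_rate(failed_records, total_records):
    """Raise an alert (here: a CRITICAL log entry) if the error rate is too high."""
    if total_records == 0:
        logging.critical("ALERT: no records processed in this run")
        return
    error_rate = failed_records / total_records
    if error_rate > ERROR_RATE_THRESHOLD:
        logging.critical("ALERT: error rate %.1f%% exceeds threshold",
                         error_rate * 100)

# Example: 60 failed rows out of 891 is about 6.7%, which triggers the alert
check_error_rate(60, 891)
```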
Log analysis is a critical part of monitoring. By reviewing logs, you can identify recurring errors, bottlenecks, or unusual patterns. Automated log analysis tools can help surface problems quickly. In production, you should regularly review pipeline logs, investigate anomalies, and refine your monitoring and alerting strategies to ensure reliable, robust data workflows.
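As a starting point for log review, you can tally log levels in `pipeline.log` with a few lines of standard-library Python. This sketch assumes the format configured above, where the level name appears immediately before a colon; the example output in the comment is illustrative.

```python
import collections
import re

LEVEL_PATTERN = re.compile(r"\b(DEBUG|INFO|WARNING|ERROR|CRITICAL):")

def summarize_log(path="pipeline.log"):
    """Count how many log lines were written at each level."""
    counts = collections.Counter()
    with open(path) as f:
        for line in f:
            match = LEVEL_PATTERN.search(line)
            if match:
                counts[match.group(1)] += 1
    return counts

print(summarize_log())  # e.g. Counter({'INFO': 7, 'ERROR': 1})
```

A summary like this makes it easy to spot runs with unusually many errors or warnings and to decide where deeper investigation is needed.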