Défi : Votre Première Tâche de Nettoyage de Données
Glissez pour afficher le menu
Un pipeline de données suit généralement un schéma : Charger, Nettoyer, Transformer et Sauvegarder. Dans ce mini-projet, ces étapes sont appliquées à un jeu de données spécifique afin de passer de données brutes à un résumé agrégé propre, prêt pour le reporting métier.
Pour conclure la section 4, mise en pratique de toutes les notions abordées. Bien que ces étapes puissent être appliquées à n'importe quel jeu de données, la solution sera illustrée à l'aide du célèbre jeu de données Diamonds. Ce jeu de données contient les prix et attributs (taille, couleur, pureté, etc.) de près de 54 000 diamants.
Il est possible de suivre la vidéo et de réaliser la tâche en utilisant le même jeu de données (largement disponible sur Kaggle sous le nom diamonds.csv), ou de choisir un autre jeu de données et d'appliquer le même principe en définissant un objectif similaire.
Veuillez, idéalement, mettre la vidéo en pause ou arrêter la lecture du texte après avoir pris connaissance de l'objectif. Essayez de réaliser le projet par vous-même, quel que soit votre niveau de familiarité avec Databricks. Même une tentative simple sera bien plus bénéfique que de simplement lire la solution. Si vous manquez de temps, poursuivez la lecture de la solution et essayez cet exercice chez vous ultérieurement.
Objectif
Identifier le prix moyen et le nombre moyen de carats pour les diamants de taille « Premium », regroupés par couleur, et enregistrer uniquement les résultats de grande valeur (où le prix moyen est supérieur à 4 000 $) dans une nouvelle table.
Étape 1 : Charger et inspecter
Commencer par charger les données et vérifier le schéma afin de s'assurer que les champs prix et carats sont bien reconnus comme valeurs numériques.
# Assuming the diamonds CSV was uploaded to a Volume
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/main/default/my_volume/diamonds.csv")
df.printSchema()
Étape 2 : Filtrer et sélectionner
Toutes les colonnes ne sont pas nécessaires, et seules les pierres "Premium" vous intéressent pour cette tâche spécifique :
# Filter for Premium cut and select relevant columns
premium_df = df.filter(df.cut == "Premium").select("color", "price", "carat")
Étape 3 : Agréger et regrouper
Vous pouvez maintenant calculer les moyennes. Utilisez la bibliothèque pyspark.sql.functions pour garantir l'exactitude des calculs.
from pyspark.sql import functions as F
# Group by color and calculate averages
summary_df = premium_df.groupBy("color").agg(
F.avg("price").alias("avg_price"),
F.avg("carat").alias("avg_carat")
)
.alias() permet simplement de renommer une colonne dans le résultat de la requête — comme un surnom temporaire. Cela ne modifie pas la table réelle, seulement l'affichage du nom de la colonne dans la sortie.
À rapprocher de AS en SQL — SELECT price AS price_in_usd.
Utile lorsque les noms de colonnes sont longs, peu clairs, ou lors de la création d'une colonne calculée nécessitant un nom lisible.
Étape 4 : Filtrage final et tri
Nous souhaitons uniquement conserver les catégories à forte valeur où le prix moyen dépasse 4 000. Les résultats seront également triés afin que la moyenne la plus élevée apparaisse en haut de la liste.
final_df = summary_df.filter(F.col("avg_price") > 4000).orderBy("avg_price", ascending=False)
display(final_df)
Vous n'avez peut-être pas encore rencontré la fonction F.col().
F.col() permet de référencer une colonne par son nom dans PySpark. F est simplement l'alias de pyspark.sql.functions — importé en début de notebook comme ceci :
from pyspark.sql import functions as F
df.select(F.col("price"))
C'est équivalent à utiliser le nom de la colonne sous forme de chaîne de caractères dans de nombreux cas, mais F.col() est préférable car il permet d'enchaîner directement des opérations :
F.col("price") * 1.1
F.col("cut").alias("diamond_cut")
Considérez F.col("price") comme une façon de dire « donne-moi la colonne price comme un objet sur lequel je peux réellement effectuer des opérations. »
Étape 5 : Enregistrer le résultat
Enfin, valider ce rapport nettoyé « High-Value Premium Diamonds » dans notre catalogue afin que l'équipe analytique puisse l'utiliser.
final_df.write.mode("overwrite").saveAsTable("main.default.high_value_premium_diamonds")
1. À l'étape 3, pourquoi avez-vous utilisé la méthode .alias() dans l'agrégation ?
2. Si vous souhaitez appliquer ce projet à votre propre jeu de données, quelle partie du code devez-vous modifier en premier ?
Merci pour vos commentaires !
Demandez à l'IA
Demandez à l'IA
Posez n'importe quelle question ou essayez l'une des questions suggérées pour commencer notre discussion