Sfida: Il Tuo Primo Compito di Pulizia dei Dati
Scorri per mostrare il menu
Una pipeline di dati segue tipicamente uno schema: Caricamento, Pulizia, Trasformazione e Salvataggio. In questo mini-progetto, applichiamo questi passaggi a un dataset specifico per passare da dati grezzi a un riepilogo aggregato e pulito, pronto per la reportistica aziendale.
Per concludere la Sezione 4, mettiamo in pratica tutto ciò che hai imparato. Sebbene tu possa applicare questi passaggi a qualsiasi dataset a tua scelta, mostreremo la soluzione utilizzando il popolare dataset Diamonds. Questo dataset contiene i prezzi e le caratteristiche (taglio, colore, purezza, ecc.) di quasi 54.000 diamanti.
Puoi seguire il video ed eseguire il compito autonomamente utilizzando lo stesso dataset (ampiamente disponibile su Kaggle come diamonds.csv) oppure puoi scegliere qualsiasi altro dataset e applicare lo stesso principio fissando un obiettivo simile.
Si consiglia di mettere in pausa il video o interrompere la lettura del testo dopo aver letto l'obiettivo. Prova a svolgere il progetto autonomamente, indipendentemente dal tuo livello di familiarità con Databricks. Anche un semplice tentativo sarà molto più utile che limitarsi a leggere la soluzione. Se non hai tempo, continua pure a leggere la soluzione e prova comunque a svolgere l'esercizio in un secondo momento.
L'obiettivo
Identificare il prezzo medio e i carati medi dei diamanti con taglio "Premium", raggruppati per colore, e salvare solo i risultati di alto valore (dove il prezzo medio supera i $4.000) in una nuova tabella.
Passo 1: Caricamento e ispezione
Per prima cosa, caricare i dati e verificare lo schema per assicurarsi che i prezzi e i carati siano riconosciuti come valori numerici.
# Assuming the diamonds CSV was uploaded to a Volume
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/main/default/my_volume/diamonds.csv")
df.printSchema()
Passaggio 2: Filtrare e selezionare
Non sono necessarie tutte le colonne e, per questo compito specifico, interessano solo i diamanti "Premium":
# Filter for Premium cut and select relevant columns
premium_df = df.filter(df.cut == "Premium").select("color", "price", "carat")
Passaggio 3: Aggregazione e Raggruppamento
Ora puoi calcolare le medie. Utilizza la libreria pyspark.sql.functions per garantire l'accuratezza dei calcoli.
from pyspark.sql import functions as F
# Group by color and calculate averages
summary_df = premium_df.groupBy("color").agg(
F.avg("price").alias("avg_price"),
F.avg("carat").alias("avg_carat")
)
.alias() rinomina semplicemente una colonna nel risultato della query — come un soprannome temporaneo. Non modifica la tabella reale, ma solo come appare la colonna nell'output.
Pensalo come AS in SQL — SELECT price AS price_in_usd.
Utile quando i nomi delle colonne sono lunghi, poco chiari o quando si crea una colonna calcolata e si desidera assegnarle un nome leggibile.
Passaggio 4: Filtro finale e ordinamento
Vogliamo mantenere solo le categorie di alto valore in cui il prezzo medio supera 4.000. Ordineremo inoltre i risultati in modo che il prezzo medio più alto sia in cima.
final_df = summary_df.filter(F.col("avg_price") > 4000).orderBy("avg_price", ascending=False)
display(final_df)
Potresti non aver ancora incontrato la funzione F.col().
F.col() è il modo in cui si fa riferimento a una colonna per nome in PySpark. F è semplicemente l'alias per pyspark.sql.functions — importato all'inizio del notebook in questo modo:
from pyspark.sql import functions as F
df.select(F.col("price"))
In molti casi è equivalente a digitare semplicemente il nome della colonna come stringa, ma F.col() è preferito perché consente di concatenare direttamente le operazioni su di essa:
F.col("price") * 1.1
F.col("cut").alias("diamond_cut")
Pensa a F.col("price") come a "dammi la colonna price come oggetto su cui posso effettivamente eseguire delle operazioni."
Passaggio 5: Salva il risultato
Infine, registra questo report pulito "High-Value Premium Diamonds" nel nostro Catalogo affinché il team di analisi possa utilizzarlo.
final_df.write.mode("overwrite").saveAsTable("main.default.high_value_premium_diamonds")
1. Nel Passaggio 3, perché hai utilizzato il metodo .alias() all'interno dell'aggregazione?
2. Se vuoi applicare questo progetto al tuo dataset, quale parte del codice devi modificare per prima?
Grazie per i tuoi commenti!
Chieda ad AI
Chieda ad AI
Chieda pure quello che desidera o provi una delle domande suggerite per iniziare la nostra conversazione