Herausforderung: Ihre Erste Datenbereinigungsaufgabe
Swipe um das Menü anzuzeigen
Eine Datenpipeline folgt typischerweise einem Muster: Laden, Bereinigen, Transformieren und Speichern. In diesem Mini-Projekt wenden wir diese Schritte auf einen bestimmten Datensatz an, um von Rohdaten zu einer bereinigten, aggregierten Zusammenfassung für betriebliche Berichte zu gelangen.
Zum Abschluss von Abschnitt 4 setzen wir alles Gelernte in die Praxis um. Während diese Schritte auf jeden beliebigen Datensatz angewendet werden können, demonstrieren wir die Lösung anhand des bekannten Diamonds-Datensatzes. Dieser Datensatz enthält Preise und Attribute (Schliff, Farbe, Reinheit usw.) von fast 54.000 Diamanten.
Du kannst dem Video folgen und die Aufgabe selbstständig mit demselben Datensatz (weit verbreitet auf Kaggle als diamonds.csv) durchführen oder einen beliebigen anderen Datensatz wählen und das gleiche Prinzip anwenden, indem du ein ähnliches Ziel festlegst.
Bitte halten Sie idealerweise das Video an oder unterbrechen Sie das Lesen des Textes, nachdem Sie das Ziel gelesen haben. Versuchen Sie das Projekt eigenständig, unabhängig davon, wie sicher Sie sich mit Ihren Databricks-Kenntnissen fühlen. Selbst ein einfacher Versuch ist weitaus hilfreicher, als nur die Lösung durchzulesen. Sollten Sie keine Zeit haben, lesen Sie die Lösung weiter und probieren Sie die Aufgabe später zu Hause als Übung aus.
Das Ziel
Ziel ist es, den durchschnittlichen Preis und die durchschnittlichen Karat für Diamanten mit "Premium"-Schliff zu ermitteln, gruppiert nach ihrer Farbe, und nur die hochwertigen Ergebnisse (bei denen der durchschnittliche Preis über $4.000 liegt) in einer neuen Tabelle zu speichern.
Schritt 1: Laden und Überprüfen
Zuerst werden die Daten geladen und das Schema überprüft, um sicherzustellen, dass Preise und Karat als numerische Werte erkannt werden.
# Assuming the diamonds CSV was uploaded to a Volume
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/main/default/my_volume/diamonds.csv")
df.printSchema()
Schritt 2: Filtern und Auswählen
Nicht alle Spalten werden benötigt, und für diese Aufgabe sind nur "Premium"-Diamanten relevant:
# Filter for Premium cut and select relevant columns
premium_df = df.filter(df.cut == "Premium").select("color", "price", "carat")
Schritt 3: Aggregieren und Gruppieren
Nun können die Durchschnitte berechnet werden. Verwenden Sie die Bibliothek pyspark.sql.functions, um eine korrekte Berechnung sicherzustellen.
from pyspark.sql import functions as F
# Group by color and calculate averages
summary_df = premium_df.groupBy("color").agg(
F.avg("price").alias("avg_price"),
F.avg("carat").alias("avg_carat")
)
.alias() benennt einfach eine Spalte im Abfrageergebnis um – wie ein temporärer Spitzname. Die tatsächliche Tabelle wird nicht verändert, nur die Darstellung der Spalte im Output.
Vergleichbar mit AS in SQL — SELECT price AS price_in_usd.
Nützlich, wenn Spaltennamen lang, unklar oder bei berechneten Spalten ein lesbarer Name benötigt wird.
Schritt 4: Letzter Filter und Sortierung
Es sollen nur die hochwertigen Kategorien beibehalten werden, bei denen der Durchschnittspreis 4.000 übersteigt. Die Ergebnisse werden zudem so sortiert, dass der höchste Durchschnittspreis oben steht.
final_df = summary_df.filter(F.col("avg_price") > 4000).orderBy("avg_price", ascending=False)
display(final_df)
Möglicherweise bist du der Funktion F.col() noch nicht begegnet.
F.col() dient dazu, in PySpark eine Spalte anhand ihres Namens zu referenzieren. Das F ist lediglich das Alias für pyspark.sql.functions — am Anfang des Notebooks wie folgt importiert:
from pyspark.sql import functions as F
df.select(F.col("price"))
In vielen Fällen entspricht dies dem direkten Verwenden des Spaltennamens als String, jedoch wird F.col() bevorzugt, da damit Operationen direkt verkettet werden können:
F.col("price") * 1.1
F.col("cut").alias("diamond_cut")
Man kann sich F.col("price") so vorstellen, als würde man sagen: "Gib mir die Spalte price als Objekt, mit dem ich tatsächlich arbeiten kann."
Schritt 5: Ergebnis speichern
Abschließend diesen bereinigten Bericht "High-Value Premium Diamonds" im Katalog speichern, damit das Analytics-Team darauf zugreifen kann.
final_df.write.mode("overwrite").saveAsTable("main.default.high_value_premium_diamonds")
1. Warum wurde in Schritt 3 die Methode .alias() innerhalb der Aggregation verwendet?
2. Wenn Sie dieses Projekt auf Ihr eigenes Dataset anwenden möchten, welchen Teil des Codes müssen Sie zuerst ändern?
Danke für Ihr Feedback!
Fragen Sie AI
Fragen Sie AI
Fragen Sie alles oder probieren Sie eine der vorgeschlagenen Fragen, um unser Gespräch zu beginnen