Uitdaging: Uw Eerste Gegevensopschoningstaak
Veeg om het menu te tonen
Een datapijplijn volgt doorgaans een patroon: Laden, Opschonen, Transformeren en Opslaan. In dit mini-project passen we deze stappen toe op een specifiek gegevensbestand om van ruwe data naar een opgeschoonde, geaggregeerde samenvatting te gaan die klaar is voor zakelijke rapportages.
Ter afsluiting van Sectie 4 brengen we alles wat je hebt geleerd in de praktijk. Hoewel je deze stappen op elk gewenst gegevensbestand kunt toepassen, demonstreren we de oplossing met de populaire Diamonds dataset. Deze dataset bevat de prijzen en kenmerken (slijpvorm, kleur, zuiverheid, enz.) van bijna 54.000 diamanten.
Je kunt de video volgen en de opdracht zelf uitvoeren met dezelfde dataset (breed beschikbaar via Kaggle als diamonds.csv), of je kunt een eigen dataset kiezen en hetzelfde principe toepassen door een vergelijkbaar doel te stellen.
Neem idealiter even de tijd om de video te pauzeren of de tekst te onderbreken na het lezen van het doel. Probeer het project zelfstandig uit, ongeacht hoe vertrouwd je bent met je kennis van Databricks. Zelfs een eenvoudige poging is veel waardevoller dan alleen de oplossing doorlezen. Mocht je geen tijd hebben, lees dan gerust de oplossing verder en probeer deze oefening later thuis alsnog uit te voeren.
Het Doel
Het doel is het identificeren van de gemiddelde prijs en het gemiddelde aantal karaat voor diamanten met een "Premium" slijpvorm, gegroepeerd op kleur, en alleen de resultaten met hoge waarde (waar de gemiddelde prijs boven de $4.000 ligt) opslaan in een nieuwe tabel.
Stap 1: Laden en Inspecteren
Laad eerst de gegevens en controleer het schema om te verzekeren dat de prijzen en karaat als numerieke waarden worden herkend.
# Assuming the diamonds CSV was uploaded to a Volume
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/main/default/my_volume/diamonds.csv")
df.printSchema()
Stap 2: Filteren en Selecteren
Niet alle kolommen zijn nodig, en voor deze specifieke taak zijn alleen "Premium" diamanten van belang:
# Filter for Premium cut and select relevant columns
premium_df = df.filter(df.cut == "Premium").select("color", "price", "carat")
Stap 3: Aggregatie en Groeperen
Nu kun je de gemiddelden berekenen. Gebruik de bibliotheek pyspark.sql.functions om ervoor te zorgen dat onze berekeningen nauwkeurig zijn.
from pyspark.sql import functions as F
# Group by color and calculate averages
summary_df = premium_df.groupBy("color").agg(
F.avg("price").alias("avg_price"),
F.avg("carat").alias("avg_carat")
)
.alias() geeft eenvoudigweg een andere naam aan een kolom in je queryresultaat — vergelijkbaar met een tijdelijke bijnaam. Het verandert de daadwerkelijke tabel niet, alleen hoe de kolom in de uitvoer wordt weergegeven.
Zie het als AS in SQL — SELECT price AS price_in_usd.
Handig wanneer kolomnamen lang of onduidelijk zijn, of wanneer je een berekende kolom maakt en deze een leesbare naam wilt geven.
Stap 4: Definitieve filter en sortering
We willen alleen de waardevolle categorieën behouden waarbij de gemiddelde prijs hoger is dan 4.000. We sorteren de resultaten zodat de hoogste gemiddelde prijs bovenaan staat.
final_df = summary_df.filter(F.col("avg_price") > 4000).orderBy("avg_price", ascending=False)
display(final_df)
Je bent mogelijk de functie F.col() nog niet tegengekomen.
F.col() is de manier om een kolom op naam te refereren in PySpark. De F is simpelweg het alias voor pyspark.sql.functions — bovenaan je notebook geïmporteerd als volgt:
from pyspark.sql import functions as F
df.select(F.col("price"))
Dit is in veel gevallen gelijk aan het typen van de kolomnaam als string, maar F.col() heeft de voorkeur omdat je hiermee direct bewerkingen kunt uitvoeren:
F.col("price") * 1.1
F.col("cut").alias("diamond_cut")
Zie F.col("price") als "geef me de kolom price als een object waarmee ik daadwerkelijk dingen kan doen."
Stap 5: Sla het resultaat op
Sla ten slotte dit opgeschoonde "High-Value Premium Diamonds"-rapport op in onze Catalogus zodat het analytics-team het kan gebruiken.
final_df.write.mode("overwrite").saveAsTable("main.default.high_value_premium_diamonds")
1. Waarom heb je in stap 3 de .alias()-methode gebruikt binnen de aggregatie?
2. Als je dit project op je eigen dataset wilt toepassen, welk deel van de code moet je dan als eerste aanpassen?
Bedankt voor je feedback!
Vraag AI
Vraag AI
Vraag wat u wilt of probeer een van de voorgestelde vragen om onze chat te starten.