Czym są pliki w formacie Apache Parquet
Apache Parquet to format plików danych typu open source, zorientowany na kolumny, zaprojektowany z myślą o efektywnym przechowywaniu i pobieraniu danych. Oferuje wydajne schematy kompresji i kodowania, umożliwiające obsługę złożonych danych w dużych ilościach i jest obsługiwany przez wiele języków programowania oraz narzędzi analitycznych. Pełny opis formatu parquet oraz dokumentacja znajduje sie na stronie https://parquet.apache.org/
Czym się różni format CSV of formatu Parquet?
Główna różnica między plikami Parquet a CSV polega na sposobie przechowywania danych i efektywności: CSV to tekstowy format wierszowy, łatwy do odczytu przez człowieka, ale wolny przy dużych plikach, bez typów danych i zwykle większy.
Parquet to binarny format kolumnowy, wspierający kompresję i typy danych, umożliwiający szybki odczyt tylko potrzebnych kolumn i bardziej wydajny przy analizie dużych zbiorów danych.
CSV
Plik zwykle zaczyna się od nagłówka (pierwszego wiersza z nazwami kolumn), a kolejne wiersze zawierają dane, gdzie kolumny są oddzielone określonym separatorem, najczęściej przecinkiem, czasem średnikiem lub tabulatorem. Nagłówek nie jest obowiązkowy - dodawany jest tylko ze względu na łatwiejszy odczyt pliku. Oto przykład pliku tekstowego, CSV:

Parquet
Plik Parquet ma strukturę kolumnową i binarną: zamiast wierszy z separatorami, dane każdej kolumny są przechowywane razem w tzw. column chunks, a wiele takich bloków tworzy row groups. Plik zawiera także metadane opisujące schemat danych i typy kolumn, co umożliwia szybki odczyt wybranych kolumn. Nie ma tradycyjnego nagłówka ani separatorów – kolejność elementów w kolumnach odpowiada wierszom, dzięki czemu system wie, które wartości z różnych kolumn tworzą razem wiersz. Z naszego CSV powyżej powstały trzy kolumny.

Ale to nie wszystko. Dane są dzielone na tak zwane 'row groups' - bloki wierszy (np. po 50 tysięcy wierszy). Każdy taki zestaw wierszy zawiera 'chunki' (bloki kolumn) w których przechowywane są dane kolumn. Dane w plikach Parquet dzieli się na row groups, żeby szybciej wczytywać tylko potrzebne fragmenty, lepiej kompresować dane, przetwarzać je równolegle i efektywnie zarządzać pamięcią. Gdybyśmy mieli dwie row groups dla informacji z naszego CSV, jedna przechowywałaby pierwsze dwa wiersze a druga dwa następne:

Jedna kolumna jest przechowywana zawsze tylko w jednym 'chunk' na poziomie row group. Każdy row group ma chunki dla wszystkich kolumn. Prawdzia struktura pliku parquet jest jeszcze bardziej złożona; np. każdy z bloków kolumn ma jeszcze strony (pages) a w pliku znajdują się metadata, statystyki.. Oto schemat struktury pliku:
Plik Parquet
├─ File MetaData (schemat tabeli, liczba row groups, statystyki globalne)
│
├─ Row Group 0
│ ├─ Column Chunk (kolumna A)
│ │ ├─ Page Header (statystyki strony, typ kompresji, kodowanie)
│ │ ├─ Page 1 (dane zakodowane i skompresowane)
│ │ ├─ Page 2 (dane zakodowane i skompresowane)
│ │ └─ Page 3 (...)
│ │
│ ├─ Column Chunk (kolumna B)
│ │ ├─ Page Header
│ │ ├─ Page 1 (...)
│ │ └─ Page 2 (...)
│ │
│ └─ Column Chunk (kolumna C)
│ ├─ Page Header
│ ├─ Page 1 (...)
│ └─ Page 2 (...)
│
├─ Row Group 1
│ └─ Column Chunk (kolumny A, B, C analogicznie jak wyżej)
│
└─ File Footer
├─ Informacje o row groups (offsety w pliku, liczba wierszy)
├─ Statystyki kolumn (np. min/max wartości)
└─ Magic bytes ("PAR1" na początku i końcu pliku)
Pisząc aplikacje w Spark nie schodzisz jednak do tego poziomu więc objaśnianie tych elementów nie jest ważne dla pracy z plikami parquet.
Zapisywanie do formatu parquet
Jednym z najlepszych narzędzi do tworzenia plików w formacie parquet jest sam Spark. Oto prosty kod jak odczytać nasz testowy plik z wcześniejszego artykułu 'wprowadzenie do Apache Spark: uruchamianie zadań'.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("CSV to Parquet")
.master("local[*]")
.getOrCreate()
val startTime = System.currentTimeMillis()
val df = spark.read
.option("header", "true") // jeśli CSV ma nagłówki
.option("inferSchema", "true") // Spark sam rozpozna typy
.option("sep", ";") // określamy separator
.csv("/mnt/g/ApacheSpark/Data/Sales/sales_data.csv")
// Zapis do Parquet
df.write
.mode("overwrite")
.parquet("/mnt/g/ApacheSpark/Data/sales_data_parquet")
val endTime = System.currentTimeMillis()
println(f"Czas wykonania: ${(endTime - startTime)/1000.0}%.2f s")
Rezultat wykonania tego kodu znajdziesz w katalogu 'sales_data_parquet'. Nie znajdziesz tam tylko jednego pliku ale nie mniej niż cztery:
Co jest w katalogu salesdata_parquet
-
.parquet→ właściwe dane (1 partycja = 1 plik) -
.crc→ pliki kontrolne (checksum) → potrzebne Sparkom/HDFS do sprawdzenia integralności danych -
_SUCCESS→ plik „znacznikowy”, informuje, że zapis się zakończył poprawnie
Nie używasz ich bezpośrednio – większość narzędzi (Spark, Hive, Presto, Pandas, Tableau) ignoruje _SUCCESS i .crc, czyta tylko .parquet. Zatem nie musisz czyścić tego katalogu usuwając inne pliki niż .parquet
Pliki te, inne niz .parquet zajmują niewielem miejsca:
-
_SUCCESS-
Ma 0 bajtów (lub czasem minimalny rozmiar) → praktycznie nie zajmuje miejsca.
-
Informuje systemy rozproszone (Spark/Hadoop/HDFS), że zapis zakończył się poprawnie.
-
Jest przydatny, jeśli np. kolejny proces (Airflow, Spark, Hive) ma sprawdzić, czy katalog jest kompletny.
-
-
.crc-
Checksum plików → używane w HDFS i niektórych systemach lokalnych do sprawdzania integralności danych.
-
Rozmiar zwykle bardzo mały (kilka KB na plik) w porównaniu do danych.
-
Są automatycznie generowane i ignorowane przez narzędzia do analizy danych (Pandas, Spark, Presto, Hive).
-
Masz jednak wpływ na to ile plików .parquet powstawnie w wyniku wykonania zadania. Możesz w tym celu użyć jednej z dwóch opcji; repartion lub coalesce:
repartition(N)
-
Działa na całym DataFrame.
-
Robi pełny shuffle danych: Spark analizuje wszystkie wiersze i przepakowuje je równomiernie do N partycji.
-
Skutek: każda partycja ma mniej więcej tyle samo danych → pliki Parquet będą podobnej wielkości.
-
Koszt: shuffle = dużo kopiowania danych między workerami → wolniejsze, ale dokładne.
coalesce(N)
-
Nie robi shuffle.
-
Działa „w locie”, scala istniejące partycje w mniejszą liczbę.
-
Skutek: liczba partycji zmniejszona, ale dane mogą być nierównomiernie rozłożone → nie wszystkie pliki będą tej samej wielkości.
-
Koszt: znacznie mniejszy niż
repartition, bo nie trzeba przesyłać danych między workerami.
Oto przykład użycia opcji repartition:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("CSV to Parquet")
.master("local[*]")
.getOrCreate()
val startTime = System.currentTimeMillis()
val df = spark.read
.option("header", "true") // jeśli CSV ma nagłówki
.option("inferSchema", "true") // Spark sam rozpozna typy
.option("sep", "|") // określamy separator
.csv("/mnt/g/ApacheSpark/Data/Sales/sales_data.csv")
// Ustawienie liczby partycji i przypisanie do nowego DataFrame
val df2 = df.repartition(2) // teraz df2 ma 2 partycje
// Zapis do Parquet
df2.write
.mode("overwrite")
.parquet("/mnt/g/ApacheSpark/Data/sales_data_parquet")
val endTime = System.currentTimeMillis()
println(f"Czas wykonania: ${(endTime - startTime)/1000.0}%.2f s")
Chcemy jednak docelowo wykorzystać potencjał formatu parquet - nie tylko przechowywanie danych kolumnowo ale też przechowywanie w plików informacji o typach danych. Zmodyfikujmy zatem nasz kod by przy zapisie konwertował numer klienta do integer a sumę sprzedaży do double. Spark daje nam możliwość pozostawiania domyślnego typu danych (string) dla pozostałych kolumn:
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("CSV to Parquet")
.master("local[*]")
.getOrCreate()
// Wczytanie CSV bez inferSchema, tylko header i separator
val rawDf = spark.read
.option("header", "true")
.option("sep", ";")
.csv("/mnt/g/ApacheSpark/Data/Sales/sales_data.csv")
// Określenie typów dla wybranych kolumn
val typedDf = rawDf
.withColumn("customer_number", rawDf("customer_number").cast(IntegerType))
.withColumn("sales_amount", rawDf("sales_amount").cast(DoubleType))
// Dodaj więcej kolumn, które chcesz jawnie przekonwertować
// Pozostałe kolumny pozostaną StringType automatycznie (bo Spark wczytał je jako string)
typedDf.printSchema()
// Zapis do Parquet
typedDf.coalesce(2) // zmniejszamy liczbę plików
.write
.mode("overwrite")
.parquet("/mnt/g/ApacheSpark/Data/sales_data_parquet")
Podgląd plików parquet
Możesz oczywiście podejrzeć pliki parquet w Spark. Możesz to także zrobić używając GUI; ParqueViewer