Czym są pliki w formacie Apache Parquet

Apache Parquet to format plików danych typu open source, zorientowany na kolumny, zaprojektowany z myślą o efektywnym przechowywaniu i pobieraniu danych. Oferuje wydajne schematy kompresji i kodowania, umożliwiające obsługę złożonych danych w dużych ilościach i jest obsługiwany przez wiele języków programowania oraz narzędzi analitycznych. Pełny opis formatu parquet oraz dokumentacja znajduje sie na stronie https://parquet.apache.org/

Czym się różni format CSV of formatu Parquet?

Główna różnica między plikami Parquet a CSV polega na sposobie przechowywania danych i efektywności: CSV to tekstowy format wierszowy, łatwy do odczytu przez człowieka, ale wolny przy dużych plikach, bez typów danych i zwykle większy.

Parquet to binarny format kolumnowy, wspierający kompresję i typy danych, umożliwiający szybki odczyt tylko potrzebnych kolumn i bardziej wydajny przy analizie dużych zbiorów danych.

CSV

Plik zwykle zaczyna się od nagłówka (pierwszego wiersza z nazwami kolumn), a kolejne wiersze zawierają dane, gdzie kolumny są oddzielone określonym separatorem, najczęściej przecinkiem, czasem średnikiem lub tabulatorem. Nagłówek nie jest obowiązkowy - dodawany jest tylko ze względu na łatwiejszy odczyt pliku. Oto przykład pliku tekstowego, CSV:

Parquet

Plik Parquet ma strukturę kolumnową i binarną: zamiast wierszy z separatorami, dane każdej kolumny są przechowywane razem w tzw. column chunks, a wiele takich bloków tworzy row groups. Plik zawiera także metadane opisujące schemat danych i typy kolumn, co umożliwia szybki odczyt wybranych kolumn. Nie ma tradycyjnego nagłówka ani separatorów – kolejność elementów w kolumnach odpowiada wierszom, dzięki czemu system wie, które wartości z różnych kolumn tworzą razem wiersz. Z naszego CSV powyżej powstały trzy kolumny.

Ale to nie wszystko. Dane są dzielone na tak zwane 'row groups' - bloki wierszy (np. po 50 tysięcy wierszy). Każdy taki zestaw wierszy zawiera 'chunki' (bloki kolumn) w których przechowywane są dane kolumn. Dane w plikach Parquet dzieli się na row groups, żeby szybciej wczytywać tylko potrzebne fragmenty, lepiej kompresować dane, przetwarzać je równolegle i efektywnie zarządzać pamięcią. Gdybyśmy mieli dwie row groups dla informacji z naszego CSV, jedna przechowywałaby pierwsze dwa wiersze a druga dwa następne:

Jedna kolumna jest przechowywana zawsze tylko w jednym 'chunk' na poziomie row group. Każdy row group ma chunki dla wszystkich kolumn. Prawdzia struktura pliku parquet jest jeszcze bardziej złożona; np. każdy z bloków kolumn ma jeszcze strony (pages) a w pliku znajdują się metadata, statystyki.. Oto schemat struktury pliku:

Plik Parquet
├─ File MetaData (schemat tabeli, liczba row groups, statystyki globalne)

├─ Row Group 0
│  ├─ Column Chunk (kolumna A)
│  │  ├─ Page Header (statystyki strony, typ kompresji, kodowanie)
│  │  ├─ Page 1 (dane zakodowane i skompresowane)
│  │  ├─ Page 2 (dane zakodowane i skompresowane)
│  │  └─ Page 3 (...)
│  │
│  ├─ Column Chunk (kolumna B)
│  │  ├─ Page Header
│  │  ├─ Page 1 (...)
│  │  └─ Page 2 (...)
│  │
│  └─ Column Chunk (kolumna C)
│     ├─ Page Header
│     ├─ Page 1 (...)
│     └─ Page 2 (...)

├─ Row Group 1
│  └─ Column Chunk (kolumny A, B, C analogicznie jak wyżej)

└─ File Footer
   ├─ Informacje o row groups (offsety w pliku, liczba wierszy)
   ├─ Statystyki kolumn (np. min/max wartości)
   └─ Magic bytes ("PAR1" na początku i końcu pliku)

Pisząc aplikacje w Spark nie schodzisz jednak do tego poziomu więc objaśnianie tych elementów nie jest ważne dla pracy z plikami parquet.

Zapisywanie do formatu parquet

Jednym z najlepszych narzędzi do tworzenia plików w formacie parquet jest sam Spark. Oto prosty kod jak odczytać nasz testowy plik z wcześniejszego artykułu 'wprowadzenie do Apache Spark: uruchamianie zadań'.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CSV to Parquet")
  .master("local[*]")
  .getOrCreate()
  
val startTime = System.currentTimeMillis()

val df = spark.read
  .option("header", "true")   // jeśli CSV ma nagłówki
  .option("inferSchema", "true") // Spark sam rozpozna typy
  .option("sep", ";")             // określamy separator
  .csv("/mnt/g/ApacheSpark/Data/Sales/sales_data.csv")

// Zapis do Parquet
df.write
  .mode("overwrite")
.parquet("/mnt/g/ApacheSpark/Data/sales_data_parquet")
  
val endTime = System.currentTimeMillis()
println(f"Czas wykonania: ${(endTime - startTime)/1000.0}%.2f s")

Rezultat wykonania tego kodu znajdziesz w katalogu 'sales_data_parquet'. Nie znajdziesz tam tylko jednego pliku ale nie mniej niż cztery:

Co jest w katalogu salesdata_parquet

  1. .parquet → właściwe dane (1 partycja = 1 plik)

  2. .crc → pliki kontrolne (checksum) → potrzebne Sparkom/HDFS do sprawdzenia integralności danych

  3. _SUCCESS → plik „znacznikowy”, informuje, że zapis się zakończył poprawnie

Nie używasz ich bezpośrednio – większość narzędzi (Spark, Hive, Presto, Pandas, Tableau) ignoruje _SUCCESS i .crc, czyta tylko .parquet. Zatem nie musisz czyścić tego katalogu usuwając inne pliki niż .parquet

Pliki te, inne niz .parquet zajmują niewielem miejsca:

  • _SUCCESS

    • Ma 0 bajtów (lub czasem minimalny rozmiar) → praktycznie nie zajmuje miejsca.

    • Informuje systemy rozproszone (Spark/Hadoop/HDFS), że zapis zakończył się poprawnie.

    • Jest przydatny, jeśli np. kolejny proces (Airflow, Spark, Hive) ma sprawdzić, czy katalog jest kompletny.

  • .crc

    • Checksum plików → używane w HDFS i niektórych systemach lokalnych do sprawdzania integralności danych.

    • Rozmiar zwykle bardzo mały (kilka KB na plik) w porównaniu do danych.

    • automatycznie generowane i ignorowane przez narzędzia do analizy danych (Pandas, Spark, Presto, Hive).

Masz jednak wpływ na to ile plików .parquet powstawnie w wyniku wykonania zadania. Możesz w tym celu użyć jednej z dwóch opcji; repartion lub coalesce:

repartition(N)

  • Działa na całym DataFrame.

  • Robi pełny shuffle danych: Spark analizuje wszystkie wiersze i przepakowuje je równomiernie do N partycji.

  • Skutek: każda partycja ma mniej więcej tyle samo danych → pliki Parquet będą podobnej wielkości.

  • Koszt: shuffle = dużo kopiowania danych między workerami → wolniejsze, ale dokładne.

coalesce(N)

  • Nie robi shuffle.

  • Działa „w locie”, scala istniejące partycje w mniejszą liczbę.

  • Skutek: liczba partycji zmniejszona, ale dane mogą być nierównomiernie rozłożone → nie wszystkie pliki będą tej samej wielkości.

  • Koszt: znacznie mniejszy niż repartition, bo nie trzeba przesyłać danych między workerami.

Oto przykład użycia opcji repartition:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CSV to Parquet")
  .master("local[*]")
  .getOrCreate()
  
val startTime = System.currentTimeMillis()

val df = spark.read
  .option("header", "true")   // jeśli CSV ma nagłówki
  .option("inferSchema", "true") // Spark sam rozpozna typy
  .option("sep", "|")             // określamy separator
.csv("/mnt/g/ApacheSpark/Data/Sales/sales_data.csv")
  
// Ustawienie liczby partycji i przypisanie do nowego DataFrame
val df2 = df.repartition(2)  // teraz df2 ma 2 partycje

// Zapis do Parquet
df2.write
  .mode("overwrite")
.parquet("/mnt/g/ApacheSpark/Data/sales_data_parquet")
  
val endTime = System.currentTimeMillis()
println(f"Czas wykonania: ${(endTime - startTime)/1000.0}%.2f s")

Chcemy jednak docelowo wykorzystać potencjał formatu parquet - nie tylko przechowywanie danych kolumnowo ale też przechowywanie w plików informacji o typach danych. Zmodyfikujmy zatem nasz kod by przy zapisie konwertował numer klienta do integer a sumę sprzedaży do double. Spark daje nam możliwość pozostawiania domyślnego typu danych (string) dla pozostałych kolumn:

import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CSV to Parquet")
  .master("local[*]")
  .getOrCreate()

// Wczytanie CSV bez inferSchema, tylko header i separator
val rawDf = spark.read
  .option("header", "true")
  .option("sep", ";")
  .csv("/mnt/g/ApacheSpark/Data/Sales/sales_data.csv")

// Określenie typów dla wybranych kolumn
val typedDf = rawDf
  .withColumn("customer_number", rawDf("customer_number").cast(IntegerType))
  .withColumn("sales_amount", rawDf("sales_amount").cast(DoubleType))
  // Dodaj więcej kolumn, które chcesz jawnie przekonwertować

// Pozostałe kolumny pozostaną StringType automatycznie (bo Spark wczytał je jako string)
typedDf.printSchema()

// Zapis do Parquet
typedDf.coalesce(2)  // zmniejszamy liczbę plików
  .write
  .mode("overwrite")
  .parquet("/mnt/g/ApacheSpark/Data/sales_data_parquet")

Podgląd plików parquet

Możesz oczywiście podejrzeć pliki parquet w Spark. Możesz to także zrobić używając GUI; ParqueViewer