Wprowadzenie do Apache Spark: uruchamianie zadań

Apache Spark to oprogramowanie, które pozwala sprawnie przetwarzać i analizować duże zbiory danych (naprawdę duże). Jest szybkie (bardzo szybkie!), skalowalne i wspiera różne języki programowania, dzięki czemu można go używać zarówno do prostych analiz, jak i bardziej zaawansowanych zadań. Spark ułatwia automatyzację pracy z danymi i pozwala szybciej uzyskać potrzebne wyniki.

Apache Spark może działać nie tylko na wydajnych serwerach - możesz je mieć nawet na swoim własnym laptopie i ten laptop może posłużyć Ci do całkiem zaawansowanych analiz. Dlatego warto zapoznać się z możliwościami tego oprogramowania.

Dwa sposoby uruchamianie zadań

Pomijamy tutaj uruchamianie skryptów pythona napisanych w pyspark (jeśli zaistalujesz Spark jako moduł pyspark do pythona). Moduł PySpark zainstalowany przez pip działa w trybie lokalnym i uruchamia Spark tylko na pojedynczym komputerze, pod pełną kontrolą Pythona, bez możliwości użycia języka Scala ani pełnego rozproszonego przetwarzania. Dlatego sprawdza się głównie do testów, nauki i eksperymentów, ale nie do przetwarzania dużych zbiorów danych w produkcji.

spark-shell (pyspark)

Spark-shell stosuje się, gdy chcemy interaktywnie testować i eksplorować dane w Sparku przy użyciu Scali, bez konieczności pisania pełnej aplikacji. Jest idealny do szybkiego prototypowania, nauki Sparka i eksperymentów z operacjami na RDD (Resilient Distributed Dataset) czy DataFrame.

Nasz plik z danymi do testów ma strukturę (pobierz plik):

customer_number;sales_amount;sales_date
9;4089.54;2024-12-13
18;6579.65;2025-04-19
20;6005.84;2024-08-02
7;2026.39;2024-12-28
20;6870.04;2024-01-12
17;4454.97;2025-04-10
5;1116.2;2024-07-28
15;1463.37;2025-04-13
3;1229.04;2024-08-20
1;2397.96;2024-06-08
5;4005.88;2025-02-04
8;8435.51;2024-03-15
16;6749.28;2023-10-22
3;9665.37;2024-10-26
4;5635.03;2025-09-14
1;8108.14;2024-09-29
5;1522.14;2025-06-15
11;9610.69;2025-01-24
3;6675.06;2025-06-11
2;4545.24;2025-05-30

Plik zapisz do dowolnego katalogu na swoim PC. Środowisko WSL umożliwia odczyt plików  z katalogów Windows jakby to był ten sam system operacyjny. Jeśli katalogiem w którym masz ten plik jest dysk G:, pełna ścieżka 'G:\ApacheSpark\Data', dla WSL będzie to scieżka '/mnt/g/ApacheSpark/Data/'. Sprawdź to wpisując w shell'u WSL następujące polecenie:

ls /mnt/g/ApacheSpark/Data/

W ten sposób otrzymasz listę plików które znajdują się w katalogu. Jeśli masz już uruchomione okno WSL (poprzez Powershell), uruchom shell Spark wpisując spark-shell. Kiedy uruchamiasz spark-shell, Spark automatycznie włącza REPL (Read-Eval-Print Loop) dla Scali. Spark-shell jest REPL-em tylko języka Scala. Jeśli masz kod Python'a, uruchom REPL który obsługuje skrypty pyspark wpisujac pyspark w shellu WSL.

Zróbmy pierwsze ćwieczenie - odczytajmy pierwszych 10 wierszy z powyższego pliku wklejając kod napisany w Scala bezpośrednio do okna spark-shell:

// Utworzenie SparkSession
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("Odczyt CSV")
  .getOrCreate()

// Ścieżka do pliku w WSL
val plik_csv = "/mnt/g/ApacheSpark/Data/sales_data.csv"

// Wczytanie CSV z separatorem ';' i nagłówkiem
val df = spark.read
  .option("header", "true")
  .option("sep", ";")
  .csv(plik_csv)

// Wyświetlenie pierwszych 10 wierszy
df.show(10)

// Zakończenie sesji Spark
spark.stop()

W rezultacie zobaczysz na ekranie w formie tabeli pierwszych 10 wierszy z pliku. Spark-shell będzie też domyślną metoda by wykonywać skrypt z notebooków (Jupyter / Zeppelin).

Możesz jednak napotkać problemy z wykonaniem kodu wklejanego w terminalu - każda linia kodu może w niezamieżony sposób uruchamiać się natychmiast po wklejeniu. By temu zapobiec, uruchom tryb :paste w REPL-u dla Scala (spark-shell). Po uruchomieniu tego trybu wklej blok kody a na koniec wybierz Ctrl+D - następi wykonanie bloku kodu.

Python nie posiada odpowiednika trybu :paste ponieważ Python radzi sobie z wykonywaniem kodu linia po linii.

spark-submit

W środowiskach produkcyjnych najczęściej używa się polecenia spark-submit do uruchamiania zadań batch. Daje ono pełną kontrolę nad konfiguracją przetwarzania i umożliwia łatwą integrację z harmonogramami, takimi jak cron czy Airflow. Dzięki temu można wygodnie skalować obliczenia na wiele węzłów i wykorzystywać Sparka do przetwarzania dużych zbiorów danych, zadań ETL, raportowania czy agregacji.

Na poziomie WSL przejdz do katalogu w ktorym będziesz przechowywać zadania dla Spark, np:

cd /mnt/g/ApacheSpark/Zadania

Jeśli chcesz tworzyć zadania w jezyku Scala, przed uruchomieniem zadania (wysłaniem go do Spark poleceniem spark-submit) musisz taki kod skompilować. Zadania napisane w języku Scala będę szybsze dla dużej ilości danych, lepsza integracja z JVM, błedy w kodzie wykrywane na poziomie kompilacji, najnowsze opcje dostępne wcześniej w Scala. Z kolei Python daje dużą łatwość pisania kodu, lepszą integracje z ekosystemem narzędzi analiztycznych, szybsze pisanie kodu (brak kroków związanych z kompilacją). Jeśli masz naprawdę duże zbiory danych, lepiej będzie używać języka Scala. W innym przypadku Python jest więcej niż wystarczający.

PySpark

Jeśli Twoją decyzją jest zastosowanie języka Python - użyjesz interfejsu pyspark. Napiszmy zatem testowe zadanie w Python - zsumujmy sprzedaż wg firmy i wyświetlmy 20 topowych rezultatów (plik z danymi jest w katalogu '/mnt/g/ApacheSpark/Data/Sales' - jeśli będzie tam więcej plików, odczytane zostaną wszystkie), zapiszmy je w pliku sum_sales.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, round, format_number
import time
import os

def main():
    # Utworzenie SparkSession
    spark = SparkSession.builder \
        .appName("SumSalesByCustomer") \
        .getOrCreate()
    
    # Ustawienie logów na ERROR, żeby nie wyświetlało ostrzeżeń
    spark.sparkContext.setLogLevel("ERROR")

    # Mierzymy czas - start
    start_time = time.time()

    # Ścieżka do katalogu z plikami CSV
  data_dir = "/mnt/g/ApacheSpark/Data/Sales"


    df = spark.read \
    .option("header", "true") \
    .option("delimiter", ";") \
    .option("mode", "DROPMALFORMED") \
    .csv(f"file://{data_dir}/*")


    # Sprawdzenie schematu
    print("Schema danych:")
    df.printSchema()


    # Agregacja: suma sprzedaży po customer_number
    result = df.groupBy("customer_number") \
               .agg(sum(col("sales_amount")).alias("total_sales")) \
               .withColumn("total_sales", round(col("total_sales"), 2))

    print("Wyniki agregacji (top 20):")
    result.orderBy(col("total_sales").desc()).show(20, truncate=False)

    # Czas wykonania
    end_time = time.time()
    print(f"Czas wykonania: {end_time - start_time:.2f} s")

    spark.stop()

if __name__ == "__main__":
    main()

Mamy kod, uruchamiamy wykonanie tego zadania:

spark-submit sum_sales.py

Spark-submit pozwala uruchamiać programy (Scala JAR lub Python .py) z dodatkowymi parametrami, które kontrolują zarówno sposób działania Sparka, jak i zachowanie Twojego programu. Więcej o nich i argumentach przesyłanych do programów w dalszej części artykułu.

Spark potrafi odczytać bardzo szybko duże ilości danych z plików ale co bardziej niesamowite, możemy te dane traktować jakbyśmy odczytywali je z bazy danych.  Jednak typy danych mają znaczenie. Poniżej kolejna edycja kodu, tym razem deklarujemy typy danych dla kolumn i informujemy spark by uwzględnił Schema przy odczycie pliku (.schema w sekcji df = spark.read):

    # Ścieżka do katalogu z plikami CSV
  data_dir = "/mnt/g/ApacheSpark/Data/Sales"
    
    schema = StructType([
        StructField("customer_number", IntegerType(), True),
        StructField("sales_amount", DoubleType(), True),
        StructField("sales_date", StringType(), True)
    ])

Możesz zauważyć że wykonanie tego zadania zajęło mniej czasu - typy danych mają znaczenie.

Scala

Rozpoczynany od przygotwania kodu w jezyku Scala. Mozesz rozpocząć od sprawdzenia poniższego kodu w sparku uruchamiając spark-shell i wklejając kod:

Wykonanie tego zadania powinno zwrócić to samo co wcześniej przy użyciu kodu Python; 20 pierwszych firm z najwiekszymi obrotami.

Jeśli nasz kod działa, przygotujemy skompilowaną aplikację Scala; plik JAR. Pierwszy krok -  kod opakowowujemy w obiekt Scala (SumSalesByCustomer) z metodą main:

Mamy kod programu, tworzymy strukturę katalogów dla naszego builda. 'SparkSalesApp' to przykładowa nazwa naszego projektu, możesz nadać inną nazwę. SumSalesByCustomer.scala to plik który zawiera kod naszej aplikacji; jak wyżej.

SparkSalesApp/
 ├─ build.sbt
 └─ src/
     └─ main/
         └─ scala/
             └─ SumSalesByCustomer.scala

Taką strukturę katalogów możesz utworzyć w Exploratorze Windows by nie męczyć się w shell'u Linux. W tych przykładach lokalizacja plików to '/mnt/g/ApacheSpark/' więc możesz utworzyć katalog '/mnt/g/ApacheSpark/Apps/SparkSalesApp

Potrzebujesz teraz utworzyc plik build.sbt:

name := "SparkSalesApp"
version := "0.1"
scalaVersion := "2.13.16"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "4.0.1",
"org.apache.spark" %% "spark-sql"  % "4.0.1"
)

Skąd wiesz jaką wersję Spark masz i jaką wersję Scala? Jeśli wykonasz polecenie spark-shell w terminalu WSL, powitalny ekran Sparka wyświetla informacje o wersji Scala i Spark:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.1
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.16)

Potrzebujemy teraz narzędzie build. Użyjemy sbt - a simple build tool. SBT nie jest w domyślnych repozytoriach Debiana. Trzeba dodać oficjalne repozytorium SBT:

echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list

Następnie zaktualizować listę pakietów:

sudo apt update

Teraz możesz zainstalować sbt:

sudo apt install sbt

OK, generujemy JAR. Wejdź do katalogu projektu (poziom 'SparkSalesApp') i wykonaj polecenie:

sbt package

Wynikiem kompilacji będzie gotwa do użycia aplikacja Spark; plik JAR w katalogu 'Apps\SparkSalesApp\target\scala-2.13' - sparksalesapp_2.13-0.1.jar JAR jest samodzielny, nie potrzebujesz żadnych innych plików by uruchomić aplikację którą właśnie stworzyliśmy.

OK, mamy gotową aplikację Spark, możemy ją uruchomić poleceniem spark-submit. Utórzmy katalog w którym będziemy przechowywać gotowe do uruchomienia aplikacje, np: 'ApacheSpark\Run' i skopiujmy tam plik JAR - sparksalesapp_2.13-0.1.jar

Uruchamiamy aplikację

Przechodzimy w shell'u WSL do katalogu 'ApacheSpark\Run' i uruchamiamy JAR poleceniem:

spark-submit --class SumSalesByCustomer --master local[*] sparksalesapp_2.13-0.1.jar

Co robi to polecenie?

  • --class SumSalesByCustomer

    • Określa, który obiekt/klasę w JAR-ie zawiera metodę main, czyli punkt startowy programu.

    • Bez tego Spark nie wie, który kod uruchomić, jeśli JAR ma więcej niż jedną klasę.

  • --master local[*]

    • Określa środowisko uruchomienia: tutaj lokalny Spark, używający wszystkich rdzeni (*).

    • Można to pominąć, jeśli masz w spark-defaults.conf ustawione domyślne środowisko.

  • sparksalesapp_2.13-0.1.jar

    • Ścieżka do JAR-a, czyli Twój skompilowany projekt.

W realnej aplikacji Spark, scieżki do plików byłyby raczej przesyłane jako argumenty. Np katalog wejściowy i wyjściowy:

spark-submit \
  --class SumSalesByCustomer \
  --master local[*] \
  sparksalesapp_2.13-0.1.jar \
  /mnt/g/ApacheSpark/Data/Sales \
  /mnt/g/ApacheSpark/Results

I odbierane w aplikacji:

 val inputPath = args(0)   // pierwszy argument
val outputPath = args(1)  // drugi argument

Voila, dowiedział(a)eś się właśnie jak uruchamiać zadania w Apache Spark!