Wprowadzenie do Apache Spark: uruchamianie zadań
Apache Spark to oprogramowanie, które pozwala sprawnie przetwarzać i analizować duże zbiory danych (naprawdę duże). Jest szybkie (bardzo szybkie!), skalowalne i wspiera różne języki programowania, dzięki czemu można go używać zarówno do prostych analiz, jak i bardziej zaawansowanych zadań. Spark ułatwia automatyzację pracy z danymi i pozwala szybciej uzyskać potrzebne wyniki.
Apache Spark może działać nie tylko na wydajnych serwerach - możesz je mieć nawet na swoim własnym laptopie i ten laptop może posłużyć Ci do całkiem zaawansowanych analiz. Dlatego warto zapoznać się z możliwościami tego oprogramowania.
Dwa sposoby uruchamianie zadań
Pomijamy tutaj uruchamianie skryptów pythona napisanych w pyspark (jeśli zaistalujesz Spark jako moduł pyspark do pythona). Moduł PySpark zainstalowany przez pip działa w trybie lokalnym i uruchamia Spark tylko na pojedynczym komputerze, pod pełną kontrolą Pythona, bez możliwości użycia języka Scala ani pełnego rozproszonego przetwarzania. Dlatego sprawdza się głównie do testów, nauki i eksperymentów, ale nie do przetwarzania dużych zbiorów danych w produkcji.
spark-shell (pyspark)
Spark-shell stosuje się, gdy chcemy interaktywnie testować i eksplorować dane w Sparku przy użyciu Scali, bez konieczności pisania pełnej aplikacji. Jest idealny do szybkiego prototypowania, nauki Sparka i eksperymentów z operacjami na RDD (Resilient Distributed Dataset) czy DataFrame.
Nasz plik z danymi do testów ma strukturę (pobierz plik):
customer_number;sales_amount;sales_date
9;4089.54;2024-12-13
18;6579.65;2025-04-19
20;6005.84;2024-08-02
7;2026.39;2024-12-28
20;6870.04;2024-01-12
17;4454.97;2025-04-10
5;1116.2;2024-07-28
15;1463.37;2025-04-13
3;1229.04;2024-08-20
1;2397.96;2024-06-08
5;4005.88;2025-02-04
8;8435.51;2024-03-15
16;6749.28;2023-10-22
3;9665.37;2024-10-26
4;5635.03;2025-09-14
1;8108.14;2024-09-29
5;1522.14;2025-06-15
11;9610.69;2025-01-24
3;6675.06;2025-06-11
2;4545.24;2025-05-30
Plik zapisz do dowolnego katalogu na swoim PC. Środowisko WSL umożliwia odczyt plików z katalogów Windows jakby to był ten sam system operacyjny. Jeśli katalogiem w którym masz ten plik jest dysk G:, pełna ścieżka 'G:\ApacheSpark\Data', dla WSL będzie to scieżka '/mnt/g/ApacheSpark/Data/'. Sprawdź to wpisując w shell'u WSL następujące polecenie:
ls /mnt/g/ApacheSpark/Data/
W ten sposób otrzymasz listę plików które znajdują się w katalogu. Jeśli masz już uruchomione okno WSL (poprzez Powershell), uruchom shell Spark wpisując spark-shell. Kiedy uruchamiasz spark-shell, Spark automatycznie włącza REPL (Read-Eval-Print Loop) dla Scali. Spark-shell jest REPL-em tylko języka Scala. Jeśli masz kod Python'a, uruchom REPL który obsługuje skrypty pyspark wpisujac pyspark w shellu WSL.
Zróbmy pierwsze ćwieczenie - odczytajmy pierwszych 10 wierszy z powyższego pliku wklejając kod napisany w Scala bezpośrednio do okna spark-shell:
// Utworzenie SparkSession
val spark = org.apache.spark.sql.SparkSession.builder()
.appName("Odczyt CSV")
.getOrCreate()
// Ścieżka do pliku w WSL
val plik_csv = "/mnt/g/ApacheSpark/Data/sales_data.csv"
// Wczytanie CSV z separatorem ';' i nagłówkiem
val df = spark.read
.option("header", "true")
.option("sep", ";")
.csv(plik_csv)
// Wyświetlenie pierwszych 10 wierszy
df.show(10)
// Zakończenie sesji Spark
spark.stop()
W rezultacie zobaczysz na ekranie w formie tabeli pierwszych 10 wierszy z pliku. Spark-shell będzie też domyślną metoda by wykonywać skrypt z notebooków (Jupyter / Zeppelin).
Możesz jednak napotkać problemy z wykonaniem kodu wklejanego w terminalu - każda linia kodu może w niezamieżony sposób uruchamiać się natychmiast po wklejeniu. By temu zapobiec, uruchom tryb :paste w REPL-u dla Scala (spark-shell). Po uruchomieniu tego trybu wklej blok kody a na koniec wybierz Ctrl+D - następi wykonanie bloku kodu.
Python nie posiada odpowiednika trybu :paste ponieważ Python radzi sobie z wykonywaniem kodu linia po linii.
spark-submit
W środowiskach produkcyjnych najczęściej używa się polecenia spark-submit do uruchamiania zadań batch. Daje ono pełną kontrolę nad konfiguracją przetwarzania i umożliwia łatwą integrację z harmonogramami, takimi jak cron czy Airflow. Dzięki temu można wygodnie skalować obliczenia na wiele węzłów i wykorzystywać Sparka do przetwarzania dużych zbiorów danych, zadań ETL, raportowania czy agregacji.
Na poziomie WSL przejdz do katalogu w ktorym będziesz przechowywać zadania dla Spark, np:
cd /mnt/g/ApacheSpark/Zadania
Jeśli chcesz tworzyć zadania w jezyku Scala, przed uruchomieniem zadania (wysłaniem go do Spark poleceniem spark-submit) musisz taki kod skompilować. Zadania napisane w języku Scala będę szybsze dla dużej ilości danych, lepsza integracja z JVM, błedy w kodzie wykrywane na poziomie kompilacji, najnowsze opcje dostępne wcześniej w Scala. Z kolei Python daje dużą łatwość pisania kodu, lepszą integracje z ekosystemem narzędzi analiztycznych, szybsze pisanie kodu (brak kroków związanych z kompilacją). Jeśli masz naprawdę duże zbiory danych, lepiej będzie używać języka Scala. W innym przypadku Python jest więcej niż wystarczający.
PySpark
Jeśli Twoją decyzją jest zastosowanie języka Python - użyjesz interfejsu pyspark. Napiszmy zatem testowe zadanie w Python - zsumujmy sprzedaż wg firmy i wyświetlmy 20 topowych rezultatów (plik z danymi jest w katalogu '/mnt/g/ApacheSpark/Data/Sales' - jeśli będzie tam więcej plików, odczytane zostaną wszystkie), zapiszmy je w pliku sum_sales.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, round, format_number
import time
import os
def main():
# Utworzenie SparkSession
spark = SparkSession.builder \
.appName("SumSalesByCustomer") \
.getOrCreate()
# Ustawienie logów na ERROR, żeby nie wyświetlało ostrzeżeń
spark.sparkContext.setLogLevel("ERROR")
# Mierzymy czas - start
start_time = time.time()
# Ścieżka do katalogu z plikami CSV
data_dir = "/mnt/g/ApacheSpark/Data/Sales"
df = spark.read \
.option("header", "true") \
.option("delimiter", ";") \
.option("mode", "DROPMALFORMED") \
.csv(f"file://{data_dir}/*")
# Sprawdzenie schematu
print("Schema danych:")
df.printSchema()
# Agregacja: suma sprzedaży po customer_number
result = df.groupBy("customer_number") \
.agg(sum(col("sales_amount")).alias("total_sales")) \
.withColumn("total_sales", round(col("total_sales"), 2))
print("Wyniki agregacji (top 20):")
result.orderBy(col("total_sales").desc()).show(20, truncate=False)
# Czas wykonania
end_time = time.time()
print(f"Czas wykonania: {end_time - start_time:.2f} s")
spark.stop()
if __name__ == "__main__":
main()
Mamy kod, uruchamiamy wykonanie tego zadania:
spark-submit sum_sales.py
Spark-submit pozwala uruchamiać programy (Scala JAR lub Python .py) z dodatkowymi parametrami, które kontrolują zarówno sposób działania Sparka, jak i zachowanie Twojego programu. Więcej o nich i argumentach przesyłanych do programów w dalszej części artykułu.
Spark potrafi odczytać bardzo szybko duże ilości danych z plików ale co bardziej niesamowite, możemy te dane traktować jakbyśmy odczytywali je z bazy danych. Jednak typy danych mają znaczenie. Poniżej kolejna edycja kodu, tym razem deklarujemy typy danych dla kolumn i informujemy spark by uwzględnił Schema przy odczycie pliku (.schema w sekcji df = spark.read):
# Ścieżka do katalogu z plikami CSV
data_dir = "/mnt/g/ApacheSpark/Data/Sales"
schema = StructType([
StructField("customer_number", IntegerType(), True),
StructField("sales_amount", DoubleType(), True),
StructField("sales_date", StringType(), True)
])
Możesz zauważyć że wykonanie tego zadania zajęło mniej czasu - typy danych mają znaczenie.
Scala
Rozpoczynany od przygotwania kodu w jezyku Scala. Mozesz rozpocząć od sprawdzenia poniższego kodu w sparku uruchamiając spark-shell i wklejając kod:

Wykonanie tego zadania powinno zwrócić to samo co wcześniej przy użyciu kodu Python; 20 pierwszych firm z najwiekszymi obrotami.
Jeśli nasz kod działa, przygotujemy skompilowaną aplikację Scala; plik JAR. Pierwszy krok - kod opakowowujemy w obiekt Scala (SumSalesByCustomer) z metodą main:

Mamy kod programu, tworzymy strukturę katalogów dla naszego builda. 'SparkSalesApp' to przykładowa nazwa naszego projektu, możesz nadać inną nazwę. SumSalesByCustomer.scala to plik który zawiera kod naszej aplikacji; jak wyżej.
SparkSalesApp/
├─ build.sbt
└─ src/
└─ main/
└─ scala/
└─ SumSalesByCustomer.scala
Taką strukturę katalogów możesz utworzyć w Exploratorze Windows by nie męczyć się w shell'u Linux. W tych przykładach lokalizacja plików to '/mnt/g/ApacheSpark/' więc możesz utworzyć katalog '/mnt/g/ApacheSpark/Apps/SparkSalesApp'
Potrzebujesz teraz utworzyc plik build.sbt:
name := "SparkSalesApp"
version := "0.1"
scalaVersion := "2.13.16"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "4.0.1",
"org.apache.spark" %% "spark-sql" % "4.0.1"
)
Skąd wiesz jaką wersję Spark masz i jaką wersję Scala? Jeśli wykonasz polecenie spark-shell w terminalu WSL, powitalny ekran Sparka wyświetla informacje o wersji Scala i Spark:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 4.0.1
/_/
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.16)
Potrzebujemy teraz narzędzie build. Użyjemy sbt - a simple build tool. SBT nie jest w domyślnych repozytoriach Debiana. Trzeba dodać oficjalne repozytorium SBT:
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
Następnie zaktualizować listę pakietów:
sudo apt update
Teraz możesz zainstalować sbt:
sudo apt install sbt
OK, generujemy JAR. Wejdź do katalogu projektu (poziom 'SparkSalesApp') i wykonaj polecenie:
sbt package
Wynikiem kompilacji będzie gotwa do użycia aplikacja Spark; plik JAR w katalogu 'Apps\SparkSalesApp\target\scala-2.13' - sparksalesapp_2.13-0.1.jar JAR jest samodzielny, nie potrzebujesz żadnych innych plików by uruchomić aplikację którą właśnie stworzyliśmy.
OK, mamy gotową aplikację Spark, możemy ją uruchomić poleceniem spark-submit. Utórzmy katalog w którym będziemy przechowywać gotowe do uruchomienia aplikacje, np: 'ApacheSpark\Run' i skopiujmy tam plik JAR - sparksalesapp_2.13-0.1.jar
Uruchamiamy aplikację
Przechodzimy w shell'u WSL do katalogu 'ApacheSpark\Run' i uruchamiamy JAR poleceniem:
spark-submit --class SumSalesByCustomer --master local[*] sparksalesapp_2.13-0.1.jar
Co robi to polecenie?
-
--class SumSalesByCustomer-
Określa, który obiekt/klasę w JAR-ie zawiera metodę
main, czyli punkt startowy programu. -
Bez tego Spark nie wie, który kod uruchomić, jeśli JAR ma więcej niż jedną klasę.
-
-
--master local[*]-
Określa środowisko uruchomienia: tutaj lokalny Spark, używający wszystkich rdzeni (
*). -
Można to pominąć, jeśli masz w
spark-defaults.confustawione domyślne środowisko.
-
-
sparksalesapp_2.13-0.1.jar-
Ścieżka do JAR-a, czyli Twój skompilowany projekt.
-
W realnej aplikacji Spark, scieżki do plików byłyby raczej przesyłane jako argumenty. Np katalog wejściowy i wyjściowy:
spark-submit \
--class SumSalesByCustomer \
--master local[*] \
sparksalesapp_2.13-0.1.jar \
/mnt/g/ApacheSpark/Data/Sales \
/mnt/g/ApacheSpark/Results
I odbierane w aplikacji:
val inputPath = args(0) // pierwszy argument
val outputPath = args(1) // drugi argument
Voila, dowiedział(a)eś się właśnie jak uruchamiać zadania w Apache Spark!