Jak działa Apache Spark w środowisku Kubernetes - uruchamianie zadań
W realnym, produkcyjnym scenariuszu użycia Spark w klastrze nie rezyduje on tam na stałe. Jego komponenty są instalowane dynamicznie wraz ze zlecaniem zadania i są usuwane po wykonaniu pracy.
Polecenie spark-submit jest używane do przesłania aplikacji (zadań) Spark do klastra Kubernetes. Wymagana ne jest byś miał(a) oprogramownie Spark lokalnie - twój lokalny Spark jest klientem.
Mechanizm przesyłania działa w następujący sposób:
-
Spark tworzy proces drivera (Spark driver), który uruchamia się wewnątrz poda Kubernetes.
-
Driver tworzy procesy executorów, które również działają jako pody w Kubernetes, łączy się z nimi i wykonuje kod aplikacji.
-
Gdy aplikacja zakończy działanie, pody executorów są zatrzymywane i usuwane, natomiast pod drivera zachowuje logi i pozostaje w stanie „completed” (zakończony) w API Kubernetes, dopóki nie zostanie usunięty ręcznie lub przez mechanizm automatycznego czyszczenia (garbage collection).
Należy zauważyć, że w stanie „completed” pod drivera nie zużywa żadnych zasobów obliczeniowych ani pamięci.

Planowaniem (ang. scheduling) podów drivera i executorów zajmuje się Kubernetes.
Komunikacja z API Kubernetes odbywa się przez bibliotekę fabric8.
Możliwe jest ograniczenie uruchamiania podów drivera i executorów do określonej grupy węzłów przy użyciu node selector — konfiguracji określającej, na których nodach mają się uruchamiać.
W przyszłych wersjach możliwe będzie korzystanie z bardziej zaawansowanych wskazówek dotyczących planowania, takich jak afinity węzłów lub podów (node/pod affinities).
Połączenie z Kubernetes
Poniższa instrukcja jest prawdziwa jeśli Twój używasz K3s a do zarządzania nim użyłeś instalacji Rancher. Jeśli masz więc Kubernetes K3s zainstalowany razem z Rancher, ma on swoją wbudowaną ścieżkę certyfikatów i uproszczony kube-apiserver.
K3s, jak i wersja K8, ma wbudowany containerd, więc nie używa Dockera. Containerd to lekki i niskopoziomowy runtime kontenerów — czyli program, który faktycznie uruchamia, zatrzymuje i zarządza cyklem życia kontenerów w systemie Linux (i też Windows). Containerd jest utrzymywany i rozwijany przez CNCF (Cloud Native Computing Foundation) jako niezależnym, neutralnym projektem open-source. Pełny Kubernetes i K3s używają containerd jako wspólnego standardu runtime.
Kiedy uruchamiasz Spark w środowisku Kubernetes lub zlecasz zadania, Spark musi łączyć się bezpośrednio z K3s API (zwróć na to uwagę jeśli np. wykorzystujesz Rancher do zarządzania klastrem). K3s trzyma certyfikaty lokalnie w /var/lib/rancher/k3s/server/tls/. Możesz to sprawdzić wykonując polecenie:
ls -l /var/lib/rancher/k3s/server/tls/
Po wylistowaniu zawartości tego katalogu zobaczysz certyfikaty:
server-ca.crt
server-ca.key
client-admin.crt
client-admin.key
...
To są prawdziwe certyfikaty K3s API. Certyfikat CA, którego potrzebujemy dla Sparka, to /var/lib/rancher/k3s/server/tls/server-ca.crt i jest on tożsamy z certyfikatem API
server-ca.crt jest potrzebny, żeby klient mógł zweryfikować, że łączy się z prawdziwym serwerem Kubernetes (kube-apiserver) i nawiązać bezpieczne połączenie TLS; nie daje on żadnych uprawnień w klastrze, służy tylko do zaufania certyfikatu serwera. „Czy łączę się naprawdę z moim serwerem K3s, a nie z kimś, kto się pod niego podszywa?”
Sprawdź adres API serwera cat /etc/rancher/k3s/k3s.yaml | grep server - ten adres będziesz używać w parametrze --master polecenia spark-submit. Zobaczysz tam prawdopodobnie lokalne IP które musisz w połączeniu podmienić realnym adresem servera Kubernetes.
Sprawdz czy certyfikat działa wykonując polecenie ('https://127.0.0.1:6443/version' to adres serwera który uzyskałeś powyższym poleceniem):
curl --cacert /var/lib/rancher/k3s/server/tls/server-ca.crt https://127.0.0.1:6443/version
Odpowiedzią będzie wyświetlenie JSON z wersją API i braku autoryzacji - to normalne.
Musisz teraz skopiować certyfikat CA z /var/lib/rancher/k3s/server/tls/server-ca.crt na swój PC (np poleceniem 'scp root@server:/var/lib/rancher/k3s/server/tls/server-ca.crt .' - po wykonania tego polecenia będziesz mieć server-ca.crt w bieżącym katalogu.).
Następnie sprawdź połączenie HTTPS do API używając tego certyfikatu (użyj do tego celu shell WSL który korzysta z OpenSSL, a nie Windows Schannel):
curl --cacert ./server-ca.crt https://10.0.0.120:6443/version
O ile serwer HTTPS działa i certyfikat jest poprawny, zwrócenie:
{
"status": "Failure",
"message": "Unauthorized",
"reason": "Unauthorized",
"code": 401
}
oznacza problem z autoryzacją, a nie z certyfikatem. Nasz certyfikat jest poprawny, możemy go używać do komunikacji z API Kubernetes.
Testujemy połączenie z autoryzacją
By to zrobić, musimy odczytać token autoryzacyjny z config kubectl, potrzebne będzie nam narzędzie Kubectl. To narzędzie wiersza poleceń do zarządzania klastrami Kubernetes, pozwalające tworzyć, aktualizować i usuwać zasoby oraz sprawdzać status klastra, komunikujące się z API serwera przy użyciu kubeconfig i autoryzacji. W środowisku WSL instalujemy je poleceniem 'sudo snap install kubectl --classic'.
Plik kubeconfig to plik konfiguracyjny używany przez narzędzie kubectl (oraz inne narzędzia Kubernetes) do łączenia się z klastrami Kubernetes. Zawiera on informacje potrzebne do uwierzytelnienia i komunikacji z API serwera klastra. W kubeconfig znajdują się cztery główne sekcje: clusters (adresy i certyfikaty klastrów), users (dane uwierzytelniające użytkowników, np. tokeny lub klucze), contexts (powiązania między klastrem, użytkownikiem i przestrzenią nazw) oraz current-context (określa, który kontekst jest aktualnie używany). Gdy wywołujesz komendę kubectl get pods, narzędzie odczytuje kubeconfig, wybiera bieżący kontekst, odnajduje odpowiedni klaster i użytkownika, a następnie łączy się z serwerem API przy użyciu zapisanych danych. Dzięki kubeconfig można łatwo przełączać się między wieloma klastrami, zarządzać różnymi kontami i przestrzeniami nazw, a także integrować logowanie z innymi narzędziami, takimi jak Helm czy systemy CI/CD.
Kubeconfig do komunikacji Kubectl pobierzesz z katalogu '/etc/rancher/k3s/k3s.yaml'. Wyświetl jego zawartość:
sudo cat /etc/rancher/k3s/k3s.yaml
Plik w środowisku WSL kopiujesz na lokalny PC poleceniem:
scp root@server:/etc/rancher/k3s/k3s.yaml ~/kubeconfig-k3s.yaml
Plik konfiguracyjny zawiera zapewne lokalny adres IP serwera Kubernetes; 127.0.0.1 - zmień go na taki który osiągany jest z Twojego PC - np: https://10.0.0.120:6443. Mając kubeconfig dodaj go do zmiennej środowiskowej by nie podawać lokalizacji pliku kofiguracyjnego jako parametr przy użyciu każdego polecenia Kubectl. Np:
export KUBECONFIG=/mnt/g/Kubectl/k3s.yaml
Przetestuj teraz połaczenie z klastrem Kubernetes poniższym poleceniem (służy ono do sprawdzenia podstawowych informacji o klastrze Kubernetes, czyli — z jakim serwerem API (control plane) łączy się kubectl i jakie główne usługi systemowe są dostępne w przestrzeni kube-system):
kubectl cluster-info
Konfiguracja RBAC
RBAC w Kubernetes to skrót od Role-Based Access Control, czyli kontrola dostępu oparta na rolach.
To mechanizm, który określa kto (użytkownik, konto serwisowe, aplikacja) może wykonywać jakie akcje na jakich zasobach w klastrze Kubernetes.
Domyślny service account ma bardzo ograniczone uprawnienia — nie może np.:
-
tworzyć executorów (czyli kolejnych podów),
-
odczytywać statusu podów,
-
zarządzać configmapami lub eventami.
Dlatego by Spark był w stanie zainicjalizować swoj schedulera i musimy utworzyć service account i rolę (RoleBinding), które dadzą Sparkowi uprawnienia w namespace 'spark' poleceniem kubectl:
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark
namespace: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: spark-role
namespace: spark
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "services", "configmaps", "persistentvolumeclaims"]
verbs: ["get", "list", "create", "delete", "watch"]
- apiGroups: ["batch", "extensions"]
resources: ["jobs"]
verbs: ["get", "list", "create", "delete"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-role-binding
namespace: spark
subjects:
- kind: ServiceAccount
name: spark
namespace: spark
roleRef:
kind: Role
name: spark-role
apiGroup: rbac.authorization.k8s.io
EOF
Jeśli chcesz podejrzeć ustawienia tej roli z poziomu Rancher wejdź do menu Local (lub wybierz cluster, local)=>Project/namespaces=>Namespace 'spark'=>More resources=>RBAC=Roles. Znajdziesz tam właśnie utworzoną spark-role.
Uwaga, jeśli chcesz by pody executorów usuwały się automatycznie po wykonaniu zadania, dodja jeszcze uprawnienia '- deletecollection' do operacji na podach:
verbs:
- delete
- deletecollection
W Kubernetesie są dwa typy ról: Role i ClusterRole.
-
Role działa tylko w jednym namespace (tym, który jest wpisany w
metadata.namespace). Może nadawać uprawnienia do zasobów takich jakpods,configmaps,servicesitd. – ale tylko w tym konkretnym namespace. -
ClusterRole działa globalnie w całym klastrze. Może dawać dostęp do zasobów globalnych (np.
nodes,namespaces) lub do zasobów w wielu namespace’ach jednocześnie.
Aby przypisać te role do kont lub użytkowników:
-
RoleBinding łączy
Rolez użytkownikiem lub kontem serwisowym w tym samym namespace, -
ClusterRoleBinding łączy
ClusterRolez użytkownikiem lub kontem serwisowym w całym klastrze.
W naszym przypadku mamy role zdefiniowaną w namespace spark i RoleBinding, który ją przypisuje do konta serwisowego spark — więc ta rola obowiązuje tylko w namespace spark (i to jest poprawne oraz bezpieczne dla Sparka).
Wysyłamy zadanie Spark do klastra Kubernetes
Poleceniem spark-submit wykonanym na naszym lokalnym PC wysyłanie zadanie do Apache Spark. Kubernetes pobierze wskazane image Spark z zasobu Docker, utworzy pod drivera i odpowiednia ilość (określoną w parametrach polecenia spark-submit), uruchomione pody workerów wykonają nasze zadanie - w tym przypadku jedna aplikacja (czyli jeden plik JAR) zawiera wiele niezależnych przykładowych aplikacji (każdą w osobnej klasie). Uruchamiamy aplikację/klasę SparkPi:
spark-submit \
--master k8s://https://10.0.0.120:6443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=1 \
--conf spark.driver.memory=1g \
--conf spark.executor.memory=1g \
--conf spark.driver.cores=1 \
--conf spark.executor.cores=1 \
--conf spark.kubernetes.container.image=apache/spark:4.1.0-preview3-scala2.13-java17-python3-ubuntu \
local:///opt/spark/examples/jars/spark-examples_2.13-4.1.0-preview3.jar
Jeśli nasza aplikacja wykonała się poprawnie, pody executorów (tylko) potrzebne do wykonania zadania usuną się automatycznie jeśli wcześniej w uprawnieniach roli spark dałeś uprawnienia do '- deletecollection' (domyślnie spark-submit wysyła polecenie '--conf spark.kubernetes.executor.deleteOnTermination=true'). Pody executorów nie zostaną usunięte automatycznie i nie ma harmonogramu który je usunie w przyszłości jeśli wyślesz parametr '--conf spark.kubernetes.executor.deleteOnTermination=false'. Pody drivera pozostaną jednak nieusunięte (nie ma żadnego parametru by je usunąć). Jest to celowe - Spark je pozostawia w celu debugowania (logi, status, diagnostyka). Możesz w każdej chwili skasowanie pody recznie (jeśli chcesz sprawzić wcześniej jakie pody zostały utworzone przez nasze polecenie wykonaj kubectl get pods -n spark) lub ustawic cronJob w klastrze który będzie je usuwać co określony czas. Kasowanie podów ręcznie:
kubectl delete pods -n spark --field-selector=status.phase==Succeeded
Lub pody tylko drivera:
kubectl delete pods --field-selector=status.phase=Succeeded -l spark-role=driver -n spark
Polecenie z parametrem polecającym Kubernetes wykonanie zadania z brakiem usuwaniem podów executorów automatycznie po wykonaniu zadania (ten parametr jest domyślny, ustawiony na 'true'):
spark-submit \
--master k8s://https://10.0.0.120:6443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.container.image=apache/spark:4.1.0-preview3-scala2.13-java17-python3-ubuntu \
--conf spark.executor.instances=1 \
--conf spark.executor.memory=1g \
--conf spark.driver.memory=1g \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.executor.deleteOnTermination=false \
local:///opt/spark/examples/jars/spark-examples_2.13-4.1.0-preview3.jar
Jeśli uruchomisz to samo zadanie po raz drugi, zobaczysz że wykona się znacznie szybciej. Dlaczego? Drugie uruchomienie Sparka było szybsze, ponieważ obraz Dockera apache/spark:4.1.0-preview3-scala2.13-java17-python3-ubuntu został już wcześniej pobrany i znajduje się w lokalnym cache na nodzie. Za pierwszym razem Kubernetes (a dokładniej kubelet) musiał sprawdzić, czy obraz jest dostępny lokalnie, a jeśli nie – pobrać go z Docker Hub, co trwa dłużej, bo obraz Sparka ma kilkaset megabajtów. Gdy obraz został już raz ściągnięty, kubelet go nie usuwa – pozostaje on zapisany na dysku w katalogu, w którym przechowywane są obrazy kontenerów (np. /var/lib/containerd/io.containerd.content.v1.content lub /var/lib/docker). Dzięki temu przy kolejnym uruchomieniu tego samego obrazu kontener startuje niemal natychmiast, bo nie trzeba nic pobierać z sieci. Możesz to potwierdzić, logując się na węzeł i wykonując polecenie crictl images, które pokazuje listę lokalnie zbuforowanych obrazów.