Jak działa Apache Spark w środowisku Kubernetes - uruchamianie zadań

W realnym, produkcyjnym scenariuszu użycia Spark w klastrze nie rezyduje on tam na stałe. Jego komponenty są instalowane dynamicznie wraz ze zlecaniem zadania i są usuwane po wykonaniu pracy.

Polecenie spark-submit jest używane do przesłania aplikacji (zadań) Spark do klastra Kubernetes. Wymagana ne jest byś miał(a) oprogramownie Spark lokalnie - twój lokalny Spark jest klientem.
Mechanizm przesyłania działa w następujący sposób:

  1. Spark tworzy proces drivera (Spark driver), który uruchamia się wewnątrz poda Kubernetes.

  2. Driver tworzy procesy executorów, które również działają jako pody w Kubernetes, łączy się z nimi i wykonuje kod aplikacji.

  3. Gdy aplikacja zakończy działanie, pody executorówzatrzymywane i usuwane, natomiast pod drivera zachowuje logi i pozostaje w stanie „completed” (zakończony) w API Kubernetes, dopóki nie zostanie usunięty ręcznie lub przez mechanizm automatycznego czyszczenia (garbage collection).

Należy zauważyć, że w stanie „completed” pod drivera nie zużywa żadnych zasobów obliczeniowych ani pamięci.

Planowaniem (ang. scheduling) podów drivera i executorów zajmuje się Kubernetes.
Komunikacja z API Kubernetes odbywa się przez bibliotekę fabric8.

Możliwe jest ograniczenie uruchamiania podów drivera i executorów do określonej grupy węzłów przy użyciu node selector — konfiguracji określającej, na których nodach mają się uruchamiać.
W przyszłych wersjach możliwe będzie korzystanie z bardziej zaawansowanych wskazówek dotyczących planowania, takich jak afinity węzłów lub podów (node/pod affinities).

Połączenie z Kubernetes

Poniższa instrukcja jest prawdziwa jeśli Twój używasz K3s a do zarządzania nim użyłeś instalacji Rancher. Jeśli masz więc Kubernetes K3s zainstalowany razem z Rancher, ma on swoją wbudowaną ścieżkę certyfikatów i uproszczony kube-apiserver.

K3s, jak i wersja K8, ma wbudowany containerd, więc nie używa Dockera. Containerd to lekki i niskopoziomowy runtime kontenerów — czyli program, który faktycznie uruchamia, zatrzymuje i zarządza cyklem życia kontenerów w systemie Linux (i też Windows). Containerd jest utrzymywany i rozwijany przez CNCF (Cloud Native Computing Foundation) jako niezależnym, neutralnym projektem open-source. Pełny Kubernetes i K3s używają containerd jako wspólnego standardu runtime.

Kiedy uruchamiasz Spark w środowisku Kubernetes lub zlecasz zadania, Spark musi łączyć się bezpośrednio z K3s API (zwróć na to uwagę jeśli np. wykorzystujesz Rancher do zarządzania klastrem). K3s trzyma certyfikaty  lokalnie w /var/lib/rancher/k3s/server/tls/. Możesz to sprawdzić wykonując polecenie:

ls -l /var/lib/rancher/k3s/server/tls/

Po wylistowaniu zawartości tego katalogu zobaczysz certyfikaty:

server-ca.crt
server-ca.key
client-admin.crt
client-admin.key
...

To są prawdziwe certyfikaty K3s API. Certyfikat CA, którego potrzebujemy dla Sparka, to /var/lib/rancher/k3s/server/tls/server-ca.crt i jest on tożsamy z certyfikatem API 

server-ca.crt jest potrzebny, żeby klient mógł zweryfikować, że łączy się z prawdziwym serwerem Kubernetes (kube-apiserver) i nawiązać bezpieczne połączenie TLS; nie daje on żadnych uprawnień w klastrze, służy tylko do zaufania certyfikatu serwera. „Czy łączę się naprawdę z moim serwerem K3s, a nie z kimś, kto się pod niego podszywa?”

Sprawdź adres API serwera cat /etc/rancher/k3s/k3s.yaml | grep server - ten adres będziesz używać w parametrze --master polecenia spark-submit. Zobaczysz tam prawdopodobnie lokalne IP które musisz w połączeniu podmienić realnym adresem servera Kubernetes.

Sprawdz czy certyfikat działa wykonując polecenie ('https://127.0.0.1:6443/version' to adres serwera który uzyskałeś powyższym poleceniem):

curl --cacert /var/lib/rancher/k3s/server/tls/server-ca.crt https://127.0.0.1:6443/version

Odpowiedzią będzie wyświetlenie JSON z wersją API i braku autoryzacji - to normalne.

Musisz teraz skopiować certyfikat CA z /var/lib/rancher/k3s/server/tls/server-ca.crt na swój PC (np poleceniem 'scp root@server:/var/lib/rancher/k3s/server/tls/server-ca.crt .' - po wykonania tego polecenia będziesz mieć server-ca.crt w bieżącym katalogu.).

Następnie sprawdź połączenie HTTPS do API używając tego certyfikatu (użyj do tego celu shell WSL który korzysta z OpenSSL, a nie Windows Schannel):

curl --cacert ./server-ca.crt https://10.0.0.120:6443/version

O ile serwer HTTPS działa i certyfikat jest poprawny, zwrócenie:

{
  "status": "Failure",
  "message": "Unauthorized",
  "reason": "Unauthorized",
  "code": 401
}

oznacza problem z autoryzacją, a nie z certyfikatem. Nasz certyfikat jest poprawny, możemy go używać do komunikacji z API Kubernetes.

Testujemy połączenie z autoryzacją

By to zrobić, musimy odczytać token autoryzacyjny z config kubectl, potrzebne będzie nam narzędzie Kubectl. To narzędzie wiersza poleceń do zarządzania klastrami Kubernetes, pozwalające tworzyć, aktualizować i usuwać zasoby oraz sprawdzać status klastra, komunikujące się z API serwera przy użyciu kubeconfig i autoryzacji. W środowisku WSL instalujemy je poleceniem 'sudo snap install kubectl --classic'.

Plik kubeconfig to plik konfiguracyjny używany przez narzędzie kubectl (oraz inne narzędzia Kubernetes) do łączenia się z klastrami Kubernetes. Zawiera on informacje potrzebne do uwierzytelnienia i komunikacji z API serwera klastra. W kubeconfig znajdują się cztery główne sekcje: clusters (adresy i certyfikaty klastrów), users (dane uwierzytelniające użytkowników, np. tokeny lub klucze), contexts (powiązania między klastrem, użytkownikiem i przestrzenią nazw) oraz current-context (określa, który kontekst jest aktualnie używany). Gdy wywołujesz komendę kubectl get pods, narzędzie odczytuje kubeconfig, wybiera bieżący kontekst, odnajduje odpowiedni klaster i użytkownika, a następnie łączy się z serwerem API przy użyciu zapisanych danych. Dzięki kubeconfig można łatwo przełączać się między wieloma klastrami, zarządzać różnymi kontami i przestrzeniami nazw, a także integrować logowanie z innymi narzędziami, takimi jak Helm czy systemy CI/CD.

Kubeconfig do komunikacji Kubectl pobierzesz z katalogu '/etc/rancher/k3s/k3s.yaml'. Wyświetl jego zawartość:

sudo cat /etc/rancher/k3s/k3s.yaml

Plik w środowisku WSL kopiujesz na lokalny PC poleceniem:

scp root@server:/etc/rancher/k3s/k3s.yaml ~/kubeconfig-k3s.yaml

Plik konfiguracyjny zawiera zapewne lokalny adres IP serwera Kubernetes; 127.0.0.1 - zmień go na taki który osiągany jest z Twojego PC - np: https://10.0.0.120:6443. Mając kubeconfig dodaj go do zmiennej środowiskowej by nie podawać lokalizacji pliku kofiguracyjnego jako parametr przy użyciu każdego polecenia Kubectl. Np:

export KUBECONFIG=/mnt/g/Kubectl/k3s.yaml

Przetestuj teraz połaczenie z klastrem Kubernetes poniższym poleceniem (służy ono do sprawdzenia podstawowych informacji o klastrze Kubernetes, czyli — z jakim serwerem API (control plane) łączy się kubectl i jakie główne usługi systemowe są dostępne w przestrzeni kube-system):

kubectl cluster-info

Konfiguracja RBAC

RBAC w Kubernetes to skrót od Role-Based Access Control, czyli kontrola dostępu oparta na rolach.
To mechanizm, który określa kto (użytkownik, konto serwisowe, aplikacja) może wykonywać jakie akcje na jakich zasobach w klastrze Kubernetes.

Domyślny service account ma bardzo ograniczone uprawnienia — nie może np.:

  • tworzyć executorów (czyli kolejnych podów),

  • odczytywać statusu podów,

  • zarządzać configmapami lub eventami.

Dlatego by Spark był w stanie zainicjalizować swoj schedulera i musimy utworzyć service account i rolę (RoleBinding), które dadzą Sparkowi uprawnienia w namespace 'spark' poleceniem kubectl:

kubectl apply -f - <<'EOF'
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-role
  namespace: spark
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "services", "configmaps", "persistentvolumeclaims"]
    verbs: ["get", "list", "create", "delete", "watch"]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "create", "delete"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: spark
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: spark
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io
EOF

Jeśli chcesz podejrzeć ustawienia tej roli z poziomu Rancher wejdź do menu Local (lub wybierz cluster, local)=>Project/namespaces=>Namespace 'spark'=>More resources=>RBAC=Roles. Znajdziesz tam właśnie utworzoną spark-role.

Uwaga, jeśli chcesz by pody executorów usuwały się automatycznie po wykonaniu zadania, dodja jeszcze uprawnienia '- deletecollection' do operacji na podach:

verbs:
  - delete
  - deletecollection

W Kubernetesie są dwa typy ról: Role i ClusterRole.

  • Role działa tylko w jednym namespace (tym, który jest wpisany w metadata.namespace). Może nadawać uprawnienia do zasobów takich jak pods, configmaps, services itd. – ale tylko w tym konkretnym namespace.

  • ClusterRole działa globalnie w całym klastrze. Może dawać dostęp do zasobów globalnych (np. nodes, namespaces) lub do zasobów w wielu namespace’ach jednocześnie.

Aby przypisać te role do kont lub użytkowników:

  • RoleBinding łączy Role z użytkownikiem lub kontem serwisowym w tym samym namespace,

  • ClusterRoleBinding łączy ClusterRole z użytkownikiem lub kontem serwisowym w całym klastrze.

W naszym przypadku mamy role zdefiniowaną w namespace spark i RoleBinding, który ją przypisuje do konta serwisowego spark — więc ta rola obowiązuje tylko w namespace spark (i to jest poprawne oraz bezpieczne dla Sparka).

Wysyłamy zadanie Spark do klastra Kubernetes

Poleceniem spark-submit wykonanym na naszym lokalnym PC wysyłanie zadanie do Apache Spark. Kubernetes pobierze wskazane image Spark z zasobu Docker, utworzy pod drivera i odpowiednia ilość (określoną w parametrach polecenia spark-submit), uruchomione pody workerów wykonają nasze zadanie - w tym przypadku jedna aplikacja (czyli jeden plik JAR) zawiera wiele niezależnych przykładowych aplikacji (każdą w osobnej klasie). Uruchamiamy aplikację/klasę SparkPi:

spark-submit \
  --master k8s://https://10.0.0.120:6443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.executor.instances=1 \
  --conf spark.driver.memory=1g \
  --conf spark.executor.memory=1g \
  --conf spark.driver.cores=1 \
  --conf spark.executor.cores=1 \
  --conf spark.kubernetes.container.image=apache/spark:4.1.0-preview3-scala2.13-java17-python3-ubuntu \
  local:///opt/spark/examples/jars/spark-examples_2.13-4.1.0-preview3.jar

Jeśli nasza aplikacja wykonała się poprawnie, pody executorów (tylko) potrzebne do wykonania zadania usuną się automatycznie jeśli wcześniej w uprawnieniach roli spark dałeś uprawnienia do '- deletecollection' (domyślnie spark-submit wysyła polecenie '--conf spark.kubernetes.executor.deleteOnTermination=true'). Pody executorów nie zostaną usunięte automatycznie i nie ma harmonogramu który je usunie w przyszłości jeśli wyślesz parametr '--conf spark.kubernetes.executor.deleteOnTermination=false'. Pody drivera pozostaną jednak nieusunięte (nie ma żadnego parametru by je usunąć). Jest to celowe - Spark je pozostawia w celu debugowania (logi, status, diagnostyka). Możesz w każdej chwili skasowanie pody recznie (jeśli chcesz sprawzić wcześniej jakie pody zostały utworzone przez nasze polecenie wykonaj kubectl get pods -n spark) lub ustawic cronJob w klastrze który będzie je usuwać co określony czas. Kasowanie podów ręcznie:

kubectl delete pods -n spark --field-selector=status.phase==Succeeded

Lub pody tylko drivera:

kubectl delete pods --field-selector=status.phase=Succeeded -l spark-role=driver -n spark

Polecenie z parametrem polecającym Kubernetes wykonanie zadania z brakiem usuwaniem podów executorów automatycznie po wykonaniu zadania (ten parametr jest domyślny, ustawiony na 'true'):

spark-submit \
  --master k8s://https://10.0.0.120:6443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.container.image=apache/spark:4.1.0-preview3-scala2.13-java17-python3-ubuntu \
  --conf spark.executor.instances=1 \
  --conf spark.executor.memory=1g \
  --conf spark.driver.memory=1g \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
 --conf spark.kubernetes.executor.deleteOnTermination=false \
  local:///opt/spark/examples/jars/spark-examples_2.13-4.1.0-preview3.jar

Jeśli uruchomisz to samo zadanie po raz drugi, zobaczysz że wykona się znacznie szybciej. Dlaczego? Drugie uruchomienie Sparka było szybsze, ponieważ obraz Dockera apache/spark:4.1.0-preview3-scala2.13-java17-python3-ubuntu został już wcześniej pobrany i znajduje się w lokalnym cache na nodzie. Za pierwszym razem Kubernetes (a dokładniej kubelet) musiał sprawdzić, czy obraz jest dostępny lokalnie, a jeśli nie – pobrać go z Docker Hub, co trwa dłużej, bo obraz Sparka ma kilkaset megabajtów. Gdy obraz został już raz ściągnięty, kubelet go nie usuwa – pozostaje on zapisany na dysku w katalogu, w którym przechowywane są obrazy kontenerów (np. /var/lib/containerd/io.containerd.content.v1.content lub /var/lib/docker). Dzięki temu przy kolejnym uruchomieniu tego samego obrazu kontener startuje niemal natychmiast, bo nie trzeba nic pobierać z sieci. Możesz to potwierdzić, logując się na węzeł i wykonując polecenie crictl images, które pokazuje listę lokalnie zbuforowanych obrazów.