Integracja Pentaho Kettle z Elasticsearch

W tym artykule dowiesz się jak pobrać dane z Elasticsearch z poziomu Pentaho Kettle. Elasticsearch jest bazą typu noSQL która oferuje wysoką prędkość odczytu nieustrukturyzowanych danych. Dostęp do bazy Elasticsearch jest nieco inny niż do baz SQL. Jeśli zatem masz zbiory danych w tym formacie i chcesz się do nich podłączyć z poziomu Pentaho, zobacz jak możesz to uzyskać.

Tworzenie zapytania

Pierwszym krokiem jest stowrzenie zapytania do Elasticsearch (ES). Zapytanie napiszemy w języku ES - Query DSL (Domain Specific Language) bazującym na formacie JSON. Więcej o tym języku dowiesz się na stronie Elastic. Nasze zapytanie mogłoby zawierać kilka parametrów i praktycznie ograniczyć się do kilku linijek tekstu. W realnym środowisku dane do zapytania będą często dynamiczne i zawierać dziesiątki ... może nawet tysiące.... parametrów. Na przykład listę artykułów dla których informacji szukamy - w tym przykładzie pobieramy z ES dane wg konkretnej listy artykułów wcześniej pobranych z bazy SQL. Nasza transformacja/zapytanie będzie wyglądać zatem tak:

Pobierz dane z tabeli => Ustaw parametry zapytania do ES (Query DSL) => Połącz się z REST Elastic => Odczytaj dane

Pobierz dane z tabeli

Pierwszy krok to użycie komponentu "Table input". W naszym przykładzie korzystamy z bazy MySQL. Składnia tego zapytania wygląda następująco:

SELECT GROUP_CONCAT('', bdata, '' SEPARATOR ',') AS formated_parameters FROM
(
  SELECT
    CONCAT(product_no) AS bdata
  FROM articles
)AS rdata

Zapytanie to zwróci nam listę artykułów w jednym rekordzie odseparowanych przecinkami. Dokladnie takiego formatu potrzebujemy dla nastepnego kroku -  tworzenia zapytania w formacie Query DSL.

80923356,80911155,80888766,80994777,80123788,80006945,82347219,80765221,...

Ustaw parametry zapytania

Potrzebujemy utworzyć zapytanie w języku Elastic - Query DSL. Wykorzystamy do tego krok "Modified JavaScript value".

var size = 1000;
var query;
var parameters = formated_parameters;

query = '{"size": '+size+', "query": {"bool": {"must": {"terms" : { "article_no" : ['+parameters+']}},"filter": {"range": {"order_date": {"gte": "2018-12-01","lte": "2019-01-31","format": "yyyy-MM-dd", "time_zone": "+01:00"}}}}}}';

Krok ten ma zwracać w rezultacie zmienną "query" w postaci String która zawiera gotowe zapytanie do wysłania do ES. "Parameters" w kodzie to rezultat zapytania, w poprzednim kroku, do bazy danych MySQL - lista artykułów.  Parametr "size" pozwala określić ile linii zwróci w odpowiedzi (jedna strona odpowiedzi).

Połącz się z REST Elasticsearch

W tym kroku użyjemy "REST Client" by połączyć się z REST Elasticsearch. Nasz testowy ES nasłuchuje na porcie 8080:

Parametr "scroll" zapytania w URL'u określa timeout - jak długo zapytanie pozostanie aktywne. Dlaczego czasami potrzebujemy dodać ten parametr? Otrzymanie dużej ilości danych z ES wymaga często czasu. Gdybyśmy nie ustawili tego parametru, zapytanie mogłoby wygasnąć.

Odczytujemy dane

ES używa "stronicowania" w rezultacie zapytania. Oznacza to że przy dużej ilości danych rezultat zostanie podzielony na strony. Pierwsza odpowiedź zawiera pierwszą stronę danych i "_scroll_id" czyli wskaźnik do następnej strony odpowiedzi. Odczyt danych z ES polaga na parsowaniu kolejnych stron które są w formacie JSON. By sparsować odpowiedź użyjemy kroku "Modified Java Script Value". Skrypt będzie zawierać linie:

var obj = JSON.parse(result);
var ile = obj.hits.hits.length;
var scroll_id = obj._scroll_id;

"Obj" to rezultat poprzedniego kroku (REST) w formacie JSON. Zmienna "ile" zawiera informację o ilości rekordów. Jeśli ma wartość wyższą od zera, odpowiedź nie jest pusta. "_scroll_id" to następna strona do odczytu. "scroll_id" i "ile" przekazujemy do nastepnego kroku - parsowania wybranych pol z odpowiedzi ES.

Parsowanie odpowiedzi ES

Następnym krokiem jest JSON Input. Wybieramy z JSON który zwrócił ES pola które nas interesują. Np:

$.hits.hits.*_source.sold_to_no
$.hits.hits.*_source.order_date

Zapisujemy wybrane rekordy i ustawiamy zmienne

Dane z pierwszej strony zapisujemy do pliku (jeśli docelowym formatem ma być plik) - w kroku "Text file output". Jednocześnie ustawiamy zmienne "ile" i "scroll_id" w kroku "Set Variables". Zmienne te będą potrzebne nam do pętli przez wszystkie strony rezultatu ES - patrz dalej. Oto jak wygląda cała transformacja:

Tworzenie transformacji odczytujacej następne strony odpowiedzi ES

W transformacji Pentaho która stworzyliśmy powyżej wysłaliśmy zapytanie do ES i odczytaliśmy tylko pierwszą stronę odpowiedzi. Jeśli odpowiedź zawierała więcej dokumentów/danych niż mogła pomieścić jedna strona, musimy odczytać pozostałe. Transformacja będzie składać się z kroków:

Pobierz zmienne => Ustaw parametry => Połącz się z REST => Odczytaj dane => Parsuj rezultat => Zapisz dane i ustaw zmienne

Odczyt zmiennych

Poprzednią transformację kończyliśmy krokiem "Set variables". Jeśli rezultat ES zawiera więcej niż dwie strony, musimy teraz odczytać te zmienne i wykonać podobne zapytanie. Tym razem głownym parametrem dla tego zapytania będzie "scroll_id". Odczytamy tę zmienną krokiem "Get variables" (variable: ${scroll_id})

Ustaw parametry zapytania

Używając komponentu "Modified JavaScript value" tworzymy zapytanie. Jego kod to:

var query;
query = '{"scroll" : "3m","scroll_id" : "'+scroll_id+'"}';

Połącz się z REST Elasticsearch

W tym kroku użyjemy "REST Client" by połączyć się z REST Elasticsearch. Tym razem URL wyglądać będzie nieco inaczej:

http://elastic:8080/_search/scroll

Odczytujemy dane

Identycznie jak poprzednio, parsujemy odpowiedź ES:

var obj = JSON.parse(result);
var ile = obj.hits.hits.length;
var scroll_id = obj._scroll_id;

Zapisujemy wybrane rekordy i ustawiamy zmienne

Te kroki są identyczne jak w pierwszej transformacji.

Łączymy transformacje w zadanie

Finalnie potrzebujemy zadania by połączyć wszystko w całość i utworzyć pętlę dla pobrania więcej niż jednej strony z ES. Zadanie będzie wyglądać następująco:

Transformacja "Wyślij zapytanie" inicjuje pobieranie danych - tutaj wysyłamy zapytanie z kryteriami wyszukiwania danych w ES. "Zapytanie zwróciło dane?" sprawdza czy zmienna "ile" jest wyższa niż zero. Jeśli nie, cała odpowiedź zmieściła się na jednej stronie odpowiedzi - mamy to zapisane w pliku (rezultat pierwszej transformacji). Jeśli tak, pobierz następną/następne strony. Transformacja "Pobierz dane" pobiera właśnie następną stronę. Po wykonaniu tej transformacji następuje sprawdzenie czy była to ostania strona - "Koniec scroll?". W tym kroku następuje sprawdzenie zmiennej "ile". Jeśli jest wyższa niż zero, wróć do transforamcji raz jeszcze. W ten oto sposób możemy pobrać dane z ES i zapisać je do inne formatu lub użyć do analiz "w locie".