Apache Flink - przetwarzanie danych

Domyślnym sposobem tworzenia aplikacji Apache Flink jest użycie Flink SQL. Ułatwia on tworzenie aplikacji przetwarzania strumieniowego przy użyciu standardowego SQL. Łatwo jest nauczyć się Flink, jeśli kiedykolwiek pracowałeś z bazą danych lub systemem podobnym do SQL, zgodnym z ANSI-SQL 2011.

Podobnie jak w przypadku wszystkich silników SQL, zapytania Flink działają na szczycie tabel. Różnią się one jednak od tradycyjnej bazy danych, ponieważ Flink nie zarządza danymi składowanymi w statycznych tabelach; zamiast tego jego zapytania działają w sposób ciągły na danych zewnętrznych (z tabel baz danych lub plików powstają tabele dynamiczne w programie Flink a po przetworzeniu danych mogą być z powrotem zapisane do tradycyjnych tabel baz danych i do plików).

Potoki przetwarzania danych Flink zaczynają się od danych źródłowych. Dane źródłowe tworzą wiersze obsługiwane podczas wykonywania zapytania; są to dynamiczne tabele Flink, do których odwołuje się klauzula FROM zapytania. Mogą to być topiki Kafki, bazy danych, systemy plików lub dowolny inny system z którego Flink wie jak korzystać.

Zanim zaczniesz czytać dalej, upewnij się że przeczytałe(a)ś wcześniejsze artykuły jak zainstalować Apache Flink [uruchomienie-apache-flink-w-windows] i jak utworzyć pierwszą aplikację w tym środowisku [pierwsza-transformacja-w-apache-flink]

Raz jeszcze: tabele dynamiczne Flink nie są tabelami baz danych - powstają one poprzez odczytanie tabel baz danych [np MySQL, Oracle], plików [np. CSV] lub pochodzą z innych źródeł [np. Kafka]. Flink używa swoich własnych tabel - przeczytaj o nich więcej na stronie Apache Flink

Treść tego artykułu

Poniżej znajdziesz opis podstaw korzystania z Flink SQL API - niezbędne informacje by tworzyć programy w Apache Flink. Dowiesz się jak:

  • Połączyć się z bazą danych
  • Odczytać tabelę bazy danych
  • Przetworzyć dane
  • Zapisać dane do bazy danych

Połączenie Flink z bazą danych

Utwórz nowy projekt Maven lub Java w Eclipse [zobacz wcześniejszy artykuł jak stworzyć projekt w Flink w Eclipse]. Flink potrzebuje zależności MAVEN by korzystać z protokołu JDBC. Jeśli Twoim projektem jest projekt Java, przekształć go w projekt Maven. Prawym przyciskiem kliknij na nazwie projektu i z menu wybierz "Configure => Convert to Maven Project"

Wartości w oknie "Create new POM" pozostaw puste i kliknij na "Finish". W efekcie tego w katalogu projektu pojawi sie plik "pom.xml". Plik ten ma postać:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>flink-jdbc</groupId>
  <artifactId>flink-jdbc</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <release>11</release>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

W tym pliku musisz dodać zależność Maven, "flink-connector-jdbc" [wg. instrukcji dokumentacji Flink] wklejając poniższy fragment w strukturę pliku "pom.xml"

Przed tagiem zamykajacym </project> umieść tag <dependencies> i zamknij go </dependencies>. Wystarczy ze rozpoczniesz pisać "<de" a Eclipse podpowie Ci składnię. Twój "pom.xml" powinien teraz wyglądać tak:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>flink-jdbc</groupId>
  <artifactId>flink-jdbc</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <release>11</release>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.5</version>
    </dependency>
  </dependencies>
</project>

Następnie zaktualizuj projekt klikając prawym przyciskiem myszy na projekt i wybierając "Maven => Update project" [w oknie które się pojawi powinien być domyślnie wybrany Twoj projekt - kliknij "OK"].

Dodaj sterownik JDBC

Instalacja Flink nie posiada domyślnie sterowników JDBC do baz danych. Musisz je pobrać ze stron producentów baz danych. Wejdź na stronę Flink i pobierz odpowiedni plik a następnie umieść go w katalogu "lib". Dla bazy MySQL konektorem JDBC jest plik "mysql-connector-java-8.0.30.jar" ze strony https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.30/.

Następnie dodaj plik sterownika do "classpath" projektu. Prawym przyciskiem myszy kliknij na katalog "src": "Build path => Configure build path". Kliknij na "Classpath" w zakładce "Libraries", kliknij na "Add Externaj JARs..." i dodaj właśnie pobrany plik. Kliknij "Apply and close". Gotowe.

Odczytujemy tabele z bazy danych MySQL

Nasza testowa tabela "clients" znajduje sie w bazie danych o nazwie "test" i ma strukturę:

Oto kod klasy programu który łączy się z bazą MySQL, pobiera dane z tabeli i tworzy tabelę dynamiczną Flink a następnie wyświetla w konsoli zawartość tej tabeli:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkJDBC {

public static void main(String[] args) throws Exception {

        //Definiuje wszystkie parametry, które inicjują środowisko tabeli.
        //Te parametry są używane tylko podczas tworzenia inicjalizacji TableEnvironment i nie można ich później zmienić.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        //Laczymy sie z MySQL i pobieramy dane z tabeli 'clients' do tabeli Flink 'MyClients'
        tEnv.executeSql("CREATE TABLE MyClients ("
                 + " customer_number INT, "
                 + " vat_no INT, "
                 + " name STRING, "
                 + " PRIMARY KEY (customer_number) NOT ENFORCED"
                 + " ) WITH ( "
                 + " 'connector' = 'jdbc', "
                 + " 'url' = 'jdbc:mysql://localhost:3306/test', "
               + " 'username' = 'user_name', "
                 + " 'table-name' = 'clients', "
                 + " 'password' = ''"
                +  ")");

        //Testowy select
        tEnv.sqlQuery("SELECT * FROM MyClients")
        .execute()
        .print();

     }
}

Rezultat który ujrzymy w konsoli Eclipse IDE po kliknięciu na "Run => Run":

+-----------------+-------------+--------------------------------+
| customer_number |      vat_no |                           name |
+-----------------+-------------+--------------------------------+
|               1 |          11 |                        client1 |
|               2 |          22 |                        client2 |
+-----------------+-------------+--------------------------------+
2 rows in set

Przetwarzamy dane

Petla przez wszystkie rekordy tabeli:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;

public class FlinkJDBC {

public static void main(String[] args) throws Exception {

        //Definiuje wszystkie parametry, które inicjują środowisko tabeli.
        //Te parametry są używane tylko podczas tworzenia inicjalizacji TableEnvironment i nie można ich później zmienić.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        //Laczymy sie z MySQL i pobieramy dane z tabeli 'clients' do tabeli Flink 'MyClients'
        tEnv.executeSql("CREATE TABLE MyClients ("
                 + " customer_number INT, "
                 + " vat_no INT, "
                 + " name STRING, "
                 + " PRIMARY KEY (customer_number) NOT ENFORCED"
                 + " ) WITH ( "
                 + " 'connector' = 'jdbc', "
                 + " 'url' = 'jdbc:mysql://localhost:3306/test', "
                 + " 'username' = 'root', "
                 + " 'table-name' = 'clients', "
                 + " 'password' = ''"
                +  ")");

        
        //Petla przez rekordy tabeli
        TableResult tableResult1 = tEnv.executeSql("SELECT * FROM MyClients");
        try (org.apache.flink.util.CloseableIterator<Row> it = tableResult1.collect()) {
            while(it.hasNext()) {
                Row row = it.next();
              //Przetwarzanie rekordu (tutaj prosty print do konsoli):
                String rekord = row.getField("name").toString();
                System.out.println(rekord);
            }
        }

     }
}

Zapisanie danych do tabeli MySQL

Zapisać dane do tabeli utworzonej z połączena JDBC możemy na dwa sposoby. Pierwszy, naturalny, to pobranie rekordów z innej tabeli dynamicznej Flink. Wykonujemy select i wklejamy rezultat do tabeli JDBC:

tEnv.executeSql("INSERT INTO MyClients SELECT customer_number, vat_no, name FROM csv_table");

W powyższym przykładzie "cs_table" jest linkiem do pliku CSV. Pamiętaj że we Flink tabele są zawsze linkiem do danych z jakiegoś źródła. Nie można utworzyć we Flink pustej tabeli. Obejściem tych ograniczeń może być użycie Table Store [więcej tutaj] lub Hive Dialect [więcej tutaj].

Drugim sposobem jest wklejenie wartości komendą SQL. Np. przetwarzasz dane i chcesz utworzyć zupełnie nowy rekord w tabeli:

tEnv.executeSql("INSERT INTO MyClients (customer_number, vat_no, name) VALUES (1000, 23, 'client3')");

Czytając wszystkie trzy artykuły [uruchomienie Flink, pierwsza transformacja w Apache Flink, Flink - przetwarzanie danych] szybko zdobędziesz podstawową wiedzę jak korzystać z tego środowiska.