Zapraszam do kolejnej odsłony „Wiosennych porządków z AWS Glue”, gdzie skoncentrujemy się na ETL części usługi AWS Glue.
Krótkie przypomnienie
W poprzednim artykule wykonaliśmy operacje, które pomogły nam zapełnić „Data Catalog” metadanymi o naszych źródłach danych.
Do tego celu wykorzystaliśmy biblioteki JDBC zintegrowane z platformą by zdefiniować połączenia w ramach menu „Connections” i użyliśmy procesu zwanego „Crawler” by zbadać nasze źródła używając wcześniej zdefiniowanych „Connections” (nie wymagane dla S3) . Te procedury były niezbędne by móc teraz poddać nasze dane transformacji i przygotować je pod końcowe przetwarzanie.
W tej chwili, mój katalog posiada informacje z dwóch źródeł:
- przykładowe dane które pobrałem z NYC.GOV w formie CSV, zapisane na S3 (baza danych „nyc”)
- przykładowa baza danych którą pobrałem z Github.com/Microsoft i przywróciłem poprzez „restore” na platformie EC2 Windows SQL 2017 (baza danych „sales”).
Plan jest taki
Metadane o naszych źródłach są potrzebne by dokonać transformacji ich schematu jak i też by zlokalizować same źródła. Oba skatalogowane obiekty wykorzystamy, więc do operacji ETL, które za chwilę skonfigurujemy. Naszym celem jest przygotowanie danych w taki sposób, byśmy mogli do ich analityki OLAP wykorzystać usługę Amazon Redshift.
Amazon Redshift obecnie wspiera źródła tj.:
- S3
- HDFS
- DynamoDB
- kopia bezpośrednio z hosta używając SSH
Myślę, że wybór jest prosty. Stworzymy nową tabelę w oparciu o dane z obu istniejące źródeł danych na S3.
Mój pierwszy job
Z menu „ETL” wybieramy Jobs.
To Job’y są odpowiedzialne za logikę ETL (extract, transform, load) na naszych źródłach. Raz skonfigurowane procedury mogą być odpalane na żądanie, wg schedulera albo wywoływane przez eventy tak by tworzyć pipeline’y przetwarzania (więcej o tym w następnym artykule). Procedura wykona operację na jednym albo wielu źródłach jednocześnie i zapiszę output w wskazanej lokalizacji (w tym artykule jest to MS SQL). Kod który zostanie odpalony przez AWS Glue na naszych źródłach może być napisany w Scala albo PySpark (Python Api dla Apache Spark).
Przykładowe skrypty w obu językach możecie znaleźć tutaj: https://github.com/aws-samples/aws-glue-samples.
Klikamy w Add job by skonfigurować naszą pierwszą procedurę.
Wprowadzamy nazwę dla naszego Job’a. IAM Rolę, która umożliwia zapis i odczyt z S3 bucket (będzie potrzebne na tymczasowe dane i zapisanie pliku końcowego). Wybieramy formę, w jakiej przedstawimy nasz skrypt AWS Glue:
- A proposed script generated by AWS Glue (odpala graficznego wizard’a z przykładowym kodem)
- An existing script that you provide (skrypt w Scala albo Python zapisany na S3 bucket, który należy wskazać)
- A new script to be authored by (odpala graficznego wizard’a w który możecie napisać swój kod)
Wybieramy opcję pierwszą i jako „ETL language” wskazujemy Python. Poniżej wskazujemy lokalizację na S3, gdzie końcowy kod ma być zapisany, oraz wskazujemy dodatkową lokalizację na S3 która będzie użyta do zapisu plików tymczasowych podczas wykonywania Job’a.
Po kliknięciu w ustawienia zaawansowane, możemy wskazać czy użyjemy Job Bookmark.
Job Bookmark pozwala nam na zaznaczenie, gdzie ma być kontynuowany Job przy kolejnej iteracji:
- Disable (Job nie używa checkpoint’ów).
- Enable (Job używa checkpoint’ów do oznaczania na których danych zakończył przetwarzanie, ponowne wywołanie będzie kontynuowane od nowych danych umieszczonych za checkpoint’em).
- Pause (Job wykorzystuje ostatni checkpoint by określić, od których danych ma zacząć. Nie przesuwa jednak checkpoint’u na koniec danych, zatem checkpoint po nowym wywołaniu zostaje w poprzednim miejscu).
Wybieram opcję Disable.
Dodatkowo, możemy wzbogacić nasz Job o zewnętrzne biblioteki Python i archiwa JAR. Określić ile jednostek DPU (1 DPU = 4 vCPU and 16 GB) ma być przypisane do zadania, jak wiele wystąpień tego zadania pozwalamy odpalić na raz i po jakim czasie procesowania zadania ma się zakończyć („timeout”). Jeśli Job zakończy się niepomyślnie ile razy AWS Glue ma próbować powtórzyć te same zadanie nim całkowicie się podda.
Dzięki integracji z Cloudwatch i SNS możemy również skonfigurować notyfikacje w przypadku, gdy procesowanie naszego Job’a przekroczyło ustalony czas („Delay notification threshold (minutes)”).
Dla naszego testu zmieniłem Concurrent DPUs per Job run na 2 – domyślne ustawienie to 10.
W ramach „Job properties” możemy również otagować nasz Job. (np. w celach bilingowych)
Klikamy w Next.
Okienko które się następnie otworzy służy do wskazywania źródeł do naszego przetwarzania („Choose your data sources”). Powinniśmy tu widzieć wszystkie tabele, które zostały skatalogowane i zapisane w Data Catalog.
Wybieram plik CSV (źródło bazy danych „nyc”) zapisany na S3.
W okienku „Choose your data targets” wybieramy cel dla naszej procedury. Ja chciałbym stworzyć nową tabelę w mojej bazie MS SQL MyDB (w Data Catalog widoczna jako „sales”). Dostępne tutaj opcje to:
- Create tables in your data target
- Use tables in the data catalog and update your data target
Wybieram opcję pierwszą.
Tabelka pozwala na wybranie parametrów, które wcześniej zostały zdefiniowane w ramach „Data Catalog”. Po swojej stronie muszę tylko podać nazwę bazy danych w „Database name” i klikam Next
Wizard przenosi nas do okienka, w którym możemy podejrzeć schemat proponowany przez AWS Glue do zastosowania na nowej tabeli w mojej docelowej bazie. Już tutaj możemy zauważyć, że np. zamiast używania „bigint” proponuje typ „long”.
Na widocznej tabeli mamy również możliwość zmiany nazw kolumn, typu danych, możemy dodać nowe kolumny i zmienić mapowania.
Z mojego pliku CSV interesują mnie tylko dane finansowe, więc usuwam wszystkie zbędne kolumny, użyje do tego „X”.
Zmieniłem też typ danych kolumny „total_amount” z „Double” na „String”, oraz dodałem kolumnę „Id”.
W przypadku popełnienia błędu, mogę zresetować cały widok do domyślnego klikając w Reset, ja natomiast klikam w Next i Save Job and edit script.
Następne okienko to już graficzny interfejs z poziomu którego możemy podejrzeć proponowany kod albo napisać swój własny kod.
Po lewej stronie widzimy workflow naszej operacji ETL oparty o sekcje kodu oznaczone jako ## @type:
Jest pięć typów sekcji, z których jest tworzony design (możemy je wstawić w kod manualnie, bądź klikając przyciski akcji w prawym górnym rogu):
- Source (źródłowa tabela, dodaje źródło)
- Target (tabela docelowa z pośród tych, które mamy skatalogowane)
- Target Location (tabela docelowa, wcześniej nieskatalogowana)
- Transform (operacja ETL, lista obecnie dostępnych operacji ETL dostępna tutaj: https://docs.aws.amazon.com/glue/latest/dg/built-in-transforms.html)
- Spigot (zrzut danych z DynamicFrame na S3, o DynamicFrame za chwilę)
Nasz kod rozpoczyna się od załadowania modułów (tutaj możemy też zaimportować paczki wcześniej zapisane na S3 ) i zdefiniowania kontekstu operacji, następnie przechodzi do właściwej funkcji, gdzie widzimy wcześniej wspomniane sekcje.
Import paczek i modułów:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
Określenie podstawowych zmiennych i parametrów funkcji:
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
Pierwsza sekcja naszego kodu to:
## @type: DataSource
## @args: [database = "nyc", table_name = "mysource", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "nyc", table_name = "oufciusource", transformation_ctx = "datasource0")
Mamy zdefiniowany typ w formie DataSource.
Argumenty (args:) dla naszego DataSource tj. nazwa bazy danych (database) w AWS Glue Data Catalog, nazwa tabeli (table_name) i kontekst transformacji (transformation_ctx). Poza samą transformacją, wszystkie te informacje pobierane są z Data Catalog. W wyniku tego kroku pozyskujemy „datasource0” (return), który zostanie przetworzony w ramkę dynamiczną (dynamicframe).
DynamicFrame jest zbliżony do znanego z środowiska Spark DataFrame z tą różnicą, że każdy rekord (DynamicRecord) wywoływany w ramach DynamicFrame nie musi być oparty o wcześniej zdefiniowany schemat w tabeli, z której pochodzi a będzie miał schemat zdefiniowany w locie dzięki Data Catalog. Co więcej, DynamicFrame jest w stanie obsłużyć kilka typów danych w ramach jednej kolumny.
Następna sekcja ## @type: ApplyMapping
Jest to sekcja typu „Transform”.
Argumentami jest mapowanie kolumny do kolumny.
## @args: [mapping = [("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "string")], transformation_ctx = "applymapping1"]
Wracając do tabelki „Map source columns to target columns” zauważymy, że to jest dokładnie ta operacja, którą wcześniej zdefiniowaliśmy w formie graficznej. Widzimy nawet zmianę typu danych kolumny „total_amount” z „double” na „string” („total_amount”, „double”, „total_amount”, „string”).
Punktem wejścia dla tej transformacji jest wcześniej stworzona ramka „datasource0”.
## @inputs: [frame = datasource0]
Ramka ta zostanie przetworzona a jej wynik zostanie zachowany w ramce pod nazwą „applymapping1”, która będzie punktem wejścia dla kolejnej transformacji.
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
Zatem wynik transformacji A zapisywany jest do ramki, na którą powołuje się transformacja B. Proponowany skrypt dokona jeszcze dalszych transformacji, natomiast by nie analizować linijka po linijce chciałbym tylko podkreślić jak łatwo się to czyta i że same pisanie operacji skryptowych nie musi być ciężkie. GUI wstawia praktycznie gotowy kod, który trzeba uzupełnić swoimi danymi.
Ostatnim krokiem jest zapisanie już finalnej wersji ramki w naszym „Target”. Gdybyśmy jednak chcieli zachować którąś z pośrednich ramek (po jakimś kroku „transform”) możemy do tego użyć operacji „Spigot”.
Ostatni etap naszego ETL wygląda następująco:
## @type: DataSink
## @args: [catalog_connection = "Sql", connection_options = {"dbtable": "oufciusource", "database": "Mydb"}, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "Sql", connection_options = {"dbtable": "mysource", "database": "Mydb"}, transformation_ctx = "datasink4")
job.commit()
Mamy zdefiniowany etap Target (## @type: DataSink), metadane tabeli z której pobieramy dane i bazę danych do której zapiszemy naszą ostatnią ramkę {„dbtable”: „mysource”, „database”: „Mydb”}. Metadane znów pozyskiwane są z Data Catalog (np. danych zdefiniowanych w Connection). DataSink ma na celu zapisać dane z dynamicznej ramki (ostatni ## @inputs po wszystkich transformacjach) w zewnętrznej lokacji (Amazon S3, Amazon Redshift, albo JDBC), dla nas będzie to JDBC i MS SQL.
Job.commit() wywołuje naszą operację.
Skoro rozumiem już mój kod, czas podjąć decyzję co dalej. Tuż nad graficznym przedstawieniem etapów naszego ETL mamy kolejne menu, gdzie możemy zadecydować:
- Save as (zapisz JOB jako)
- Save and add trigger (Zapisz i ustaw trigger, który wcześniej został zdefiniowany – o tym później)
- Chose triggers for job (wskaż jakie triggery mają być skojarzone z tym Job)
- Edit job (wróć do ustawień, które były definiowane na początku wizarda)
- Delete (usuń Job)
- Save (zapisz zmiany pod tą samą nazwą)
- Run job (uruchom procedurę ETL)
- Generate diagram (narysuj nowy schemat, np. po tym jak zmieniliśmy kod)
Uruchamiamy naszą procedurę klikając w Run Job.
Plik CSV którego użyłem jest na S3 ma 815 MB i 1048576 rekordów. Source i Destination nie są w tym samym regionie.
Procedura nie udała się dwa razy (podgląd na szczegóły operacji dostępny jest z menu „Jobs”).
Po kliknięciu w Error Logs powinniście zobaczyć przyczynę przerwania zadania (Log operacji jest zapisywany w usłudze Amazon CloudWatch).
Przyczyna obu nieudanych wywołań to:
- S3 bucket dla danych tymczasowych i skryptu jest w innym regionie niż AWS Glue. AWS Glue domyślnie korzysta z swojego regionu przy wywoływaniu operacji i nie używa suffixu regionu w URL, który sam sobie tworzy.
- S3 bucket z danymi źródłowymi (mój plik CSV) jest w innym regionie (j.w.)
By rozwiązać ten problem stworzyłem S3 bucket w tym samym regionie, gdzie uruchomiona jest AWS Glue. Przekopiowałem tam źródło i stworzyłem nowy Job z nowym źródłem (tym razem lokalne S3). Te wywołanie pomyślnie dokonało operacji ETL.
Przy użyciu 2 DPU, czas wykonania wyniósł 4 minuty.
Zaloguje się więc do MS SQL (baza danych „sales” w „Data Catalog”) by zobaczyć jak wygląda nowa tabela, tabelę podejrzę używając SQL Management Studio.
Na podglądzie widzimy tabelę “yellow_tripdata_2017_01_csv”, której wcześniej nie było w mojej bazie danych.
Sprawdźmy, czy są tam kolumny, które wybralismy w „Map the source columns to target columns”:
A teraz sprawdźmy czy są tam jakieś dane:
Udało się! Sprawdźmy jeszcze kiedy tabela została stworzona:
Różnica pomiędzy czasem widocznym w konsoli AWS i w SQL wynika z tego, ze instancja EC2 używa domyślnie czasu UTC (+ 0), gdy konsola AWS używa czasu użytkownika który w moim przypadku jest polskim czasem letnim (+2).
Zatem wszystko poszło dobrze.
Kolejne kroki
Teraz powinniśmy odpalić na żądanie „Crawlera” na bazie „sales” by odświeżyć schemat i zaktualizować informacje w „Data Catalog”.
Po kliknięciu w Tables z kategorii „Databases”, otwieram moją bazę danych („sales”) i szukam na liście nowej tabeli.
Klikam w tabelę:
Kolejnym krokiem będzie zrobienie operacji Join z dwóch tabel z mojej bazy MS SQL i zapisanie wyniku w S3 tak by Amazon RedShift mógł go użyć jako źródła.
W tym celu użyjemy wirtualnego Notebooka (zbliżony do Apache Zeppelin Notebook), którego możemy zdeployować w ramach AWS Glue, ale o tym w następnej części naszej serii.
Podsumowanie
W ramach naszego artykułu zdefiniowaliśmy nasz pierwszy Job ETL i przeszliśmy przez proponowany w wizardzie kod. Dane z pliku CSV na S3 zostały po transformacji poprawnie zaimportowane do bazy MS SQL.