Xlera8

Організуйте наскрізний конвеєр ETL за допомогою Amazon S3, AWS Glue і Amazon Redshift Serverless за допомогою Amazon MWAA | Веб-сервіси Amazon

Керовані робочі процеси Amazon для Apache Airflow (Amazon MWAA) — це керована служба оркестровки для Потік повітря Apache які ви можете використовувати для налаштування та керування конвеєрами даних у хмарі в масштабі. Apache Airflow — це інструмент із відкритим кодом, який використовується для програмного створення, планування та моніторингу послідовностей процесів і завдань, які називаються Робочі процеси. З Amazon MWAA ви можете використовувати Apache Airflow і Python для створення робочих процесів без необхідності керувати основною інфраструктурою для масштабованості, доступності та безпеки.

Використовуючи кілька облікових записів AWS, організації можуть ефективно масштабувати робоче навантаження та керувати своєю складністю в міру зростання. Цей підхід забезпечує надійний механізм для пом’якшення потенційного впливу збоїв або збоїв, гарантуючи, що критичні робочі навантаження залишаються в робочому стані. Крім того, це дає змогу оптимізувати витрати шляхом узгодження ресурсів із конкретними випадками використання, гарантуючи, що витрати добре контролюються. Ізолюючи робочі навантаження з певними вимогами безпеки або вимогами відповідності, організації можуть підтримувати найвищий рівень конфіденційності та безпеки даних. Крім того, можливість організувати декілька облікових записів AWS у структурований спосіб дозволяє узгодити ваші бізнес-процеси та ресурси відповідно до ваших унікальних операційних, нормативних і бюджетних вимог. Цей підхід сприяє ефективності, гнучкості та масштабованості, дозволяючи великим підприємствам задовольняти свої потреби та досягати своїх цілей.

Ця публікація демонструє, як організувати наскрізний конвеєр вилучення, перетворення та завантаження (ETL) за допомогою Служба простого зберігання Amazon (Amazon S3), Клей AWS та Amazon Redshift без сервера за допомогою Amazon MWAA.

Огляд рішення

У цій публікації ми розглядаємо варіант використання, коли команда розробки даних хоче побудувати процес ETL і надати найкращий досвід своїм кінцевим користувачам, коли вони хочуть запитувати найновіші дані після додавання нових необроблених файлів до Amazon S3 у центральному обліковий запис (обліковий запис A на наведеній нижче схемі архітектури). Команда розробки даних хоче відокремити необроблені дані у власний обліковий запис AWS (обліковий запис B на схемі) для підвищення безпеки та контролю. Вони також хочуть виконувати роботу з обробки та перетворення даних у своєму власному обліковому записі (Обліковий запис B), щоб розділити обов’язки та запобігти будь-яким ненавмисним змінам вихідних необроблених даних, наявних у центральному обліковому записі (Обліковий запис A). Цей підхід дозволяє команді обробляти необроблені дані, отримані з облікового запису A в обліковий запис B, який призначений для завдань обробки даних. Це забезпечує безпечне зберігання необроблених і оброблених даних між кількома обліковими записами, якщо це необхідно, для покращеного керування та безпеки даних.

Наше рішення використовує наскрізний конвеєр ETL, організований Amazon MWAA, який шукає нові додаткові файли в розташуванні Amazon S3 в обліковому записі A, де присутні необроблені дані. Це робиться шляхом виклику завдань AWS Glue ETL і запису в об’єкти даних у безсерверному кластері Redshift в обліковому записі B. Потім починає працювати конвеєр збережені процедури і команди SQL на Redshift Serverless. Після завершення виконання запитів an РОЗвантажити операція викликається зі сховища даних Redshift у відро S3 в обліковому записі A.

Оскільки безпека важлива, у цій публікації також описано, як налаштувати підключення Airflow за допомогою Менеджер секретів AWS щоб уникнути зберігання облікових даних бази даних у підключеннях і змінних Airflow.

Наступна діаграма ілюструє огляд архітектури компонентів, залучених до оркестровки робочого процесу.

Робочий процес складається з наступних компонентів:

  • Вихідний і цільовий сегменти S3 знаходяться в центральному обліковому записі (Обліковий запис A), тоді як Amazon MWAA, AWS Glue і Amazon Redshift знаходяться в іншому обліковому записі (Обліковий запис B). Між сегментами S3 в обліковому записі A з ресурсами в обліковому записі B налаштовано міжобліковий доступ, щоб мати можливість завантажувати та вивантажувати дані.
  • У другому обліковому записі Amazon MWAA розміщено в одному VPC, а Redshift Serverless — в іншому VPC, які підключені через піринг VPC. Безсерверна робоча група Redshift захищена всередині приватних підмереж у трьох зонах доступності.
  • Такі секрети, як ім’я користувача, пароль, порт БД і регіон AWS для Redshift Serverless, зберігаються в диспетчері секретів.
  • Кінцеві точки VPC створено для Amazon S3 і Secrets Manager для взаємодії з іншими ресурсами.
  • Зазвичай розробники даних створюють ациклічний графік, спрямований повітряним потоком (DAG) і передають свої зміни в GitHub. За допомогою дій GitHub вони розгортаються у відро S3 в обліковому записі B (для цієї публікації ми завантажуємо файли безпосередньо в сегмент S3). Відро S3 зберігає файли, пов’язані з Airflow, наприклад файли DAG, requirements.txt файли та плагіни. Сценарії та ресурси AWS Glue ETL зберігаються в іншому сегменті S3. Таке розділення допомагає зберегти організованість і уникнути плутанини.
  • Airflow DAG використовує різні оператори, датчики, підключення, завдання та правила для запуску конвеєра даних за потреби.
  • Ви ввійшли в журнали Airflow Amazon CloudWatch, і сповіщення можна налаштувати для завдань моніторингу. Для отримання додаткової інформації див Панелі моніторингу та сигналізації на Amazon MWAA.

Передумови

Оскільки це рішення зосереджено на використанні Amazon MWAA для організації конвеєра ETL, вам потрібно заздалегідь налаштувати певні основні ресурси для облікових записів. Зокрема, вам потрібно створити сегменти та папки S3, ресурси AWS Glue і ресурси Redshift Serverless у відповідних облікових записах перед впровадженням повної інтеграції робочого процесу за допомогою Amazon MWAA.

Розгорніть ресурси в обліковому записі A за допомогою AWS CloudFormation

В обліковому записі A запустіть наданий AWS CloudFormation стек для створення таких ресурсів:

  • Вихідні та цільові сегменти та папки S3. Як найкраща практика, структури вхідних і вихідних сегментів форматуються за допомогою розділення у стилі вулика s3://<bucket>/products/YYYY/MM/DD/.
  • Зразок набору даних називається products.csv, який ми використовуємо в цій публікації.

Завантажте завдання AWS Glue в Amazon S3 в обліковому записі B

В обліковому записі B створіть розташування Amazon S3 під назвою aws-glue-assets-<account-id>-<region>/scripts (якщо немає). Замініть параметри для ідентифікатора облікового запису та регіону в sample_glue_job.py сценарій і завантажте файл завдання AWS Glue у розташування Amazon S3.

Розгорніть ресурси в обліковому записі B за допомогою AWS CloudFormation

В обліковому записі B запустіть наданий шаблон стека CloudFormation, щоб створити такі ресурси:

  • Відро S3 airflow-<username>-bucket для зберігання пов’язаних із Airflow файлів із такою структурою:
    • даги – Папка для файлів DAG.
    • plugins – Файл для будь-яких настроюваних або спільнотних плагінів Airflow.
    • вимога - The requirements.txt файл для будь-яких пакетів Python.
    • scripts – Будь-які сценарії SQL, які використовуються в DAG.
    • дані – Будь-які набори даних, що використовуються в DAG.
  • Безсерверне середовище Redshift. Ім’я робочої групи та простір імен мають префікс sample.
  • Середовище AWS Glue, яке містить:
    • Клей AWS гусеничний, який сканує дані з вихідного сегмента S3 sample-inp-bucket-etl-<username> на рахунку А.
    • База даних називається products_db у каталозі даних AWS Glue.
    • ELT робота званий sample_glue_job. Це завдання може читати файли з products у каталозі даних і завантажити дані в таблицю Redshift products.
  • Кінцева точка шлюзу VPC для Amazon S3.
  • Середовище Amazon MWAA. Докладні кроки щодо створення середовища Amazon MWAA за допомогою консолі Amazon MWAA див Представляємо керовані робочі процеси Amazon для Apache Airflow (MWAA).

стек запуску 1

Створіть ресурси Amazon Redshift

Створіть дві таблиці та збережену процедуру в робочій групі Redshift Serverless за допомогою products.sql файлу.

У цьому прикладі ми створюємо дві таблиці під назвою products та products_f. Ім'я збереженої процедури sp_products.

Налаштуйте дозволи Airflow

Після успішного створення середовища Amazon MWAA статус відображатиметься як наявний, Вибирати Відкрийте інтерфейс Airflow щоб переглянути інтерфейс Airflow. DAG автоматично синхронізуються з сегмента S3 і відображаються в інтерфейсі користувача. Однак на цьому етапі в папці S3 немає DAG.

Додайте керовану клієнтом політику AmazonMWAAFullConsoleAccess, який надає користувачам Airflow дозвіл на доступ Управління ідентифікацією та доступом AWS (IAM) і приєднайте цю політику до ролі Amazon MWAA. Для отримання додаткової інформації див Доступ до середовища Amazon MWAA.

Політики, пов’язані з роллю Amazon MWAA, мають повний доступ і повинні використовуватися лише для тестування в безпечному тестовому середовищі. Для виробничих розгортань дотримуйтеся принципу найменших привілеїв.

Налаштуйте середовище

У цьому розділі описано кроки для налаштування середовища. Процес включає наступні етапи високого рівня:

  1. Оновіть усіх необхідних постачальників.
  2. Налаштуйте міжобліковий доступ.
  3. Встановіть однорангове з’єднання VPC між Amazon MWAA VPC і Amazon Redshift VPC.
  4. Налаштуйте Secrets Manager для інтеграції з Amazon MWAA.
  5. Визначте підключення Airflow.

Оновіть провайдерів

Виконайте кроки в цьому розділі, якщо ваша версія Amazon MWAA менше 2.8.1 (остання версія на момент написання цієї публікації).

Провайдери це пакети, які підтримуються спільнотою та включають усі основні оператори, гачки та датчики для певної послуги. Постачальник Amazon використовується для взаємодії з такими службами AWS, як Amazon S3, Amazon Redshift Serverless, AWS Glue тощо. У постачальника Amazon є понад 200 модулів.

Незважаючи на те, що в Amazon MWAA підтримується версія Airflow 2.6.3, яка постачається разом із наданим Amazon пакетом версії 8.2.0, підтримка Amazon Redshift Serverless не була додана до версії пакета Amazon 8.4.0. Оскільки стандартна версія пакетного постачальника є старішою, ніж тоді, коли була представлена ​​підтримка Redshift Serverless, версію постачальника потрібно оновити, щоб використовувати цю функцію.

Першим кроком є ​​оновлення файлу обмежень і requirements.txt файл із правильними версіями. Відноситься до Зазначення нових пакетів провайдера кроки для оновлення пакета постачальника Amazon.

  1. Укажіть вимоги наступним чином:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Оновіть версію у файлі обмежень до 8.4.0 або новішої.
  3. Додати constraints-3.11-updated.txt файл до /dags папку.

Відноситься до Версії Apache Airflow на Amazon Managed Workflows для Apache Airflow для правильних версій файлу обмежень залежно від версії Airflow.

  1. Перейдіть до середовища Amazon MWAA та виберіть Редагувати.
  2. під Код DAG в Amazon S3, Для Файл вимог, виберіть останню версію.
  3. Вибирати зберегти.

Це оновить середовище, і нові постачальники стануть дійсними.

  1. Щоб перевірити версію постачальника, перейдіть до Провайдери під Адміністратор таблиці.

Версія пакета постачальника Amazon має бути 8.4.0, як показано на наступному знімку екрана. Якщо ні, під час завантаження сталася помилка requirements.txt. Щоб усунути будь-які помилки, перейдіть до консолі CloudWatch і відкрийте requirements_install_ip увійдіть Потоки журналів, де перераховані помилки. Відноситься до Увімкнення журналів на консолі Amazon MWAA для більш докладної інформації.

Налаштуйте міжобліковий доступ

Щоб отримати доступ до сегментів S3 для завантаження та вивантаження даних, потрібно налаштувати міжоблікові політики та ролі між обліковими записами A та B. Виконайте наступні дії:

  1. В обліковому записі A налаштуйте політику сегмента для сегмента sample-inp-bucket-etl-<username> щоб надати дозволи ролям AWS Glue і Amazon MWAA в обліковому записі B для об’єктів у сегменті sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of- AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
    "s3:PutObject",
    		   "s3:PutObjectAcl",
    		   "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Подібним чином налаштуйте політику відра для відра sample-opt-bucket-etl-<username> щоб надати дозволи ролям Amazon MWAA в обліковому записі B для розміщення об’єктів у цьому відрі:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. В обліковому записі A створіть політику IAM під назвою policy_for_roleA, який дозволяє виконувати необхідні дії Amazon S3 над вихідним сегментом:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Створіть нову роль IAM під назвою RoleA з обліковим записом B як роллю довіреної особи та додайте цю політику до ролі. Це дозволяє обліковому запису B взяти на себе роль A для виконання необхідних дій Amazon S3 у вихідному сегменті.
  5. В обліковому записі B створіть політику IAM під назвою s3-cross-account-access з дозволом на доступ до об’єктів у відрі sample-inp-bucket-etl-<username>, який знаходиться на рахунку А.
  6. Додайте цю політику до ролі AWS Glue і Amazon MWAA:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. В обліковому записі B створіть політику IAM policy_for_roleB визначення облікового запису А як довіреної особи. Нижче наведено політику довіри RoleA в обліковому записі А:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Створіть нову роль IAM під назвою RoleB з Amazon Redshift як довіреним типом об’єкта та додайте цю політику до ролі. Це дозволяє RoleB припустити RoleA в обліковому записі A, а також бути прийнятим Amazon Redshift.
  9. Приєднувати RoleB у простір імен Redshift Serverless, тому Amazon Redshift може записувати об’єкти до вихідного сегмента S3 в обліковому записі A.
  10. Прикріпіть поліс policy_for_roleB до ролі Amazon MWAA, що дає Amazon MWAA доступ до вихідного сегмента в обліковому записі A.

Відноситься до Як надати міжобліковий доступ до об’єктів, які знаходяться в сегментах Amazon S3? щоб дізнатися більше про налаштування доступу між обліковими записами до об’єктів в Amazon S3 від AWS Glue і Amazon MWAA. Відноситься до Як КОПІЮВАТИ або ВИВАНТАЖИТИ дані з Amazon Redshift у сегмент Amazon S3 в іншому обліковому записі? щоб дізнатися більше про налаштування ролей для вивантаження даних з Amazon Redshift на Amazon S3 з Amazon MWAA.

Налаштуйте піринг VPC між Amazon MWAA та Amazon Redshift VPC

Оскільки Amazon MWAA та Amazon Redshift знаходяться у двох окремих VPC, вам потрібно налаштувати між ними піринг VPC. Необхідно додати маршрут до таблиць маршрутів, пов’язаних із підмережами для обох служб. Відноситься до Робота з піринговими з'єднаннями VPC щоб дізнатися більше про піринг VPC.

Переконайтеся, що діапазон CIDR Amazon MWAA VPC дозволено в групі безпеки Redshift, а діапазон CIDR Amazon Redshift VPC дозволено в групі безпеки Amazon MWAA, як показано на наступному знімку екрана.

Якщо будь-який із попередніх кроків налаштовано неправильно, ви, ймовірно, зіткнетеся з помилкою «Час очікування з’єднання» під час запуску DAG.

Налаштуйте підключення Amazon MWAA за допомогою Secrets Manager

Коли конвеєр Amazon MWAA налаштовано на використання диспетчера секретів, він спочатку шукатиме з’єднання та змінні в альтернативній серверній частині (наприклад, диспетчер секретів). Якщо альтернативний сервер містить потрібне значення, воно повертається. В іншому випадку він перевірить базу даних метаданих на наявність значення та поверне його замість цього. Для отримання додаткової інформації зверніться до Налаштування підключення Apache Airflow за допомогою секрету AWS Secrets Manager.

Виконайте такі дії:

  1. Налаштування a Кінцева точка VPC щоб зв’язати Amazon MWAA та Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

Це дозволяє Amazon MWAA отримувати доступ до облікових даних, які зберігаються в диспетчері секретів.

  1. Щоб надати Amazon MWAA дозвіл на доступ до секретних ключів Secrets Manager, додайте політику під назвою SecretsManagerReadWrite до IAM ролі середовища.
  2. Щоб створити серверну частину Secrets Manager як параметр конфігурації Apache Airflow, перейдіть до параметрів конфігурації Airflow, додайте наступні пари ключ-значення та збережіть налаштування.

Це налаштовує Airflow на пошук рядків підключення та змінних у airflow/connections/* та airflow/variables/* шляхи:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

  1. Щоб створити рядок URI підключення Airflow, перейдіть до AWS CloudShell і введіть в оболонку Python.
  2. Запустіть наступний код, щоб створити рядок URI підключення:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

Рядок підключення має бути згенерований таким чином:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  1. Додайте підключення до диспетчера секретів за допомогою такої команди в Інтерфейс командного рядка AWS (AWS CLI).

Це також можна зробити з консолі диспетчера секретів. Це буде додано в диспетчер секретів як відкритий текст.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Використовуйте зв'язок airflow/connections/secrets_redshift_connection в DAG. Коли DAG запущено, він шукатиме це з’єднання та отримуватиме секрети з диспетчера секретів. В випадку RedshiftDataOperator, передайте secret_arn як параметр замість назви підключення.

Ви також можете додавати секрети за допомогою консолі диспетчера секретів як пари ключ-значення.

  1. Додайте інший секрет у Менеджер секретів і збережіть його як airflow/connections/redshift_conn_test.

Створіть підключення Airflow через базу метаданих

Ви також можете створювати підключення в інтерфейсі користувача. У цьому випадку деталі підключення зберігатимуться в базі метаданих Airflow. Якщо середовище Amazon MWAA не налаштовано на використання серверної частини диспетчера секретів, воно перевірить базу даних метаданих на наявність значення та поверне його. Ви можете створити підключення Airflow за допомогою інтерфейсу користувача, AWS CLI або API. У цьому розділі ми покажемо, як створити з’єднання за допомогою інтерфейсу користувача Airflow.

  1. для Ідентифікатор підключення, введіть назву з’єднання.
  2. для Тип з'єднаннявиберіть Амазонська червона зміна.
  3. для Господар, введіть кінцеву точку Redshift (без порту та бази даних) для Redshift Serverless.
  4. для Database, введіть dev.
  5. для користувач, введіть ім’я користувача адміністратора.
  6. для Пароль, введіть ваш пароль.
  7. для порт, використовуйте порт 5439.
  8. для Extra, встановіть region та timeout параметри
  9. Перевірте з’єднання, а потім збережіть налаштування.

Створіть і запустіть DAG

У цьому розділі ми описуємо, як створити DAG за допомогою різних компонентів. Після того, як ви створите та запустите DAG, ви можете перевірити результати, зробивши запит до таблиць Redshift і перевіривши цільові сегменти S3.

Створіть DAG

У Airflow конвеєри даних визначені в коді Python як DAG. Ми створюємо DAG, який складається з різних операторів, датчиків, підключень, завдань і правил:

  • DAG починається з пошуку вихідних файлів у відрі S3 sample-inp-bucket-etl-<username> за рахунком А за поточний день використанням S3KeySensor. S3KeySensor використовується для очікування наявності одного чи кількох ключів у сегменті S3.
    • Наприклад, наше відро S3 розділено як s3://bucket/products/YYYY/MM/DD/, тож наш датчик має перевіряти папки з поточною датою. Ми отримали поточну дату в DAG і передали її S3KeySensor, який шукає будь-які нові файли в папці поточного дня.
    • Ми теж поставили wildcard_match as True, що дозволяє здійснювати пошук на bucket_key інтерпретувати як шаблон підстановки Unix. Встановіть mode до reschedule так що завдання датчика звільняє робоче місце, коли критерії не задовольняються, і його переплановують на пізніший час. Як найкраща практика, використовуйте цей режим, коли poke_interval становить більше 1 хвилини, щоб запобігти надмірному навантаженню планувальника.
  • Коли файл стає доступним у сегменті S3, сканер AWS Glue запускається за допомогою GlueCrawlerOperator щоб сканувати вихідне відро S3 sample-inp-bucket-etl-<username> під обліковим записом A та оновлює метадані таблиці під products_db бази даних у каталозі даних. Веб-сканер використовує роль AWS Glue і базу даних Data Catalog, які були створені на попередніх кроках.
  • DAG використовує GlueCrawlerSensor дочекатися завершення сканера.
  • Коли роботу сканера завершено, GlueJobOperator використовується для запуску завдання AWS Glue. Назва сценарію AWS Glue (разом із розташуванням) передається оператору разом із роллю AWS Glue IAM. Інші параметри, як GlueVersion, NumberofWorkers та WorkerType передаються за допомогою create_job_kwargs параметр.
  • DAG використовує GlueJobSensor дочекатися завершення роботи AWS Glue. Після завершення з’явиться проміжна таблиця Redshift products буде завантажено дані з файлу S3.
  • Ви можете підключитися до Amazon Redshift із Airflow за допомогою трьох різних Оператори:
    • PythonOperator.
    • SQLExecuteQueryOperator, який використовує з’єднання PostgreSQL і redshift_default як підключення за замовчуванням.
    • RedshiftDataOperator, який використовує Redshift Data API та aws_default як підключення за замовчуванням.

У нашому DAG ми використовуємо SQLExecuteQueryOperator та RedshiftDataOperator щоб показати, як використовувати ці оператори. Виконуються збережені процедури Redshift RedshiftDataOperator. DAG також виконує команди SQL в Amazon Redshift, щоб видалити дані з проміжної таблиці за допомогою SQLExecuteQueryOperator.

Оскільки ми налаштували наше середовище Amazon MWAA на пошук з’єднань у диспетчері секретів, під час роботи DAG він отримує з диспетчера секретів деталі підключення Redshift, як-от ім’я користувача, пароль, хост, порт і регіон. Якщо підключення не знайдено в диспетчері секретів, значення витягуються зі з’єднань за замовчуванням.

In SQLExecuteQueryOperator, ми передаємо ім’я підключення, яке ми створили в диспетчері секретів. Воно шукає airflow/connections/secrets_redshift_connection і отримує секрети з диспетчера секретів. Якщо диспетчер секретів не налаштовано, підключення, створене вручну (наприклад, redshift-conn-id) можна пройти.

In RedshiftDataOperator, ми передаємо secret_arn of the airflow/connections/redshift_conn_test підключення, створене в Secrets Manager як параметр.

  • Як останнє завдання, RedshiftToS3Operator використовується для вивантаження даних із таблиці Redshift у сегмент S3 sample-opt-bucket-etl на рахунку Б. airflow/connections/redshift_conn_test з диспетчера секретів використовується для вивантаження даних.
  • TriggerRule встановлений в ALL_DONE, що дозволяє виконувати наступний крок після завершення всіх попередніх завдань.
  • Залежність завдань визначається за допомогою chain() функція, яка дозволяє паралельно виконувати завдання, якщо це необхідно. У нашому випадку ми хочемо, щоб усі завдання запускалися послідовно.

Нижче наведено повний код DAG. The dag_id має відповідати назві сценарію DAG, інакше його не буде синхронізовано з інтерфейсом користувача Airflow.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

Перевірте запуск DAG

Після того, як ви створите файл DAG (замініть змінні в сценарії DAG) і завантажите його в s3://sample-airflow-instance/dags папку, вона буде автоматично синхронізована з інтерфейсом користувача Airflow. Усі DAG відображаються на DAG вкладка. Перемкнути ON можливість зробити DAG працездатним. Оскільки наш DAG налаштований на schedule="@once", вам потрібно вручну запустити завдання, вибравши піктограму запуску під Дії. Після завершення DAG статус оновлюється зеленим кольором, як показано на наступному знімку екрана.

У зв'язку у розділі є варіанти перегляду коду, графіка, сітки, журналу тощо. Виберіть Графік щоб візуалізувати DAG у форматі графіка. Як показано на наступному знімку екрана, кожен колір вузла позначає певний оператор, а колір контуру вузла позначає певний статус.

Перевірте результати

На консолі Amazon Redshift перейдіть до Редактор запитів версії 2 і виберіть дані в products_f стіл. Таблиця має бути завантажена та мати таку саму кількість записів, як файли S3.

На консолі Amazon S3 перейдіть до сегмента S3 s3://sample-opt-bucket-etl на рахунку Б. В product_f файли повинні бути створені в структурі папок s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Прибирати

Очистіть ресурси, створені в рамках цієї публікації, щоб уникнути поточних платежів:

  1. Видаліть стеки CloudFormation і сегмент S3, які ви створили як передумови.
  2. Видаліть VPC і пірингові з’єднання VPC, політики та ролі між обліковими записами, а також секрети в диспетчері секретів.

Висновок

За допомогою Amazon MWAA ви можете створювати складні робочі процеси за допомогою Airflow і Python без керування кластерами, вузлами чи будь-якими іншими операційними витратами, які зазвичай пов’язані з розгортанням і масштабуванням Airflow у виробництві. У цій публікації ми показали, як Amazon MWAA забезпечує автоматичний спосіб отримання, трансформації, аналізу та розподілу даних між різними обліковими записами та службами в AWS. Більше прикладів інших операторів AWS див GitHub сховище; ми радимо вам дізнатися більше, спробувавши деякі з цих прикладів.


Про авторів


Радхіка Джаккула є архітектором рішень для прототипування великих даних в AWS. Вона допомагає клієнтам створювати прототипи за допомогою аналітичних служб AWS і спеціально створених баз даних. Вона є спеціалістом з оцінки широкого спектру вимог і застосування відповідних сервісів AWS, інструментів для великих даних і фреймворків для створення надійної архітектури.

Сідхант Муралідхар є головним технічним менеджером з роботи з клієнтами в AWS. Він працює з великими корпоративними клієнтами, які працюють на AWS. Він захоплюється роботою з клієнтами та допомагає їм розробляти робочі навантаження для забезпечення витрат, надійності, продуктивності та операційної досконалості в масштабі їхньої хмарної подорожі. Він також має великий інтерес до аналізу даних.

Зв'яжіться з нами!

Привіт! Чим я можу вам допомогти?