1 of 52

APACHE AIRFLOW

DEEP DIVE 19.12.2023

2 of 52

ЧТО ТАКОЕ AIRFLOW

Apache Airflow - это платформа (созданная airbnb) с открытым исходным кодом, используемая для организации и планирования сложных рабочих процессов и конвейеров данных. Она позволяет определять, планировать и отслеживать задачи в виде направленных ациклических графов (DAG). Airflow предоставляет способ программного создания, планирования и мониторинга рабочих процессов, упрощая управление и автоматизацию конвейеров передачи данных.

2

3 of 52

ТИПИЧНЫЙ USE CASE

3

База данных 1

AIRFLOW DAG 1

Сервис погоды

Сервис BTC to USD

База данных 2

AIRFLOW DAG 2

UI Apache airflow

4 of 52

4

5 of 52

КОНВЕЙЕРЫ ОБРАБОТКИ ДАННЫХ

Часто бывает так что задач становится слишком много, между задачами есть зависимости, в связи с этим для облегчения понимания и управления было принято использовать графы, в частности направленные (ориентированные или же орграфы) ациклические графы.

Т.е. те чьи ребра имеют одно направление (можно так же представить что связь между узлами односторонняя)

Так же нет петель или циклов (ациклический)

(это все позволяет избегать ошибок)

5

6 of 52

6

7 of 52

!!!ВАЖНО!!!

При использовании AIRFLOW вы обычно определяете ресурсы, необходимые для каждой задачи, используя параметры. Эти параметры позволяют вам указать ЦП, память и другие требования к ресурсам для задачи. Тем не менее, важно отметить, что сам AIRFLOW не применяет и не управляет распределением этих ресурсов.

Фактическое распределение ресурсов происходит в среде исполнения. Например, если вы используете Kubernetes в качестве исполнителя, Kubernetes планирует и распределять ресурсы на основе определенных требований каждой задачи. Точно так же, если вы используете YARN/MESOS, MESOS будет обрабатывать распределение ресурсов.

Роль AIRFLOW заключается в том, чтобы организовать рабочий процесс и управлять зависимостями между задачами. Это гарантирует, что задачи выполняются в правильном порядке и обрабатывают планирование задач, повторение и мониторинг. С другой стороны, среда выполнения отвечает за управление ресурсами и выполнение задач на основе выделенных ресурсов.

AIRFLOW напрямую не выделяет ресурсы на задачи. Он опирается на основную среду выполнения для обработки распределения ресурсов на основе определенных требований каждой задачи.

7

8 of 52

DAG

8

import datetime

from airflow import DAG

from airflow.operators.empty import EmptyOperator

with DAG(

dag_id="my_dag_name",

start_date=datetime.datetime(2021, 1, 1),

schedule="@daily",

):

EmptyOperator(task_id="task")

import datetime

from airflow import DAG

from airflow.operators.empty import EmptyOperator

my_dag = DAG(

dag_id="my_dag_name",

start_date=datetime.datetime(2021, 1, 1),

schedule="@daily",

)

EmptyOperator(task_id="task", dag=my_dag)

import datetime

from airflow.decorators import dag

from airflow.operators.empty import EmptyOperator

@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")

def generate_dag():

EmptyOperator(task_id="task")

generate_dag()

9 of 52

9

10 of 52

ОПЕРАТОРЫ

10

  1. BashOperator: Executes a bash command or script.
  2. PythonOperator: Executes a Python function. [+ TASKFLOW API]
  3. EmailOperator: Sends an email.
  4. SQLOperator: Executes an SQL query on a database.
  5. HiveOperator: Executes a Hive query.
  6. S3KeySensor: Waits for a specific key to be present in an S3 bucket.
  7. SFTPOperator: Transfers files to/from an SFTP server.
  8. HttpOperator: Performs an HTTP request.
  9. SlackAPIOperator: Sends a message to a Slack channel.
  10. DummyOperator: Does nothing, used for creating task dependencies.

По сути Оператор - это шаблон предзаполненной таски (задачи) декларативно объявляющейся в ОАГе

11 of 52

11

https://docs.google.com/spreadsheets/d/1d-f4tM4JsM04uu_q2SCLbASN760MqRt5Swta_F6vBOo/edit#gid=0

12 of 52

12

13 of 52

А КАК ИСПОЛЬЗОВАТЬ (ПАЙ)СПАРК???

13

def run_pyspark():

spark = SparkSession.builder \

.appName("MyPySparkJob") \

.config("spark.master", "yarn") \

.config("spark.submit.deployMode", "cluster") \

.config("spark.hadoop.fs.defaultFS", "hdfs://<hdfs-host>:<hdfs-port>") \

.getOrCreate()

# Your PySpark code here

# ...

spark.stop()

dag = DAG(

dag_id='my_pyspark_dag',

schedule_interval='0 0 * * *’,

start_date=datetime(2022, 1, 1),

)

pyspark_task = PythonOperator(

task_id='run_pyspark',

python_callable=run_pyspark,

dag=dag,

)

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

from pyspark.sql import SparkSession

14 of 52

14

15 of 52

CONNECTIONS & HOOKS

15

Соединение (connection) – это набор параметров (логин, пароль, хост) и некая внешняя система c уникальным именем (conn_id), к которой и происходит подключение. Такой системой может быть базы и хранилища данных, электронная почта, AWS, Google Cloud, Telegram и т.д. Всякий раз, когда вам нужно изнутри Airflow подключиться к БД, то будут использоваться соединения и хуки.

Admin->Connection.

16 of 52

CONNECTIONS & HOOKS

16

Хук (hook) предоставляет интерфейс для взаимодействия с внешней системой в пределах одного графа. Например, некоторые задачи требуют доступа к MySQL, и чтобы не устанавливать связь каждый раз в нужной задаче, можно использовать хук. Хуки также позволяют не хранить параметры аутентификации в графе. По сути своей, хук позволяет использовать возможности SQLOperator внутри PythonOperator.

Хук — это SQLOperator, который можно использовать в функциях (их гораздо больше конечно)

sql = """ INSERT INTO example (id, exec_time) values (%d, "%s") ""”

sqlite_hook=SqliteHook(sqlite_conn_id='sqlite_conn’)

sqlite_hook.run(sql % (0, exec_time))

17 of 52

17

Cенсоры

18 of 52

18

Flow задач

19 of 52

ШАБЛОНЫ JINJA

  • {{ }} — это шаблонная переменная для Jinja, она будет доступна как переменная среды с заданным именем
  • Шаблоны Jinja можно использовать с каждым параметром, помеченным в документации как «templated». Подмена происходит непосредственно перед вызовом функции pre_execute вашего оператора.

19

BashOperator(�task_id="print_day_of_week",�bash_command="echo Today is {{ execution_date.format('dddd') }}",)

20 of 52

20

21 of 52

XCOM

  • XComs (“Cross-communications”), позволяет обмениваться небольшими фрагментами данных между задачами
  • Custom xcom
  • Размер от 2 гб до 64 кб
  • Создают неявные зависимости между джобами (через интерфейс их не увидеть)

21

def _train_model(**context):� model_id = str(uuid.uuid4()) context["task_instance"].xcom_push(key="model_id", value=model_id)

train_model = PythonOperator( t ask_id="train_model", python_callable=_train_model,

)

def _deploy_model(**context):� model_id = context["task_instance"].xcom_pull(

Task_ids="train_model", key="model_id" )

print(f"Deploying model {model_id}")

deploy_model = PythonOperator( task_id="deploy_model", python_callable=_deploy_model,

)

!!Вы несете ответственность за то, чтобы задачи с зависимостями XCom выполнялись в правильном порядке, Airflow не сделает этого за вас. !!

22 of 52

УБОРКА XCOM ЗА СОБОЙ

XCOM так же важно чистить (об этом так же упомянута на слайде Branch Operator)

Можно на уровне кода

Так же можно на уровне UI.

В целом способов много

22

from airflow.models import DagRun

dag_id = 'your_dag_id'

execution_date = '2022-01-01T00:00:00+00:00'

dag_run = DagRun.find(dag_id=dag_id, execution_date=execution_date)

dag_run[0].xcom_clear()

delete_xcom_task = PostgresOperator( task_id='delete-xcom-task', postgres_conn_id='airflow_db', sql="delete from xcom where dag_id=dag.dag_id and task_id='your_task_id' and execution_date={{ ds }}", dag=dag)

23 of 52

JINJA + XCOM

def deploy_model(templates_dict, **context):

model_id = templates_dict["model_id"]

print(f"Deploying model {model_id}")

deploy_model = PythonOperator(

task_id="deploy_model",

python_callable=_deploy_model,

templates_dict={

"model_id": "{{task_instance.xcom_pull(� task_ids='train_model', key='model_id')}}"

  • }, )

23

24 of 52

ЛОГГИРОВАНИЕ

по умолчанию Airflow использует конфигурацию что выводится в консоль. Вы можете изменить настройки ведения логов в файле конфигурации Airflow (airflow.cfg)

logger = logging.getLogger("airflow.task")

By default, Airflow outputs logs to the base_log_folder configured in airflow.cfg, which is located in your $AIRFLOW_HOME directory.

Преимущество использования логгера перед принтами в том, что вы можете вести логи на разных уровнях и контролировать, какие логи отправляются и когда.

Например, по умолчанию используется airflow.регистратор задач установлен на уровне INFO, что означает, что логи на уровне DEBUG не регистрируются. Чтобы просматривать журналы отладки при отладке ваших задач на Python, вам нужно установить AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG или изменить значение logging_level в airflow.cfg. После отладки вы можете изменить logging_level обратно на INFO, не изменяя свой код DAG.

24

25 of 52

ЭФФЕКТ НЕУДАЧ

Если вышестоящие задачи постоянно фейлятся то это распространяется на нижестоящие (логично). Что в свою очередь может повлечь за собой массовый сбой и падения - если правила триггеров определены как all_success, которое требует, чтобы все его зависимости были успешно выполнены.

Это мелочь но, стоит не забывать об этой мелочи чтобы не получать неприятные сюрпризы.

task1 = DummyOperator( task_id='task1', all_success=False, dag=dag )

25

26 of 52

ПЕРЕЗАПУСК ЗАФЕЙЛЕНЫХ ТАСОК

26

27 of 52

BACKFILLING

In Apache Airflow, backfilling refers to the process of running historical or missed tasks for a specific DAG (Directed Acyclic Graph). It allows you to schedule and execute tasks that were not executed at their original scheduled time.

Backfilling is useful in scenarios where you have a new DAG or task that you want to apply to historical data. It helps ensure that all relevant data is processed and tasks are executed retroactively.

When backfilling, Airflow looks at the specified date range and schedules and executes the tasks for each date within that range. It uses the DAG's schedule interval to determine the frequency at which tasks should be executed.

Backfilling can be triggered using the Airflow command-line interface (CLI) or programmatically through the Airflow API. The CLI command airflow backfill is commonly used to initiate the backfill process.

27

airflow.cfg

ИЛИ

AIRFLOW__CORE__DEFAULT_TASK_RETRIES

retry_delay parameter (default: timedelta(seconds=300))

28 of 52

BACKFILLING

28

@dag(�start_date=datetime(2023, 4, 1),�schedule="@daily",�catchup=False,�default_args={"retries": 3,"retry_delay": duration(seconds=2),"retry_exponential_backoff": True,"max_retry_delay": duration(hours=2),},)

@dag(�start_date=datetime(2023, 4, 1),�schedule="@daily",�catchup=True)

OR

Airflow dags backfill –s 2023-11-01 –e 2023-11-20 dag_name

+API

29 of 52

BRANCH PYTHON OPERATOR

  • X2 xcoms (can disable) do_xcom_push=False (тк минимум две таски будет, бранч и один из его детей)
  • Next tasks can be skipped !!!
  • Add trigger_rule=‘none_failed_or_skipped’ чтобы сработал end

29

branch_operator = BranchPythonOperator( task_id='branch_operator', python_callable=decide_branch, provide_context=True )

def decide_branch(**kwargs):

if some_condition:

return 'branch_a’

return 'branch_b'

start >> branch_operator >> [branch_a, branch_b] >> end�

30 of 52

TRIGGER RUN DAG OPERATOR

(only in airflow 2.0+)

30

from airflow import DAG

from airflow.operators.dagrun_operator import TriggerDagRunOperator

from datetime import datetime

dag = DAG( dag_id='my_trigger_dag', start_date=datetime(2022, 1, 1), schedule_interval=None )

trigger_operator = TriggerDagRunOperator( task_id='trigger_dag_run', trigger_dag_id='my_target_dag', dag=dag )

trigger_operator >> other_tasks

31 of 52

31

from airflow.utils.task_group import TaskGroup

t0 = EmptyOperator(task_id='start')

with TaskGroup(group_id='my_task_group') as tg1:

t1 = EmptyOperator(task_id='task_1')

t2 = EmptyOperator(task_id='task_2')

t1 >> t2

# End task group definition

t3 = EmptyOperator(task_id='end')

# Set task group's (tg1) dependencies

t0 >> tg1 >> t3

TASK GROUPS

32 of 52

32

33 of 52

CUSTOM OPERATOR

33

class MyCustomOperator(BaseOperator):

def __init__(self, my_param, *args, **kwargs):

super().__init__(*args, **kwargs)

self.my_param = my_param

def execute(self, context):

# Implement your operator's logic here

# You can access self.my_param and other attributes as needed

pass

from my_custom_operator import MyCustomOperator

# ...

my_task = MyCustomOperator(

task_id='my_task',

my_param='some_value',

dag=dag

)

Reusability

Abstraction

Testing

34 of 52

Злая колдунья заколдовала мой DAG

Пожалуйста!�Помогите его расколдовать

34

35 of 52

35

36 of 52

36

37 of 52

37

38 of 52

VARIABLES + PARAMS

38

from airflow import models

models.Variable.set("my_variable", "my_value")

from airflow import models

my_variable = models.Variable.get("my_variable")

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

def my_task(**kwargs):

my_parameter = kwargs['dag_run'].conf.get('my_parameter')

# Use the parameter in your code

with DAG('my_dag', ...) as dag:

task = PythonOperator(

task_id='my_task',

python_callable=my_task,

provide_context=True

)

airflow trigger_dag my_dag --conf '{"my_parameter": "my_value"}'

39 of 52

AIRFLOW API

import requests

api_endpoint = 'http://localhost:8080/api/v1’

headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer <YOUR_API_TOKEN>’ }

def trigger_dag(dag_id):

url = f'{api_endpoint}/dags/{dag_id}/dagRuns’

payload = { 'conf': {}, 'execution_date': '2022-01-01T00:00:00+00:00’ }

response = requests.post(url, headers=headers, json=payload)

39

40 of 52

DAG WRITING STYLE (CODE STYLE)

Лучшие практики

40

dag = DAG( 'my_dag', default_args=default_args,

tags=['production', 'data-pipeline’],

description='My Airflow DAG’,

schedule_interval='0 0 * * *')

  1. DAG Structure
  2. Modularity
  3. Version Control
  4. Documentation
  5. Testing
  6. Error Handling and Logging
  7. Code Review

[...] Always code as if the guy who ends up maintaining your code will be a violent psychopath who knows where you live. Code for readability. Nintendo John Woods

41 of 52

41

def _calculate_stats(**context):

""Вычисляет статистику событий.""

input_path = context["templates_dict"]["input_path"] output_path = context["templates_dict"]["output_path"]

events = pd.read_json(input_path)�stats = events.groupby(["date", "user"]).size().reset_index() stats.to_csv(output_path, index=False)

email_stats(stats, email="user@example.com")

Так делать плохо:

АТОМАРНОСТЬ

42 of 52

ИДЕМПОТЕНТНОСТЬ

Еще одно важное свойство, которое следует учитывать при написании задач Airflow, – это идемпотентность. Задачи называются идемпо- тентными, если вызов одной и той же задачи несколько раз с одними и теми же входными данными не имеет дополнительного эффекта. Это означает, что повторный запуск задачи без изменения входных данных не должен изменять общий результат.

42

43 of 52

TASKFLOW API

TaskFlow API - это стиль написания кода с использованием декораторов для определения DAGов и задач, который упрощает процесс передачи данных между задачами и определения зависимостей.

43

def transform(**kwargs):

total_order_value = 0

for value in order_data.values():

total_order_value += value

total_value = {"total_order_value": total_order_value}

@task(multiple_outputs=True)

def transform(order_data_dict: dict):

total_order_value = 0

for value in order_data_dict.values():

total_order_value += value

return {"total_order_value": total_order_value}

С этим декоратором не нужно явно создавать инстанс PythonOperator и присваивать ему python_callable. В качестве task_id у оператора будет название функции, по желанию можно задать task_id в декораторе.

!!!На основе PythonOperator!!!

44 of 52

44

45 of 52

Тест на благонадежность

*рекурсивно смотрим папку с дагами

*проверяем что не зациклен даг

*проверяем что каждый py файл имеет обьект дага

Декоратор запустит тест для каждого py файла

pytest tests/

45

import glob�import importlib.util import os�import pytest�from airflow.models import DAG���DAG_PATH = os.path.join(�os.path.dirname(__file__), "..", "..", "dags/**/*.py"�)�DAG_FILES = glob.glob(DAG_PATH, recursive=True) \��@pytest.mark.parametrize("dag_file", DAG_FILES)�def test_dag_integrity(dag_file):� module_name, _ = os.path.splitext(dag_file)� module_path = os.path.join(DAG_PATH, dag_file)� mod_spec = importlib.util.spec_from_file_location(module_name,� module_path)� module = importlib.util.module_from_spec(mod_spec)� mod_spec.loader.exec_module(module)� dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]� � � assert dag_objects� � for dag in dag_objects:� dag.test_cycle()

46 of 52

46

47 of 52

47

48 of 52

OOZIE(CTL) VS AIRFLOW

Плюсы:

  • Интеграция с экосистемой Hadoop: Oozie тесно интегрирован с различными компонентами Hadoop, такими как HDFS, MapReduce, Hive, Pig и Sqoop, что делает его подходящим для рабочих процессов на основе Hadoop.
  • Надежный и зрелый: Oozie существует уже более длительное время и является зрелым проектом со стабильным набором функций.
  • Поддержка нескольких типов заданий: Oozie поддерживает широкий спектр типов заданий, включая MapReduce, Pig, Hive, Sqoop и Spark.

Минусы

  • Более крутая кривая обучения: Конфигурация Oozie на основе XML и сложная настройка могут усложнить изучение и эффективное использование для новичков.
  • Отсутствие динамического планирования: В Oozie отсутствует встроенная поддержка динамического планирования, что делает его менее гибким для обработки изменяющихся условий или зависимостей.
  • Ограниченные возможности визуализации: Возможности визуализации и мониторинга Oozie относительно ограничены по сравнению с другими системами управления рабочими процессами.

48

  • Плюсы:

  • Рабочие процессы на Python: Airflow позволяет пользователям определять рабочие процессы с помощью Python, что делает его более доступным и гибким для разработчиков.
  • Динамичный и расширяемый: Airflow поддерживает динамическое планирование, позволяя запускать задачи на основе условий или внешних событий. Он также предоставляет богатый набор операторов и сенсоров, что делает его очень расширяемым.
  • Расширенная визуализация и мониторинг: Airflow предоставляет веб-интерфейс для визуализации и мониторинга рабочих процессов, что упрощает отслеживание прогресса и статуса задач.
  • Минусы:
  • Ограниченная интеграция с экосистемой Hadoop: Хотя Airflow может взаимодействовать с компонентами Hadoop, он не обладает таким уровнем интеграции, как Oozie, что может потребовать дополнительной настройки.

49 of 52

AIRFLOW VS LUIGI

AIRFLOW

Вам нужна масштабируемая и распределенная система

Вам требуется гибкая и расширяемая платформа:

Airflow предоставляет широкий спектр встроенных операторов и интеграций с различными системами

Вам нужны расширенные возможности планирования и мониторинга

веб-интерфейс для мониторинга и управления рабочими процессами, обеспечивая видимость статуса задачи, журналов и исторических данных.

49

LUIGI

Простое и легковесное решение:

разработчики с базовыми знаниями Python.

У вас относительно простые рабочие процессы с меньшим количеством зависимостей.

Вы отдаете приоритет удобочитаемости кода и сопровождаемости.

50 of 52

А МОЖНО ЛИ ПЕРЕКИДЫВАТЬ ДАННЫЕ МЕЖДУ ТАСКАМИ БЕЗ XCOM/LOCAL FILES/EXTERNAL STORAGE

50

51 of 52

ПРИ НАПИСАНИИ ЭТОЙ ПРЕЗЕНТАЦИИ НЕ ПОСТРАДАЛ НИ ОДИН ДАГ

51

52 of 52

52