APACHE AIRFLOW
DEEP DIVE 19.12.2023
ЧТО ТАКОЕ AIRFLOW
Apache Airflow - это платформа (созданная airbnb) с открытым исходным кодом, используемая для организации и планирования сложных рабочих процессов и конвейеров данных. Она позволяет определять, планировать и отслеживать задачи в виде направленных ациклических графов (DAG). Airflow предоставляет способ программного создания, планирования и мониторинга рабочих процессов, упрощая управление и автоматизацию конвейеров передачи данных.
2
ТИПИЧНЫЙ USE CASE
3
База данных 1
AIRFLOW DAG 1
Сервис погоды
Сервис BTC to USD
База данных 2
AIRFLOW DAG 2
UI Apache airflow
4
КОНВЕЙЕРЫ ОБРАБОТКИ ДАННЫХ
Часто бывает так что задач становится слишком много, между задачами есть зависимости, в связи с этим для облегчения понимания и управления было принято использовать графы, в частности направленные (ориентированные или же орграфы) ациклические графы.
Т.е. те чьи ребра имеют одно направление (можно так же представить что связь между узлами односторонняя)
Так же нет петель или циклов (ациклический)
(это все позволяет избегать ошибок)
5
6
!!!ВАЖНО!!!
При использовании AIRFLOW вы обычно определяете ресурсы, необходимые для каждой задачи, используя параметры. Эти параметры позволяют вам указать ЦП, память и другие требования к ресурсам для задачи. Тем не менее, важно отметить, что сам AIRFLOW не применяет и не управляет распределением этих ресурсов.
Фактическое распределение ресурсов происходит в среде исполнения. Например, если вы используете Kubernetes в качестве исполнителя, Kubernetes планирует и распределять ресурсы на основе определенных требований каждой задачи. Точно так же, если вы используете YARN/MESOS, MESOS будет обрабатывать распределение ресурсов.
Роль AIRFLOW заключается в том, чтобы организовать рабочий процесс и управлять зависимостями между задачами. Это гарантирует, что задачи выполняются в правильном порядке и обрабатывают планирование задач, повторение и мониторинг. С другой стороны, среда выполнения отвечает за управление ресурсами и выполнение задач на основе выделенных ресурсов.
AIRFLOW напрямую не выделяет ресурсы на задачи. Он опирается на основную среду выполнения для обработки распределения ресурсов на основе определенных требований каждой задачи.
7
DAG
8
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
):
EmptyOperator(task_id="task")
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
9
ОПЕРАТОРЫ
10
По сути Оператор - это шаблон предзаполненной таски (задачи) декларативно объявляющейся в ОАГе
11
https://docs.google.com/spreadsheets/d/1d-f4tM4JsM04uu_q2SCLbASN760MqRt5Swta_F6vBOo/edit#gid=0
12
А КАК ИСПОЛЬЗОВАТЬ (ПАЙ)СПАРК???
13
def run_pyspark():
spark = SparkSession.builder \
.appName("MyPySparkJob") \
.config("spark.master", "yarn") \
.config("spark.submit.deployMode", "cluster") \
.config("spark.hadoop.fs.defaultFS", "hdfs://<hdfs-host>:<hdfs-port>") \
.getOrCreate()
# Your PySpark code here
# ...
spark.stop()
dag = DAG(
dag_id='my_pyspark_dag',
schedule_interval='0 0 * * *’,
start_date=datetime(2022, 1, 1),
)
pyspark_task = PythonOperator(
task_id='run_pyspark',
python_callable=run_pyspark,
dag=dag,
)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from pyspark.sql import SparkSession
14
CONNECTIONS & HOOKS
15
Соединение (connection) – это набор параметров (логин, пароль, хост) и некая внешняя система c уникальным именем (conn_id), к которой и происходит подключение. Такой системой может быть базы и хранилища данных, электронная почта, AWS, Google Cloud, Telegram и т.д. Всякий раз, когда вам нужно изнутри Airflow подключиться к БД, то будут использоваться соединения и хуки.
Admin->Connection.
CONNECTIONS & HOOKS
16
Хук (hook) предоставляет интерфейс для взаимодействия с внешней системой в пределах одного графа. Например, некоторые задачи требуют доступа к MySQL, и чтобы не устанавливать связь каждый раз в нужной задаче, можно использовать хук. Хуки также позволяют не хранить параметры аутентификации в графе. По сути своей, хук позволяет использовать возможности SQLOperator внутри PythonOperator.
�
Хук — это SQLOperator, который можно использовать в функциях (их гораздо больше конечно)
sql = """ INSERT INTO example (id, exec_time) values (%d, "%s") ""”
sqlite_hook=SqliteHook(sqlite_conn_id='sqlite_conn’)
sqlite_hook.run(sql % (0, exec_time))
17
Cенсоры
18
Flow задач
ШАБЛОНЫ JINJA
19
BashOperator(�task_id="print_day_of_week",�bash_command="echo Today is {{ execution_date.format('dddd') }}",�)
20
XCOM
21
def _train_model(**context):� model_id = str(uuid.uuid4()) context["task_instance"].xcom_push(key="model_id", value=model_id)
train_model = PythonOperator( t ask_id="train_model", python_callable=_train_model,
)
def _deploy_model(**context):� model_id = context["task_instance"].xcom_pull(
Task_ids="train_model", key="model_id" )
print(f"Deploying model {model_id}")
deploy_model = PythonOperator( task_id="deploy_model", python_callable=_deploy_model,
)
!!Вы несете ответственность за то, чтобы задачи с зависимостями XCom выполнялись в правильном порядке, Airflow не сделает этого за вас. !!
УБОРКА XCOM ЗА СОБОЙ
XCOM так же важно чистить (об этом так же упомянута на слайде Branch Operator)
Можно на уровне кода
Так же можно на уровне UI.
В целом способов много
22
from airflow.models import DagRun
dag_id = 'your_dag_id'
execution_date = '2022-01-01T00:00:00+00:00'
dag_run = DagRun.find(dag_id=dag_id, execution_date=execution_date)
dag_run[0].xcom_clear()
delete_xcom_task = PostgresOperator( task_id='delete-xcom-task', postgres_conn_id='airflow_db', sql="delete from xcom where dag_id=dag.dag_id and task_id='your_task_id' and execution_date={{ ds }}", dag=dag)
JINJA + XCOM
def deploy_model(templates_dict, **context):
model_id = templates_dict["model_id"]
print(f"Deploying model {model_id}")
deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model,
templates_dict={
"model_id": "{{task_instance.xcom_pull(� task_ids='train_model', key='model_id')}}"
23
ЛОГГИРОВАНИЕ
по умолчанию Airflow использует конфигурацию что выводится в консоль. Вы можете изменить настройки ведения логов в файле конфигурации Airflow (airflow.cfg)
logger = logging.getLogger("airflow.task")
By default, Airflow outputs logs to the base_log_folder configured in airflow.cfg, which is located in your $AIRFLOW_HOME directory.
Преимущество использования логгера перед принтами в том, что вы можете вести логи на разных уровнях и контролировать, какие логи отправляются и когда.
Например, по умолчанию используется airflow.регистратор задач установлен на уровне INFO, что означает, что логи на уровне DEBUG не регистрируются. Чтобы просматривать журналы отладки при отладке ваших задач на Python, вам нужно установить AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG или изменить значение logging_level в airflow.cfg. После отладки вы можете изменить logging_level обратно на INFO, не изменяя свой код DAG.
24
ЭФФЕКТ НЕУДАЧ
Если вышестоящие задачи постоянно фейлятся то это распространяется на нижестоящие (логично). Что в свою очередь может повлечь за собой массовый сбой и падения - если правила триггеров определены как all_success, которое требует, чтобы все его зависимости были успешно выполнены.
Это мелочь но, стоит не забывать об этой мелочи чтобы не получать неприятные сюрпризы.
task1 = DummyOperator( task_id='task1', all_success=False, dag=dag )
25
ПЕРЕЗАПУСК ЗАФЕЙЛЕНЫХ ТАСОК
26
BACKFILLING
In Apache Airflow, backfilling refers to the process of running historical or missed tasks for a specific DAG (Directed Acyclic Graph). It allows you to schedule and execute tasks that were not executed at their original scheduled time.
Backfilling is useful in scenarios where you have a new DAG or task that you want to apply to historical data. It helps ensure that all relevant data is processed and tasks are executed retroactively.
When backfilling, Airflow looks at the specified date range and schedules and executes the tasks for each date within that range. It uses the DAG's schedule interval to determine the frequency at which tasks should be executed.
Backfilling can be triggered using the Airflow command-line interface (CLI) or programmatically through the Airflow API. The CLI command airflow backfill is commonly used to initiate the backfill process.
27
airflow.cfg
ИЛИ
AIRFLOW__CORE__DEFAULT_TASK_RETRIES
retry_delay parameter (default: timedelta(seconds=300))
BACKFILLING
28
@dag(�start_date=datetime(2023, 4, 1),�schedule="@daily",�catchup=False,�default_args={�"retries": 3,�"retry_delay": duration(seconds=2),�"retry_exponential_backoff": True,�"max_retry_delay": duration(hours=2),�},�)
@dag(�start_date=datetime(2023, 4, 1),�schedule="@daily",�catchup=True�)
OR
Airflow dags backfill –s 2023-11-01 –e 2023-11-20 dag_name
+API
BRANCH PYTHON OPERATOR
29
branch_operator = BranchPythonOperator( task_id='branch_operator', python_callable=decide_branch, provide_context=True )
def decide_branch(**kwargs):
if some_condition:
return 'branch_a’
return 'branch_b'
start >> branch_operator >> [branch_a, branch_b] >> end�
TRIGGER RUN DAG OPERATOR
(only in airflow 2.0+)
30
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime
dag = DAG( dag_id='my_trigger_dag', start_date=datetime(2022, 1, 1), schedule_interval=None )
trigger_operator = TriggerDagRunOperator( task_id='trigger_dag_run', trigger_dag_id='my_target_dag', dag=dag )
trigger_operator >> other_tasks
31
from airflow.utils.task_group import TaskGroup
t0 = EmptyOperator(task_id='start')
with TaskGroup(group_id='my_task_group') as tg1:
t1 = EmptyOperator(task_id='task_1')
t2 = EmptyOperator(task_id='task_2')
t1 >> t2
# End task group definition
t3 = EmptyOperator(task_id='end')
# Set task group's (tg1) dependencies
t0 >> tg1 >> t3
TASK GROUPS
32
CUSTOM OPERATOR
33
class MyCustomOperator(BaseOperator):
def __init__(self, my_param, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_param = my_param
def execute(self, context):
# Implement your operator's logic here
# You can access self.my_param and other attributes as needed
pass
from my_custom_operator import MyCustomOperator
# ...
my_task = MyCustomOperator(
task_id='my_task',
my_param='some_value',
dag=dag
)
Reusability
Abstraction
Testing
Злая колдунья заколдовала мой DAG
Пожалуйста!�Помогите его расколдовать
34
35
36
37
VARIABLES + PARAMS
38
from airflow import models
models.Variable.set("my_variable", "my_value")
from airflow import models
my_variable = models.Variable.get("my_variable")
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def my_task(**kwargs):
my_parameter = kwargs['dag_run'].conf.get('my_parameter')
# Use the parameter in your code
with DAG('my_dag', ...) as dag:
task = PythonOperator(
task_id='my_task',
python_callable=my_task,
provide_context=True
)
airflow trigger_dag my_dag --conf '{"my_parameter": "my_value"}'
AIRFLOW API
import requests
api_endpoint = 'http://localhost:8080/api/v1’
headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer <YOUR_API_TOKEN>’ }
def trigger_dag(dag_id):
url = f'{api_endpoint}/dags/{dag_id}/dagRuns’
payload = { 'conf': {}, 'execution_date': '2022-01-01T00:00:00+00:00’ }
response = requests.post(url, headers=headers, json=payload)
39
DAG WRITING STYLE (CODE STYLE)
Лучшие практики
40
dag = DAG( 'my_dag', default_args=default_args,
tags=['production', 'data-pipeline’],
description='My Airflow DAG’,
schedule_interval='0 0 * * *')
[...] Always code as if the guy who ends up maintaining your code will be a violent psychopath who knows where you live. Code for readability. Nintendo John Woods
41
def _calculate_stats(**context):
""Вычисляет статистику событий.""
input_path = context["templates_dict"]["input_path"] output_path = context["templates_dict"]["output_path"]
events = pd.read_json(input_path)�stats = events.groupby(["date", "user"]).size().reset_index() stats.to_csv(output_path, index=False)
email_stats(stats, email="user@example.com")
Так делать плохо:
АТОМАРНОСТЬ
ИДЕМПОТЕНТНОСТЬ
Еще одно важное свойство, которое следует учитывать при написании задач Airflow, – это идемпотентность. Задачи называются идемпо- тентными, если вызов одной и той же задачи несколько раз с одними и теми же входными данными не имеет дополнительного эффекта. Это означает, что повторный запуск задачи без изменения входных данных не должен изменять общий результат.
42
TASKFLOW API
TaskFlow API - это стиль написания кода с использованием декораторов для определения DAGов и задач, который упрощает процесс передачи данных между задачами и определения зависимостей.
43
def transform(**kwargs):
total_order_value = 0
for value in order_data.values():
total_order_value += value
total_value = {"total_order_value": total_order_value}
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
С этим декоратором не нужно явно создавать инстанс PythonOperator и присваивать ему python_callable. В качестве task_id у оператора будет название функции, по желанию можно задать task_id в декораторе.
!!!На основе PythonOperator!!!
44
Тест на благонадежность
*рекурсивно смотрим папку с дагами
*проверяем что не зациклен даг
*проверяем что каждый py файл имеет обьект дага
Декоратор запустит тест для каждого py файла
pytest tests/
45
import glob�import importlib.util import os�import pytest�from airflow.models import DAG���DAG_PATH = os.path.join(�os.path.dirname(__file__), "..", "..", "dags/**/*.py"�)�DAG_FILES = glob.glob(DAG_PATH, recursive=True) \��@pytest.mark.parametrize("dag_file", DAG_FILES)�def test_dag_integrity(dag_file):� module_name, _ = os.path.splitext(dag_file)� module_path = os.path.join(DAG_PATH, dag_file)� mod_spec = importlib.util.spec_from_file_location(module_name,� module_path)� module = importlib.util.module_from_spec(mod_spec)� mod_spec.loader.exec_module(module)� dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]� � � assert dag_objects� � for dag in dag_objects:� dag.test_cycle()
46
47
OOZIE(CTL) VS AIRFLOW
Плюсы:
Минусы
48
AIRFLOW VS LUIGI
AIRFLOW
Вам нужна масштабируемая и распределенная система
Вам требуется гибкая и расширяемая платформа:
Airflow предоставляет широкий спектр встроенных операторов и интеграций с различными системами
Вам нужны расширенные возможности планирования и мониторинга
веб-интерфейс для мониторинга и управления рабочими процессами, обеспечивая видимость статуса задачи, журналов и исторических данных.
49
LUIGI
Простое и легковесное решение:
разработчики с базовыми знаниями Python.
У вас относительно простые рабочие процессы с меньшим количеством зависимостей.
Вы отдаете приоритет удобочитаемости кода и сопровождаемости.
А МОЖНО ЛИ ПЕРЕКИДЫВАТЬ ДАННЫЕ МЕЖДУ ТАСКАМИ БЕЗ XCOM/LOCAL FILES/EXTERNAL STORAGE
50
ПРИ НАПИСАНИИ ЭТОЙ ПРЕЗЕНТАЦИИ НЕ ПОСТРАДАЛ НИ ОДИН ДАГ
51
52