Ray and Flyte: Distributed Computing and Orchestration
K8s-native ML Workflows meetup 2022
Eduardo Apolinario, OSS Lead @ Union.ai
09/21/2022
Agenda
What is Flyte?
Flyte is a data- and machine learning-aware orchestration platform that enables highly concurrent, scalable, and reproducible workflows for data processing and machine learning.
The 5 Tenets of the Flyte Programming Model
🏛 Reliability: When data passing through my pipelines violates certain assumptions I’ve specified, I should know about it ASAP.
🔄 Reproducibility: When I run my pipeline or parts of it, I can expect consistent behavior, provided that I’ve given it the same inputs.
🔂 Recoverability: When my pipeline fails, I should be able to resume execution from the failure point after I fix the underlying issue, without wasting compute.
👀 Auditability: During and after a pipeline run, I should be able to inspect all of the inputs and outputs of the nodes of my execution graph.
📈 Scalability: I should be able to run my pipeline at scale without any modification.
Overview
[Architecture diagram: locally created workflows are serialized & registered — as a compiled workflow plus config ⚙️ — to the Flyte cluster; workflow executions then run as Docker containers 🐳 inside K8s pods on the Kubernetes cluster.]
Building blocks: Tasks & Workflows
Write Your Code
import pandas
import pyspark.sql
from pyspark.sql import functions as F
from flytekit import task, workflow

@task(cache=True, cache_version="1.0")
def pay_multiplier(df: pandas.DataFrame, scalar: int) -> pandas.DataFrame:
    df["col"] = scalar * df["col"]
    return df

@task
def total_spend(df: pyspark.sql.DataFrame) -> int:
    return df.agg(F.sum("col")).collect()[0][0]

@workflow
def calculate_spend(emp_df: pandas.DataFrame) -> int:
    return total_spend(df=pay_multiplier(df=emp_df, scalar=2))

# Execute locally
pay_multiplier(df=pandas.DataFrame())
calculate_spend(emp_df=pandas.DataFrame())
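Executed locally like this, tasks and workflows behave as ordinary Python functions, so the pipeline can be reasoned about as plain function composition. A Flyte-free sketch of the same shape, with a list of ints standing in for the DataFrame column:

```python
def pay_multiplier(col, scalar):
    # mirrors the task above: scale every value in the column
    return [scalar * x for x in col]

def total_spend(col):
    # mirrors the Spark aggregation: sum over the column
    return sum(col)

def calculate_spend(col):
    # the @workflow body is plain composition of task outputs
    return total_spend(pay_multiplier(col, scalar=2))

print(calculate_spend([100, 250, 50]))  # 2 * (100 + 250 + 50) = 800
```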
Get Ready to Scale!
@task(limits=Resources(cpu="2", mem="150Mi"))
def pay_multiplier(df: pandas.DataFrame, scalar: int) -> pandas.DataFrame:
    df["col"] = scalar * df["col"]
    return df

@task(task_config=Spark(
    spark_conf={"spark.driver.memory": "1000M"},
), retries=2)
def total_spend(df: pyspark.sql.DataFrame) -> int:
    return df.agg(F.sum("col")).collect()[0][0]

@workflow
def calculate_spend(emp_df: pandas.DataFrame) -> int:
    return total_spend(df=pay_multiplier(df=emp_df, scalar=2))
LaunchPlan.get_or_create(
    name="...",
    workflow=calculate_spend,
    schedule=FixedRate(duration=timedelta(minutes=10)),
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED],
            recipients_email=[...],
        )
    ],
)
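`FixedRate(duration=timedelta(minutes=10))` fires an execution every ten minutes from the moment the launch plan is activated; the cadence is just timedelta arithmetic (the activation time below is hypothetical):

```python
from datetime import datetime, timedelta

rate = timedelta(minutes=10)                 # the FixedRate duration
activated_at = datetime(2022, 9, 21, 12, 0)  # hypothetical activation time
next_runs = [activated_at + i * rate for i in range(1, 4)]
print([t.strftime("%H:%M") for t in next_runs])  # ['12:10', '12:20', '12:30']
```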
Execute and interact
# Fetch & Create Execution
flytectl get tasks -p proj -d dev core.advanced.run_merge_sort.merge --version v2 --execFile execution_spec.yaml
flytectl create execution --execFile execution_spec.yaml -p proj -d dev --targetProject proj
# Fetch results
flytectl get execution -p proj -d dev oeh94k9r2r --details -o yaml
# Visualize workflow
flytectl get workflow -p proj -d dev core.flyte_basics.basic_workflow.my_wf --latest -o doturl
The commands above use flytectl, the Golang CLI. pyflyte is the Python-based CLI:
# Register and execute a single workflow/task
pyflyte run --remote flyte/workflows/example.py
# Register all Flyte entities under a directory
pyflyte register --image <docker_img> flyte_basics
UX Overview - rendered graphs
UX Overview - error traces
A Kubernetes-native fabric that connects best-of-breed ML technologies.
Ray Task Usage
Regular Ray code:

@ray.remote
def f(x):
    return x * x

Ray cluster config:

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"block": "true"}),
    worker_node_config=[WorkerNodeConfig(group_name="test-group", replicas=10)],
    runtime_env={"pip": ["numpy", "pandas"]},
)

Regular Flyte code (Python):

@task(task_config=ray_config)
def ray_task() -> typing.List[int]:
    futures = [f.remote(i) for i in range(5)]
    return ray.get(futures)

@workflow
def wf() -> typing.List[int]:
    return ray_task()
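Inside the task, `f.remote(i)` schedules a call on the Ray cluster and immediately returns a future, and `ray.get` blocks until every result has arrived. The same fan-out/gather shape, sketched with the stdlib's `concurrent.futures` as an analogy (not Ray's API):

```python
from concurrent.futures import ThreadPoolExecutor

def f(x):
    return x * x

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(f, i) for i in range(5)]  # ~ f.remote(i)
    results = [fut.result() for fut in futures]      # ~ ray.get(futures)

print(results)  # [0, 1, 4, 9, 16]
```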
Ray Task Architecture
Roadmap
Questions?
Resources