1 of 17

Ray and Flyte: Distributed Computing and Orchestration

K8s-native ML Workflows meetup 2022

Eduardo Apolinario, OSS Lead @ Union.ai

09/21/2022

2 of 17

Agenda

  • What is Flyte?
    • Flyte Programming Model
  • Ray+Flyte integration
  • Q&A

3 of 17

What is Flyte?

Flyte is a data- and machine learning-aware orchestration platform that enables highly concurrent, scalable, and reproducible workflows for data processing and machine learning.

4 of 17

The 5 Tenets of the Flyte Programming Model

  • Reliability: When data passing through my pipelines violates certain assumptions I’ve specified, I should know about it ASAP (see the sketch after this list).

  • Reproducibility: When I run my pipeline or parts of it, I can expect consistent behavior, provided that I’ve given it the same inputs.

  • Recoverability: When my pipeline fails, I should be able to resume execution from the failure point after I fix the underlying issue, without wasting compute.

  • Auditability: During and after a pipeline run, I should be able to inspect all of the inputs and outputs of the nodes of my execution graph.

  • Scalability: I should be able to run my pipeline at scale without any modification.
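A minimal sketch of the Reliability tenet, assuming flytekit (the task and its names below are illustrative, not from the slides): task interfaces are strongly typed, so an input that violates the declared types is rejected when the workflow is compiled, before any compute is spent.

from flytekit import task, workflow

@task
def to_ratio(numerator: int, denominator: int) -> float:
    return numerator / denominator

@workflow
def ratio_wf() -> float:
    # Passing e.g. a str for `denominator` here fails at compile/registration
    # time, not halfway through a run.
    return to_ratio(numerator=1, denominator=2)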

5 of 17

Overview

[Architecture diagram: workflows are created locally, compiled, then serialized & registered (along with config) to the Flyte cluster running on a Kubernetes cluster; a workflow execution runs each task as a Docker container inside a K8s pod.]

6 of 17

Building blocks: Tasks & Workflows

  • Versioned
  • Strongly typed interfaces
  • Models the flow of data
  • Tasks
    • Arbitrarily complex
    • Encapsulate user code
  • Workflows
    • Composable
    • Dynamic: the graph can depend on runtime inputs (see the sketch after this list)
    • DSL in Python (& Java)
      • Javascript SDK contributed by the community
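A minimal sketch of a dynamic workflow, assuming flytekit (double and double_all are illustrative names, not from the slides): the number of nodes in the graph is determined by the runtime input.

import typing

from flytekit import dynamic, task

@task
def double(x: int) -> int:
    return x * 2

@dynamic
def double_all(xs: typing.List[int]) -> typing.List[int]:
    # One `double` node is created per element of `xs`, so the shape of the
    # graph is only known at run time.
    return [double(x=x) for x in xs]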

7 of 17

Write Your Code

  • Write code in Python with type annotations
  • Use DataFrames, PySpark, Python data types, etc.
  • Annotate with @task / @workflow
  • Optional: Enable caching
    • Also works locally
  • Execute locally

import pandas
import pyspark.sql
from pyspark.sql import functions as F
from flytekit import task, workflow

@task(cache=True, cache_version="1.0")
def pay_multiplier(df: pandas.DataFrame, scalar: int) -> pandas.DataFrame:
    df["col"] = scalar * df["col"]
    return df

@task
def total_spend(df: pyspark.sql.DataFrame) -> int:
    return df.agg(F.sum("col")).collect()[0][0]

@workflow
def calculate_spend(emp_df: pandas.DataFrame) -> int:
    return total_spend(df=pay_multiplier(df=emp_df, scalar=2))

# Execute locally
pay_multiplier(df=pandas.DataFrame({"col": [10, 20]}), scalar=2)
calculate_spend(emp_df=pandas.DataFrame({"col": [10, 20]}))

8 of 17

Get Ready to Scale!

  • Specify resources for tasks
  • Specify Spark cluster configuration
  • Add resiliency — retries
  • Optional: Configure one or more Schedule / Notification

from datetime import timedelta

import pandas
import pyspark.sql
from pyspark.sql import functions as F
from flytekit import Email, FixedRate, LaunchPlan, Resources, task, workflow
from flytekit.models.core.execution import WorkflowExecutionPhase
from flytekitplugins.spark import Spark

@task(limits=Resources(cpu="2", mem="150Mi"))
def pay_multiplier(df: pandas.DataFrame, scalar: int) -> pandas.DataFrame:
    df["col"] = scalar * df["col"]
    return df

@task(
    task_config=Spark(spark_conf={"spark.driver.memory": "1000M"}),
    retries=2,
)
def total_spend(df: pyspark.sql.DataFrame) -> int:
    return df.agg(F.sum("col")).collect()[0][0]

@workflow
def calculate_spend(emp_df: pandas.DataFrame) -> int:
    return total_spend(df=pay_multiplier(df=emp_df, scalar=2))

LaunchPlan.get_or_create(
    name="...",
    workflow=calculate_spend,
    schedule=FixedRate(duration=timedelta(minutes=10)),
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED],
            recipients_email=[...],
        )
    ],
)

9 of 17

Execute and interact

Golang CLI (flytectl)

# Fetch & create an execution
flytectl get tasks -p proj -d dev core.advanced.run_merge_sort.merge --version v2 --execFile execution_spec.yaml
flytectl create execution --execFile execution_spec.yaml -p proj -d dev --targetProject proj

# Fetch results
flytectl get execution -p proj -d dev oeh94k9r2r --details -o yaml

# Visualize a workflow
flytectl get workflow -p proj -d dev core.flyte_basics.basic_workflow.my_wf --latest -o doturl

Python-based CLI (pyflyte)

# Register and execute a single workflow/task
pyflyte run --remote flyte/workflows/example.py <workflow_name>

# Register all Flyte entities under a directory
pyflyte register --image <docker_img> flyte_basics
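The same fetch-and-execute flow is also available programmatically; a minimal sketch, assuming flytekit's FlyteRemote API (the project/domain values reuse the ones above and are illustrative):

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(Config.auto(), default_project="proj", default_domain="dev")
wf = remote.fetch_workflow(name="core.flyte_basics.basic_workflow.my_wf")
execution = remote.execute(wf, inputs={...})  # fill in the workflow's inputs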

10 of 17

UX Overview - rendered graphs

11 of 17

UX Overview - error traces

12 of 17

A Kubernetes-native fabric that connects best-of-breed ML technologies.

13 of 17

Ray Task Usage

import typing

import ray
from flytekit import task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

# Regular Ray code
@ray.remote
def f(x):
    return x * x

# Ray cluster config
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="test-group", replicas=10)],
    runtime_env={"pip": ["numpy", "pandas"]},
)

# Regular Flyte code (Python)
@task(task_config=ray_config)
def ray_task() -> typing.List[int]:
    futures = [f.remote(i) for i in range(5)]
    return ray.get(futures)

@workflow
def wf() -> typing.List[int]:
    return ray_task()
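To try this on a real cluster (assuming the flytekitplugins-ray package is installed and the Ray backend plugin is enabled), the same pyflyte flow from earlier applies; the file name here is illustrative:

pyflyte run --remote ray_example.py wf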

14 of 17

Ray Task Architecture

15 of 17

Roadmap

  • Ray integration: coming in the Flyte v1.2 release (end of Sep. 2022)
  • Next releases
    • Metrics/Dashboard
    • Intelligent Ray cluster re-use
    • Looking for feedback on Phase II

16 of 17

Questions?

17 of 17

Resources