1 of 38

Go Cross-Language IO Quickstart

danoliveira@

2 of 38

Cross-Language Overview

3 of 38

Portable Pipeline (Single Language)

[Diagram: Pipeline Construction (Go Pipeline Code) -> Runner (Job Service) -> Workers (Go SDK Harness)]

4 of 38

Portable Pipeline (Multi Language)

[Diagram: Pipeline Construction (Go Pipeline Code, Java Expansion Service) -> Runner (Job Service) -> Workers (Go SDK Harness, Java SDK Harness)]

5 of 38

Expansion Service

  • How will a Go pipeline be able to create a proto representation of Java transforms?
    • What is “expansion”?
    • Java & Python SDKs: the expand() method
    • Takes a top-level (composite) transform and expands it into its component parts
    • Here, overloaded to also mean expanding into a proto representation
  • Input: URN and configuration payload
  • Output: Expanded proto, ready for the Job Server
  • Local to pipeline construction
    • I.e., on the user’s machine
    • (Mostly) runner-agnostic
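As a rough mental model of the input and output above, the stdlib-only sketch below treats expansion as a lookup from URN to an expander that consumes a configuration payload. All types and the URN are hypothetical stand-ins, not the real expansion protos or any Beam API; a real expansion service is a gRPC server that returns pipeline protos.

```go
package main

import "fmt"

// ExpansionRequest is a hypothetical stand-in for the real request:
// a transform URN plus an opaque, already-encoded configuration payload.
type ExpansionRequest struct {
	URN     string
	Payload []byte
}

// ExpansionResponse is a hypothetical stand-in for the expanded result:
// the component transforms of the composite, or an error message.
type ExpansionResponse struct {
	Subtransforms []string
	Error         string
}

// Expander turns a configuration payload into component transforms.
type Expander func(payload []byte) ([]string, error)

// ExpansionService maps URNs to expanders, mimicking a service that runs
// locally during pipeline construction.
type ExpansionService struct {
	expanders map[string]Expander
}

// Expand looks up the URN and expands it, reporting failures inside the
// response rather than as a Go error, like a remote service would.
func (s *ExpansionService) Expand(req ExpansionRequest) ExpansionResponse {
	exp, ok := s.expanders[req.URN]
	if !ok {
		return ExpansionResponse{Error: fmt.Sprintf("unknown URN %q", req.URN)}
	}
	subs, err := exp(req.Payload)
	if err != nil {
		return ExpansionResponse{Error: err.Error()}
	}
	return ExpansionResponse{Subtransforms: subs}
}

func main() {
	svc := &ExpansionService{expanders: map[string]Expander{
		// Hypothetical URN; real URNs are defined by each transform.
		"example:read_from_source:v1": func(payload []byte) ([]string, error) {
			if len(payload) == 0 {
				return nil, fmt.Errorf("empty configuration payload")
			}
			return []string{"Read", "ParseRecords"}, nil
		},
	}}
	resp := svc.Expand(ExpansionRequest{URN: "example:read_from_source:v1", Payload: []byte("cfg")})
	fmt.Println(resp.Subtransforms, resp.Error == "")
}
```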

6 of 38

Artifacts

  • What is an “artifact”?
    • A dependency needed by the SDK harness
  • The expansion service provides artifacts via protos
  • The native SDK stages the artifacts

[Diagram: Pipeline Code, Job Service, Artifact Staging Service, SDK Harness; arrows labeled “Stage” and “Retrieve”]
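The Stage/Retrieve flow can be pictured with an in-memory service: pipeline code stages bytes, and the SDK harness retrieves them later. This stdlib-only sketch is hypothetical; the real artifact staging and retrieval services are gRPC APIs, and the checksum check here is only illustrative.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// Artifact is a hypothetical description of one dependency an SDK harness
// needs, such as a worker binary or a jar.
type Artifact struct {
	Name   string
	SHA256 string
}

// StagingService is a hypothetical in-memory stand-in for the artifact
// staging service: pipeline code stages, the SDK harness retrieves.
type StagingService struct {
	blobs map[string][]byte
}

func NewStagingService() *StagingService {
	return &StagingService{blobs: map[string][]byte{}}
}

// Stage stores a copy of the bytes and returns a description with a checksum.
func (s *StagingService) Stage(name string, data []byte) Artifact {
	sum := sha256.Sum256(data)
	s.blobs[name] = append([]byte(nil), data...)
	return Artifact{Name: name, SHA256: hex.EncodeToString(sum[:])}
}

// Retrieve returns staged bytes, verifying them against the checksum.
func (s *StagingService) Retrieve(a Artifact) ([]byte, error) {
	data, ok := s.blobs[a.Name]
	if !ok {
		return nil, fmt.Errorf("artifact %q was never staged", a.Name)
	}
	sum := sha256.Sum256(data)
	if hex.EncodeToString(sum[:]) != a.SHA256 {
		return nil, fmt.Errorf("artifact %q failed checksum", a.Name)
	}
	return data, nil
}

func main() {
	svc := NewStagingService()
	// Pipeline code side: stage a dependency.
	a := svc.Stage("worker", []byte("binary contents"))
	// SDK harness side: retrieve it before starting work.
	data, err := svc.Retrieve(a)
	fmt.Println(string(data), err)
}
```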

7 of 38

Cross-Language SDK Harnesses

  • It just works! Super easy
  • Handled by the Job Server
  • The model does not differentiate between native and xlang transforms
  • Environment protos
  • Only limitation: Docker environments only
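One way to see why xlang harnesses need no special handling: every transform points at an environment, and a runner launches harnesses per environment without caring which SDK produced the transform. The types and the one-harness-per-image rule below are simplifications assumed for this sketch; real runners decide harness placement themselves.

```go
package main

import (
	"fmt"
	"sort"
)

// Environment is a simplified, hypothetical version of an environment proto:
// an ID plus a Docker image (xlang environments are Docker-only).
type Environment struct {
	ID          string
	DockerImage string
}

// Transform only records which environment it must run in. Native and xlang
// transforms look exactly the same here.
type Transform struct {
	Name  string
	EnvID string
}

// HarnessesToStart returns the sorted set of Docker images a runner would
// need to launch: one SDK harness per distinct image in use.
func HarnessesToStart(ts []Transform, envs map[string]Environment) ([]string, error) {
	seen := map[string]bool{}
	var images []string
	for _, t := range ts {
		env, ok := envs[t.EnvID]
		if !ok {
			return nil, fmt.Errorf("transform %q references unknown environment %q", t.Name, t.EnvID)
		}
		if !seen[env.DockerImage] {
			seen[env.DockerImage] = true
			images = append(images, env.DockerImage)
		}
	}
	sort.Strings(images)
	return images, nil
}

func main() {
	envs := map[string]Environment{
		"go":   {ID: "go", DockerImage: "apache/beam_go_sdk:latest"},
		"java": {ID: "java", DockerImage: "apache/beam_java11_sdk:latest"},
	}
	ts := []Transform{{"Create", "go"}, {"ReadFromSource", "java"}, {"Format", "go"}}
	images, err := HarnessesToStart(ts, envs)
	fmt.Println(images, err)
}
```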

8 of 38

Codebase

9 of 38

XLang Entrypoints

A user adds an xlang transform to a pipeline. How is it represented?

  • User-facing IO wrapper, such as examples/xlang/transforms.go
    • IO-specific API with an xlang implementation. Recommended for users.
  • Top-level entrypoint: beam/xlang.go
    • beam.CrossLanguage - Generic entrypoint into xlang. User-facing but not recommended.
    • Accepts config info: URN, payload, inputs, outputs.
    • Two important tasks:
      • Go representation (MultiEdge)
      • Expand with expansion service
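The wrapper-over-entrypoint pattern above can be sketched with stdlib-only stand-ins: a typed config struct is encoded into a payload and handed to one generic call along with a URN, expansion address, and named inputs/outputs. Everything here is hypothetical: crossLanguage is not beam.CrossLanguage's real signature, the URN is made up, and JSON is only a stand-in for the SDK's real payload encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// xlangCall records what a generic cross-language entrypoint receives.
type xlangCall struct {
	URN           string
	Payload       []byte
	ExpansionAddr string
	Inputs        []string
	Outputs       []string
}

// crossLanguage is the low-level, generic entrypoint in this sketch. It is
// awkward to use directly because callers must hand-build the payload.
func crossLanguage(urn string, payload []byte, addr string, inputs, outputs []string) xlangCall {
	return xlangCall{URN: urn, Payload: payload, ExpansionAddr: addr, Inputs: inputs, Outputs: outputs}
}

// ReadConfig is a hypothetical IO-specific configuration a wrapper exposes.
type ReadConfig struct {
	Topic   string `json:"topic"`
	Servers string `json:"servers"`
}

// ReadFromExampleSource is a hypothetical user-facing wrapper: it exposes
// typed options and hides the URN and payload encoding.
func ReadFromExampleSource(addr string, cfg ReadConfig) (xlangCall, error) {
	// JSON is only a stand-in; real payloads use the SDK's own encoding.
	payload, err := json.Marshal(cfg)
	if err != nil {
		return xlangCall{}, err
	}
	return crossLanguage("example:read:v1", payload, addr, nil, []string{"output"}), nil
}

func main() {
	call, err := ReadFromExampleSource("localhost:8097", ReadConfig{Topic: "events", Servers: "broker:9092"})
	if err != nil {
		panic(err)
	}
	fmt.Println(call.URN, string(call.Payload))
}
```

The design point is that users only see ReadConfig; the URN and payload details stay in one place.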

10 of 38

Go Representation

  • MultiEdge: core/graph/edge.go
    • MultiEdge.Op = External
    • MultiEdge.External = new ExternalTransform{...}
  • NOT graph.NewExternal: deprecated legacy implementation
  • YES graph.NewCrossLanguage
  • Fills out the Go representation only
  • The expanded proto is still missing
  • I.e., a “black box”, filled in after expansion
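A simplified picture of the edge described above: the Go side fills in the URN, payload, and expansion address, and leaves the expanded result empty until expansion. These types are hypothetical stand-ins, much smaller than the real graph.MultiEdge and its ExternalTransform, and newCrossLanguageEdge is a made-up name.

```go
package main

import "fmt"

// Opcode mirrors the idea of MultiEdge.Op; only a couple of values shown.
type Opcode string

const (
	ParDo    Opcode = "ParDo"
	External Opcode = "External"
)

// ExpandedTransform stands in for the expanded protos returned by the
// expansion service.
type ExpandedTransform struct {
	Components []string
}

// ExternalTransform stands in for the xlang info on an edge: what to ask the
// expansion service for, plus the (initially empty) expanded result.
type ExternalTransform struct {
	URN           string
	Payload       []byte
	ExpansionAddr string
	Expanded      *ExpandedTransform // nil until expansion: the "black box"
}

// MultiEdge is a simplified stand-in for graph.MultiEdge.
type MultiEdge struct {
	Op       Opcode
	External *ExternalTransform
}

// newCrossLanguageEdge fills out only the Go-side representation.
func newCrossLanguageEdge(urn string, payload []byte, addr string) *MultiEdge {
	return &MultiEdge{Op: External, External: &ExternalTransform{URN: urn, Payload: payload, ExpansionAddr: addr}}
}

// IsExpanded reports whether the black box has been filled in.
func (e *MultiEdge) IsExpanded() bool {
	return e.Op == External && e.External != nil && e.External.Expanded != nil
}

func main() {
	e := newCrossLanguageEdge("example:read:v1", []byte("cfg"), "localhost:8097")
	fmt.Println(e.IsExpanded()) // false: only the Go representation exists
	e.External.Expanded = &ExpandedTransform{Components: []string{"Read"}}
	fmt.Println(e.IsExpanded()) // true: expanded result stored on the edge
}
```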

11 of 38

Expansion

core/runtime/xlangx/expand.go

  1. Convert just the xlang MultiEdge to proto
  2. Adjust that proto so it is valid input for the expansion service
  3. Namespace proto names (xlangx/namespace.go)
  4. Query the expansion service
  5. Store the expanded protos in the MultiEdge
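Step 3 exists so that IDs from separate expansions cannot collide once everything is merged into one pipeline. The stdlib-only sketch below renames transform IDs and every PCollection reference with a namespace; the "@" separator and the map-based types are illustrative assumptions, not necessarily what xlangx/namespace.go does with the real protos.

```go
package main

import (
	"fmt"
	"sort"
)

// Transform is a simplified component whose inputs and outputs reference
// PCollection IDs. Hypothetical stand-in for proto maps.
type Transform struct {
	Inputs  []string
	Outputs []string
}

// namespaced appends a namespace to an ID ("@" is an illustrative choice).
func namespaced(id, ns string) string {
	return id + "@" + ns
}

// addNamespace renames every transform ID and every PCollection reference.
func addNamespace(ts map[string]Transform, ns string) map[string]Transform {
	out := make(map[string]Transform, len(ts))
	for id, tr := range ts {
		var nt Transform
		for _, in := range tr.Inputs {
			nt.Inputs = append(nt.Inputs, namespaced(in, ns))
		}
		for _, o := range tr.Outputs {
			nt.Outputs = append(nt.Outputs, namespaced(o, ns))
		}
		out[namespaced(id, ns)] = nt
	}
	return out
}

func main() {
	ts := map[string]Transform{
		"read":  {Outputs: []string{"pc1"}},
		"parse": {Inputs: []string{"pc1"}, Outputs: []string{"pc2"}},
	}
	ns := addNamespace(ts, "xlang0")
	var ids []string
	for id := range ns {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	fmt.Println(ids, ns["parse@xlang0"].Inputs)
}
```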

12 of 38

Native Artifacts

First, how do artifacts work in native pipelines?

  • Only one artifact is used: the worker binary
  • Flag: --worker_binary
  • Used in runnerlib/execute.go and dataflow/execute.go
    • Defaults to the current binary first
    • If that is not compatible, cross-compiles a temporary worker binary
  • Staged via the Portable API in runnerlib/stage.go
  • Dataflow stages to GCS in dataflowlib/stage.go
  • Both files above also handle xlang staging
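The "current binary first, else cross-compile" decision can be sketched as below. Assumptions, all hypothetical: workers run linux/amd64, an explicit --worker_binary value wins, and the build is shown as a `go build` command plan rather than executed. The real logic in runnerlib/execute.go may differ in details.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
)

// Assumed worker container platform for this sketch.
const (
	targetOS   = "linux"
	targetArch = "amd64"
)

// workerPlan describes how a worker binary would be obtained.
type workerPlan struct {
	Reuse bool     // true: stage an existing binary as-is
	Path  string   // binary to stage, or output path of the build
	Build []string // go build invocation when cross-compiling, else nil
	Env   []string // extra environment for the build
}

// chooseWorker prefers an explicit flag value, then the current binary, and
// only plans a cross-compile when the host platform differs from the workers'.
func chooseWorker(flagValue, hostOS, hostArch, self, tmpDir string) workerPlan {
	if flagValue != "" {
		return workerPlan{Reuse: true, Path: flagValue}
	}
	if hostOS == targetOS && hostArch == targetArch {
		return workerPlan{Reuse: true, Path: self}
	}
	out := filepath.Join(tmpDir, "worker")
	return workerPlan{
		Path:  out,
		Build: []string{"go", "build", "-o", out, "."},
		Env:   []string{"GOOS=" + targetOS, "GOARCH=" + targetArch},
	}
}

func main() {
	self, err := os.Executable()
	if err != nil {
		panic(err)
	}
	plan := chooseWorker("", runtime.GOOS, runtime.GOARCH, self, os.TempDir())
	fmt.Printf("%+v\n", plan)
}
```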

13 of 38

XLang Artifacts: Overview

XLang artifacts work as follows:

  1. Resolve XLang dependencies
    • “What artifacts do we need?”
    • “Where can we find the artifacts?”
  2. Runner stages dependencies
    • Send dependencies to wherever the runner expects them
    • Both native and XLang dependencies

14 of 38

XLang Artifacts: Resolving

  • core/runtime/xlangx/resolve.go
  • Entrypoint: xlangx.ResolveArtifacts
  • For each xlang edge:
    • Retrieve dependencies from expansion service
    • Retrieve each dependency as an artifact
    • Stage each artifact to default local filepath
    • Add dependencies to the edge’s environment proto
  • Low-level functionality in artifact package: beam/artifact
    • Technically generic, but currently only used in xlang

15 of 38

XLang Artifacts: Portable

  • Portable API runners (example: universal.go:83)
  • Call xlangx.ResolveArtifacts directly
  • Stage via Portable API in runnerlib/stage.go
    • URN: "beam:artifact:type:file:v1"

16 of 38

XLang Artifacts: Dataflow

  • Happens in Execute in runners/dataflow/dataflow.go
  • Call dataflowlib.ResolveXLangArtifacts
  • Resolves and stages XLang artifacts
    • Native artifacts are not staged here; that happens elsewhere

17 of 38

Proto Pipeline

  • Native pipeline -> Proto pipeline
  • External edges have saved (expanded) protos
  • Extract those and insert them into the main proto pipeline
  • graphx.Marshal in core/runtime/graphx/translate.go
    • Perform extra steps if there are xlang transforms
  • Extra steps in core/runtime/graphx/xlang.go
  • Basically:
    • Merge all proto info into main proto’s Components
    • Hook up inputs and outputs properly
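The "merge, then hook up" idea can be sketched with plain maps: the placeholder for the external edge is replaced by its expanded transforms, and downstream consumers are rewired to the expanded output. Components, Transform, and mergeExpanded are hypothetical simplifications, not the helpers in core/runtime/graphx/xlang.go.

```go
package main

import "fmt"

// Transform lists the PCollection IDs a transform reads and writes.
type Transform struct {
	Inputs  []string
	Outputs []string
}

// Components is a heavily simplified stand-in for a pipeline proto's
// Components: transform IDs mapped to transforms.
type Components struct {
	Transforms map[string]Transform
}

// mergeExpanded replaces a placeholder xlang transform with its expanded
// components and rewires consumers of the placeholder output so they read
// the expanded transform's real output. IDs are assumed already namespaced.
func mergeExpanded(p, expanded Components, placeholderID, placeholderOut, realOut string) error {
	delete(p.Transforms, placeholderID)
	for id, tr := range expanded.Transforms {
		if _, dup := p.Transforms[id]; dup {
			return fmt.Errorf("transform ID %q collides with the main pipeline", id)
		}
		p.Transforms[id] = tr
	}
	for _, tr := range p.Transforms {
		for i, in := range tr.Inputs {
			if in == placeholderOut {
				// tr is a copy, but tr.Inputs shares its backing array with
				// the map entry, so this write updates the stored transform.
				tr.Inputs[i] = realOut
			}
		}
	}
	return nil
}

func main() {
	p := Components{Transforms: map[string]Transform{
		"create": {Outputs: []string{"pc0"}},
		"xlang":  {Inputs: []string{"pc0"}, Outputs: []string{"placeholder"}},
		"write":  {Inputs: []string{"placeholder"}},
	}}
	expanded := Components{Transforms: map[string]Transform{
		"read@ns0": {Inputs: []string{"pc0"}, Outputs: []string{"pc1@ns0"}},
	}}
	if err := mergeExpanded(p, expanded, "xlang", "placeholder", "pc1@ns0"); err != nil {
		panic(err)
	}
	fmt.Println(len(p.Transforms), p.Transforms["write"].Inputs)
}
```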

18 of 38

Configuration Details

  • sdk_harness_container_image_override
    • Job option to override container image names in XLang
    • Mainly for staging or custom Docker containers
    • Example: “apache/beam_java11_sdk:latest” -> “us.gcr.io/myproject/danoliveira/beam_java11_sdk:timestamp”

--sdk_harness_container_image_override=".*java.*,us.gcr.io/myproject/danoliveira/beam_java11_sdk:timestamp"
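The override value has the form "regex,replacement". Below is a sketch of how such a value could be parsed and applied to image names; it assumes Go's regexp replacement semantics, and the SDK's exact matching rules may differ.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// override is one parsed "regex,replacement" flag value.
type override struct {
	pattern     *regexp.Regexp
	replacement string
}

// parseOverride splits a flag value at the first comma.
func parseOverride(flagValue string) (override, error) {
	parts := strings.SplitN(flagValue, ",", 2)
	if len(parts) != 2 {
		return override{}, fmt.Errorf("want \"regex,replacement\", got %q", flagValue)
	}
	re, err := regexp.Compile(parts[0])
	if err != nil {
		return override{}, err
	}
	return override{pattern: re, replacement: parts[1]}, nil
}

// apply rewrites an image name if the pattern matches it; otherwise the
// image is returned unchanged.
func (o override) apply(image string) string {
	if !o.pattern.MatchString(image) {
		return image
	}
	return o.pattern.ReplaceAllString(image, o.replacement)
}

func main() {
	o, err := parseOverride(".*java.*,us.gcr.io/myproject/danoliveira/beam_java11_sdk:timestamp")
	if err != nil {
		panic(err)
	}
	fmt.Println(o.apply("apache/beam_java11_sdk:latest"))
	fmt.Println(o.apply("apache/beam_go_sdk:latest"))
}
```

Because ".*java.*" matches the whole Java image name, the whole name is replaced, while the Go image passes through untouched.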

19 of 38

Dataflow-Specific Details

  • XLang requires:
    • Unified Worker
    • Portable Job Submission
  • Both requirements are currently enabled by default.

20 of 38

XLang IO Integration Tests

  • In go/test/…, with tasks defined in the build.gradle
  • Two different kinds
  • Basic XLang correctness tests
    • Use TestExpansionService
    • Part of basic ValidatesRunner test suites
  • XLang IO tests
    • Use IOExpansionService
    • Heavier-weight
    • Will likely require modifying goIoValidatesRunnerTask
    • Will likely also require additional services to test IOs
    • Services can be started up programmatically, or via gradle

21 of 38

Dev Workflows

22 of 38

Start an Expansion Service Manually

Testing:

$ ./gradlew :sdks:java:testing:expansion-service:runTestExpansionService \
    -PconstructionService.port=8097

IOs:

$ ./gradlew :sdks:java:io:expansion-service:runExpansionService \
    -PconstructionService.port=8097

23 of 38

Running XLang Integration Test Suites

$ ./gradlew :sdks:go:test:dataflowValidatesRunner

$ ./gradlew :sdks:go:test:flinkValidatesRunner

$ ./gradlew :sdks:go:test:ulrValidatesRunner

… etc.

24 of 38

Building Containers

$ ./gradlew :sdks:go:container:docker -Pdocker-tag=2.35.0.dev

$ ./gradlew :sdks:java:container:java11:docker

$ ./gradlew :sdks:java:container:java8:docker

$ ./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=us.gcr.io/myproject/danoliveira -Pdocker-tag=testing

$ gcloud docker -- push us.gcr.io/myproject/danoliveira/beam_java11_sdk:testing

… etc.

25 of 38

Running a Specific XLang Integration Test

Easy setup:

  • Go into sdks/go/test/integration/integration.go
  • Filter out all the tests you don’t want
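Filtering tests out amounts to checking each test name against a list of patterns before it runs. The sketch below is a hypothetical stand-in for editing such a list; the filter entries and test names are made up, and patterns are anchored so each one must match a whole test name.

```go
package main

import (
	"fmt"
	"regexp"
)

// filters is a hypothetical list of test-name patterns to skip.
var filters = []string{
	"TestXLang_Prefix",
	"TestExampleIO.*",
}

// shouldSkip reports whether a test name matches any filter.
func shouldSkip(name string, patterns []string) (bool, error) {
	for _, p := range patterns {
		re, err := regexp.Compile("^(?:" + p + ")$")
		if err != nil {
			return false, fmt.Errorf("bad filter %q: %w", p, err)
		}
		if re.MatchString(name) {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	for _, name := range []string{"TestXLang_Prefix", "TestXLang_Multi", "TestExampleIO_Basic"} {
		skip, err := shouldSkip(name, filters)
		if err != nil {
			panic(err)
		}
		fmt.Println(name, "skip:", skip)
	}
}
```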

Harder setup:

26 of 38

Dataflow Configuration

  • Native Go and XLang containers must be pushed to a container registry Dataflow can pull from (e.g. GCR)
  • Easy workaround: edit run_validatesrunner_tests.sh

--sdk_harness_container_image_override=".*java.*,us.gcr.io/myproject/danoliveira/beam_java11_sdk:timestamp"

27 of 38

Running a Custom XLang Pipeline

  • Similar to running integration tests with go test
  • Write full pipeline from perspective of user
  • Easiest approach: write one in the “examples” directory
  • Remember to build the Docker containers

28 of 38

Example: Wordcount on Portable Runners

$ cd sdks/go/examples/xlang/wordcount

$ ./wordcount --runner=universal --endpoint=localhost:8099 --expansion_addr=localhost:8095

29 of 38

Example: Pipeline on Dataflow

This is an example of a pipeline I’ve been testing recently:

$ ./bigquery --output=gs://danoliveira/sandbox/out.txt --runner=dataflow \
    --expansion_addr=localhost:8095 --project=myproject \
    --region=us-central1 --environment_type=DOCKER \
    --staging_location=gs://danoliveira/sandbox/bq-test-storage/staging \
    --temp_location=gs://danoliveira/sandbox/bq-test-storage/temp \
    --environment_config=us.gcr.io/myproject/danoliveira/beam_go_sdk:bqtest \
    --sdk_harness_container_image_override=".*java.*,us.gcr.io/myproject/danoliveira/beam_java11_sdk:bqtest"

30 of 38

Debugging

31 of 38

A Warning

  • Few explicit instructions in this section
  • Hard to find patterns in specific error messages
  • Use this as a rough guide of where problems might be occurring
  • If stuck, it’s worth asking for help on the mailing list.

32 of 38

Expansion Errors

  • The most obvious kind of error
  • ExpansionResponse will contain an error
  • Go SDK will output the ExpansionResponse error
  • If necessary, the Go-side failure point is in expand.go
  • Also, the expansion service will usually output logs
  • Worst-case scenario: stepping through the expansion service
    • Right-click runExpansionService in build.gradle in IntelliJ
    • Click “Debug runExpansionService”
  • Usual causes:
    • Incorrect xlang payload
    • Incorrect inputs/outputs
    • Incorrect expansion service used

33 of 38

Stepping Through Expansion Service

34 of 38

Proto Errors

Incorrect proto submitted to the Job Server

  • Error normally occurs at execution time
  • Often some failure during job setup
  • Usual culprits:
    • Environment configs
    • Dependencies (i.e. artifacts)
    • Incorrect inputs/outputs on xlang transform
  • Fixes involve:
    • Pipeline -> proto translation code (graphx.Marshal)
    • Reading through a lot of proto output

35 of 38

SDK Harness Errors

Errors occurring at execution time, but in the SDK harness

  • Usually requires reading through Docker logs
  • Find stopped container: $ docker container ls -a
  • Read logs from container: $ docker logs [container name/id]
  • Container auto-deleted? Use --retain_docker_containers
    • Keeps docker containers around so logs can be read.

36 of 38

Artifact Errors

Errors occurring due to incorrect artifact resolution or staging

  • Can manifest in:
    • Artifact staging/resolution code (pipeline construction)
    • Incorrect dependencies in the proto pipeline
    • SDK harness errors due to a missing dependency (ex. “class not found”)
  • Solution often involves artifact retrieval/staging code
  • This one’s tough, don’t hesitate to get help

37 of 38

Step-Through Debugging

Annoying to set up, but almost always worth it

  • Any of these binaries work with IntelliJ debugger:
    • Pipeline Construction
    • Job Server
    • Expansion Service
  • SDK Harness does not work (as far as I know)
  • How to?
    • For Java: Debug gradle target through IntelliJ
    • Otherwise: manually set up an IntelliJ task

38 of 38

IntelliJ Debug Integration Test Example