feat: file-based runtime dispatch for injected tasks #266
Draft
lbakshi wants to merge 1 commit into Nike-Inc:main from
At deploy time, rendered template code is written to
_brickflow_injected/<task>.py in the project directory. DAB syncs these
files to the workspace via a sync.include directive (overrides .gitignore).
A lightweight JSON pointer (task_name, task_type, file_ref) is stored in
brickflow_internal_injection_config, and the fully-resolved workspace
path goes into brickflow_internal_injection_file_path — DAB substitutes
${workspace.file_path} at deploy time.
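A minimal sketch of the placeholder mechanism described above, assuming the parameter is emitted as a plain string and resolved by simple substitution. The helper names and the example workspace path are hypothetical; they are not Brickflow or DAB APIs, and the real substitution is performed by DAB itself.

```python
# Hypothetical illustration only: these helpers are not part of Brickflow or DAB.
PLACEHOLDER = "${workspace.file_path}"


def injection_file_param(task_name: str) -> str:
    """Parameter value as it might appear in the bundle before DAB resolves it."""
    return f"{PLACEHOLDER}/_brickflow_injected/{task_name}.py"


def resolve(param: str, workspace_file_path: str) -> str:
    """Stand-in for DAB's deploy-time substitution of the placeholder."""
    return param.replace(PLACEHOLDER, workspace_file_path)


# The workspace root below is an assumed example value.
param = injection_file_param("my_task")
resolved = resolve(param, "/Workspace/Users/someone@example.com/bundle/files")
```

The point of the design is that the deploy side never needs to know the final workspace location; it only emits the placeholder, and the cluster receives an absolute path.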
At runtime the cluster reads the code from the workspace file, falling
back to inline rendered_code for backward compatibility.
This replaces the previous approach of stuffing the full rendered source
into base_parameters, which was fragile and size-limited.
Also includes a BRICKFLOW_RUNTIME_GIT_REF env-var override so the
cluster can install brickflow from a fork/branch during e2e testing
(remove before final merge).
Made-with: Cursor
c959092 to 1eb522f
Summary
Injected tasks deployed via `BRICKFLOW_INJECT_TASKS_DIR` fail at runtime because the injection YAML and env vars exist only on the CI agent, not on the Databricks cluster. This PR writes the rendered template code to workspace files that the cluster can read at runtime.
Problem
When Brickflow deploys a job with injected tasks, the job definition on Databricks correctly includes those tasks. But when the cluster re-executes the entrypoint notebook, `BRICKFLOW_INJECT_TASKS_DIR` is not set, so `_inject_tasks_from_yaml()` is a no-op and `workflow.get_task(task_id)` raises `TaskNotFoundError`.
Solution
Deploy time
1. `serialize_for_runtime()` renders the Jinja template and writes the output to `_brickflow_injected/<task_name>.py` in the project directory
2. A lightweight JSON pointer (`task_name`, `task_type`, `file_ref`) is stored in `brickflow_internal_injection_config`, with no inline source code
3. The fully-resolved workspace path goes into `brickflow_internal_injection_file_path` using DAB's `${workspace.file_path}` interpolation
4. `synth()` adds `sync.include: ["_brickflow_injected/**"]` to the bundle config so DAB uploads the files even when the directory is gitignored
Runtime
1. When `workflow.get_task(t_id)` fails, `_reconstruct_injected_task()` reads `brickflow_internal_injection_file_path` from the task parameters
2. It reads the `.py` file from the workspace filesystem (`/Workspace/Users/.../.brickflow_bundles/.../files/_brickflow_injected/<task>.py`)
3. It falls back to inline `rendered_code` in the JSON for backward compatibility with older deployments
4. The task is rebuilt as a `Task` object and executes normally
Why files instead of `base_parameters`?
The previous approach serialized the full rendered source into `base_parameters` as a JSON string. This was fragile and size-limited. With workspace files, the rendered code is inspectable in the Databricks UI, is not constrained by the parameter size limit, and the deploy→runtime contract is a simple file path.
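The deploy-time and runtime steps under Solution can be sketched as a small round trip. This is an illustration under stated assumptions, not the PR's implementation: the function names `serialize_for_runtime_sketch` and `load_injected_code` are hypothetical, the `"notebook"` task type is a made-up value, and a temporary directory stands in for both the project directory and the synced workspace path.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional

INJECTED_DIR = "_brickflow_injected"  # directory name taken from the PR description


def serialize_for_runtime_sketch(
    project_dir: Path, task_name: str, task_type: str, rendered_code: str
) -> str:
    """Deploy time: write the rendered code to a file and return a JSON pointer."""
    file_ref = f"{INJECTED_DIR}/{task_name}.py"
    target = project_dir / file_ref
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(rendered_code, encoding="utf-8")
    # The pointer carries no source code, only enough to locate the file.
    return json.dumps(
        {"task_name": task_name, "task_type": task_type, "file_ref": file_ref}
    )


def load_injected_code(config_json: str, file_path: Optional[str]) -> str:
    """Runtime: prefer the workspace file, fall back to inline rendered_code."""
    config = json.loads(config_json)
    if file_path and Path(file_path).is_file():
        return Path(file_path).read_text(encoding="utf-8")
    if "rendered_code" in config:  # older deployments stored the source inline
        return config["rendered_code"]
    raise FileNotFoundError(f"no injected code found for {config.get('task_name')}")


# New-style deployment: pointer plus file on disk.
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp)
    pointer = serialize_for_runtime_sketch(
        project, "my_task", "notebook", "print('hello')\n"
    )
    resolved_path = str(project / json.loads(pointer)["file_ref"])
    code_from_file = load_injected_code(pointer, resolved_path)

# Old-style deployment: source embedded in the JSON, no file path parameter.
legacy = json.dumps(
    {"task_name": "old", "task_type": "notebook", "rendered_code": "print('legacy')\n"}
)
code_from_legacy = load_injected_code(legacy, None)
```

Keeping the fallback in the reader rather than the writer means jobs deployed before this change can keep running without a redeploy.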
Files changed
| File | Change |
| --- | --- |
| `brickflow/context/context.py` | Add `injection_config` and `injection_file_path` to `BrickflowInternalVariables` |
| `brickflow/engine/task_executor.py` | `serialize_for_runtime()`; read from workspace file in `create_task_function_from_config()` |
| `brickflow/engine/task.py` | `injection_config_json` field; emit `${workspace.file_path}` parameter |
| `brickflow/engine/workflow.py` | Pass `injection_config_json` through `_add_task` |
| `brickflow/engine/project.py` | |
| `brickflow/codegen/databricks_bundle.py` | `Sync(include=["_brickflow_injected/**"])` when injected files exist |
| `tests/engine/test_task_injection.py` | |
Test plan
- `bundle.yml` contains lightweight JSON pointer (no inline code)
- `bundle.yml` contains `sync.include` for `_brickflow_injected/`
Pre-review cleanup
- Remove the `BRICKFLOW_RUNTIME_GIT_REF` override from `get_brickflow_libraries()` in `task.py`: a temporary hack to point the cluster at this fork for e2e testing
Closes nike-cdea-privacy/sidecar-test-zone-tmp#8