Skip to content

Commit c80703a

Browse files
committed
start prototyping a much much much simpler Each() implementation where
we use TRB mechanics, only, except for `Each#call`.
1 parent 2131c12 commit c80703a

2 files changed

Lines changed: 104 additions & 28 deletions

File tree

lib/trailblazer/macro/each.rb

Lines changed: 96 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,78 @@
11
module Trailblazer
22
module Macro
3+
# TODO: explain termini routing, Inject usage for "block activity", Out() => []
4+
5+
def self.Each(block_activity=nil, dataset_from: nil, item_key: :item, id: Macro.id_for(block_activity, macro: :Each, hint: dataset_from), collect: false, **dsl_options_for_iterated, &block)
6+
# TODO: 2.5. fix
7+
iterated_activity, outputs_from_block_activity = Trailblazer::Macro.block_activity_for(block_activity, &block)
8+
iterated_activity.extend(Trailblazer::Macro::Each::Transitive)
9+
10+
# filter to set ctx[:index]
11+
# The interesting part here is that we read dynamic values from the {circuit_options}, to not
12+
# pollute the business ctx.
13+
my_lowlevel_inject_filter = ->((ctx, flow_options), index:, **circuit_options) { index }
14+
my_filter_builder = ->(*) { Trailblazer::Activity::DSL::Linear::VariableMapping::SetVariable.new(name: "bla.FIXME", filter: my_lowlevel_inject_filter, write_name: :index, user_filter: nil) }
15+
# filter to set ctx[item_key]
16+
my_lowlevel_inject_filter_item = ->((ctx, flow_options), item:, **circuit_options) { item }
17+
my_filter_builder_item = ->(*) { Trailblazer::Activity::DSL::Linear::VariableMapping::SetVariable.new(name: "bla.FIXME.item_key", filter: my_lowlevel_inject_filter_item, write_name: item_key, user_filter: nil) }
18+
19+
# DISCUSS: move to Wrap.
20+
# TODO: if a patched step in the iterated activity would add another teminus, this would be inconsistent.
21+
# we'd have to recompute this via `inherited`.
22+
termini_from_block_activity =
23+
outputs_from_block_activity.
24+
# DISCUSS: End.success needs to be the last here, so it's directly behind {Start.default}.
25+
sort { |a,b| a.semantic == :success ? 1 : -1 }.
26+
collect { |output|
27+
[output.signal, id: "End.#{output.semantic}", magnetic_to: output.semantic, append_to: "Start.default"]
28+
}
29+
30+
each_activity = Trailblazer::Activity::Railway(termini: termini_from_block_activity) do
31+
32+
step Subprocess(iterated_activity, strict: true),
33+
id: "ITERATED FIXME",
34+
Inject(:index, filter_builder: my_filter_builder) => my_lowlevel_inject_filter,
35+
Inject(:item, filter_builder: my_filter_builder_item) => my_lowlevel_inject_filter_item,
36+
Out() => [], # per default, don't let anything out.
37+
**Each.options_for_collect(collect: collect)
38+
end
39+
40+
each_activity.class_eval do
41+
def self.call((ctx, flow_options), runner:, **circuit_options)
42+
# We don't really need to override/replace {circuit} as we only want to change the way it's run.
43+
iterated_railway = to_h[:circuit].to_h[:map].keys[1] # DISCUSS: maybe find by id?
44+
45+
dataset = ctx.fetch(:dataset)
46+
signal = nil
47+
48+
dataset.each_with_index do |item, index|
49+
50+
each_options_for_iterated = {
51+
index: index,
52+
item: item,
53+
}
54+
55+
# we "inject" item_key and index via Runner.(..., item_key => ..) and then the input filter grabs that.
56+
signal, (ctx, flow_options) = runner.(iterated_railway, [ctx, flow_options], runner: runner, **circuit_options, activity: self, **each_options_for_iterated)
57+
end
58+
59+
return signal, [ctx, flow_options]
60+
end
61+
end
62+
63+
options_for_dataset_from = Each.options_for_dataset_from(dataset_from: dataset_from)
64+
65+
{
66+
**Trailblazer::Activity::Railway.Subprocess(each_activity),
67+
id: id,
68+
**options_for_dataset_from,
69+
}
70+
end
71+
72+
73+
374
class Each < Macro::Strategy
4-
# FIXME: for Strategy that wants to pass-through the exec_context, so it
75+
# FIXME: for Strategy that wants to pass-through the {:exec_context}, so it
576
# looks "invisible" for steps.
677
module Transitive
778
def call(args, exec_context:, **circuit_options)
@@ -10,7 +81,29 @@ def call(args, exec_context:, **circuit_options)
1081
end
1182
end
1283

13-
def self.call((ctx, flow_options), runner:, **circuit_options) # DISCUSS: do we need {start_task}?
84+
# DSL options added to {block_activity} to implement {collect: true}.
85+
def self.options_for_collect(collect:)
86+
return {} unless collect
87+
88+
{
89+
Activity::Railway.Inject(:collected_from_each) => ->(ctx, **) { [] }, # this is called only once.
90+
Activity::Railway.Out() => ->(ctx, collected_from_each:, **) { {collected_from_each: collected_from_each += [ctx[:value]] } }
91+
}
92+
end
93+
94+
def self.options_for_dataset_from(dataset_from:)
95+
return {} unless dataset_from
96+
97+
{
98+
Activity::Railway.Inject(:dataset, override: true) => dataset_from, # {ctx[:dataset]} is private to {each_activity}.
99+
}
100+
end
101+
102+
103+
104+
105+
106+
def self.call__FIXME((ctx, flow_options), runner:, **circuit_options) # DISCUSS: do we need {start_task}?
14107
dataset = ctx.fetch(:dataset)
15108
signal = @state.get(:success_signal)
16109
item_key = @state.get(:item_key)
@@ -83,7 +176,7 @@ def self.compute_runtime_id(ctx, trace_node:, activity:, compile_id:, **)
83176
end
84177

85178
# @api private The internals here are considered private and might change in the near future.
86-
def self.Each(block_activity=nil, dataset_from: nil, item_key: :item, id: Macro.id_for(block_activity, macro: :Each, hint: dataset_from), collect: false, **dsl_options_for_iterated, &block)
179+
def self.Each__FIXME(block_activity=nil, dataset_from: nil, item_key: :item, id: Macro.id_for(block_activity, macro: :Each, hint: dataset_from), collect: false, **dsl_options_for_iterated, &block)
87180
dsl_options_for_iterated = block_activity if block_activity.is_a?(Hash) # Ruby 2.5 and 2.6
88181

89182
block_activity, outputs_from_block_activity = Macro.block_activity_for(block_activity, &block)
@@ -169,23 +262,7 @@ def self.task_wrap_for_iterated(dsl_options)
169262
activity.to_h[:config][:wrap_static]["iterated"]
170263
end
171264

172-
# DSL options added to {block_activity} to implement {collect: true}.
173-
def self.options_for_collect(collect:)
174-
return {} unless collect
175-
176-
{
177-
Activity::Railway.Inject(:collected_from_each) => ->(ctx, **) { [] }, # this is called only once.
178-
Activity::Railway.Out() => ->(ctx, collected_from_each:, **) { {collected_from_each: collected_from_each += [ctx[:value]] } }
179-
}
180-
end
181-
182-
def self.options_for_dataset_from(dataset_from:)
183-
return {} unless dataset_from
184265

185-
{
186-
Activity::Railway.Inject(:dataset, override: true) => dataset_from, # {ctx[:dataset]} is private to {each_activity}.
187-
}
188-
end
189266
end
190267

191268
if const_defined?(:Developer) # FIXME: how do you properly check for a gem?

test/docs/each_test.rb

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def composers_for_each(ctx, model:, **)
249249
seq: "[]",
250250
terminus: :failure
251251

252-
Trailblazer::Developer.wtf?(F::Song::Activity::Cover, [{params: {id: 2}, seq: []}])
252+
Trailblazer::Developer.wtf?(F::Song::Activity::Cover, {params: {id: 2}, seq: []})
253253
end
254254

255255

@@ -338,7 +338,7 @@ def composers_for_each(ctx, model:, **)
338338
end
339339

340340
it "Each(Activity::Railway) with End.spam_email" do
341-
Trailblazer::Developer.wtf?(G::Song::Activity::Cover, [{params: {id: 3}}, {}])
341+
Trailblazer::Developer.wtf?(G::Song::Activity::Cover, {params: {id: 3}})
342342

343343
assert_invoke G::Song::Activity::Cover, params: {id: 3},
344344
terminus: :spam_alert,
@@ -671,12 +671,12 @@ def composers_for_each(ctx, model:, **)
671671
it "tracing" do
672672
EachPureTest::Mailer.send_options = []
673673
#:wtf
674-
Trailblazer::Developer.wtf?(Song::Activity::Cover, [{
674+
Trailblazer::Developer.wtf?(Song::Activity::Cover, {
675675
params: {id: 1},
676676
#~meths
677677
seq: []
678678
#~meths end
679-
}])
679+
})
680680
#:wtf end
681681
end
682682
end
@@ -740,7 +740,7 @@ def self.block
740740

741741
ctx = {seq: [], dataset: [3,2,1]}
742742

743-
stack, signal, (ctx, _) = Trailblazer::Developer::Trace.invoke(activity, [ctx, {}])
743+
stack, signal, (ctx, _) = Trailblazer::Developer::Trace.invoke(activity, ctx)
744744

745745
output = Trailblazer::Developer::Trace::Present.(stack) do |trace_nodes:, **|
746746
{node_options: {trace_nodes[0] => {label: "<a-Each-b>"}}}
@@ -805,7 +805,7 @@ def self.block
805805
}
806806
end
807807

808-
Trailblazer::Developer.wtf?(activity, [{dataset: ["one", "two", "three"]}, {}])
808+
Trailblazer::Developer.wtf?(activity, {dataset: ["one", "two", "three"]})
809809

810810
assert_invoke activity, dataset: ["one", "two", "three"], expected_ctx_variables: {collected_from_each: ["one-0", "two-1", "three-2"]}
811811
end
@@ -823,11 +823,10 @@ def compute_item_with_current_user(ctx, item:, index:, current_user:, **)
823823

824824
Trailblazer::Developer.wtf?(
825825
activity,
826-
[{
826+
{
827827
dataset: ["one", "two", "three"],
828828
current_user: Object,
829829
},
830-
{}]
831830
)
832831

833832
assert_invoke activity, dataset: ["one", "two", "three"], current_user: Object, expected_ctx_variables: {collected_from_each: ["one-0-Object", "two-1-Object", "three-2-Object"]}
@@ -904,7 +903,7 @@ def check(ctx, item:, **)
904903
seq: "[:a]"
905904

906905
#@ fail at 3 but still collect 3rd iteration!
907-
Trailblazer::Developer.wtf?(activity, [{dataset: [1,2,3]}, {}])
906+
Trailblazer::Developer.wtf?(activity, {dataset: [1,2,3]})
908907

909908
assert_invoke activity, dataset: [1,2,3],
910909
expected_ctx_variables: {collected_from_each: ["1", "2", "3"]},

0 commit comments

Comments
 (0)