gemini_adk_rs/flow/
mod.rs

1//! `Flow` — a governed conversation/tool DAG.
2//!
3//! A [`Flow`] is a directed acyclic graph of [`Step`]s. A `Step` is the *only*
4//! node type; it unifies "conversation stage" and "tool-call milestone" by
5//! differing only in attributes, not in kind. A step is *done* when its
6//! completion [`Guard`] latches true; edges (`after`) are dependencies. The
7//! [`FlowMonitor`] maintains a [`Marking`] (the set of done steps) by observing
8//! the session trace, **projects** active steps' postures into turn-boundary
9//! steering, and **enforces** ordering by admitting/denying tool calls.
10//!
11//! The vocabulary is deliberately closed (see the crate docs / RFC): the only
12//! nouns are `Flow`, `Step`, `Guard`, `Posture`, `Marking`, `Verdict`. Words
13//! like *phase*, *transition*, *watch*, *needs* are lowering details and never
14//! appear here.
15//!
16//! Because every [`Guard`] atom is a named, parameterized predicate, a `Flow`
17//! is fully serializable — enabling data-driven scripts edited without a
18//! recompile. The `custom` closure escape hatch is available in code but is not
19//! serializable.
20
21use std::collections::{BTreeMap, BTreeSet, HashMap};
22use std::sync::Arc;
23
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use serde_json::Value;
26
27use crate::orchestration::{call, Mode as AgentMode};
28use crate::state::State;
29use crate::text::TextAgent;
30
/// Evaluation context handed to a [`Guard`]: the session state plus the
/// current flow marking.
///
/// Both fields are shared borrows tied to `'a`, so a context is a
/// lightweight view that is built around existing data rather than owning it.
pub struct FlowCtx<'a> {
    /// The session state.
    pub state: &'a State,
    /// The current flow marking (done steps + tool-call counts).
    pub marking: &'a Marking,
}
39
/// A serializable predicate atom — the closed set of guard primitives.
///
/// Variant names are serialized in `snake_case` (`is_true`, `called_ok`, …)
/// via the `rename_all` attribute below. The combinators (`All`, `Any`,
/// `Not`) nest other `Pred`s, so an arbitrary boolean expression over the
/// atoms stays serializable.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Pred {
    /// Always true.
    Always,
    /// State key holds boolean `true`.
    IsTrue(String),
    /// State key is present.
    IsSet(String),
    /// State key equals the given JSON value.
    Eq(String, Value),
    /// All of the given state keys are present (e.g. extracted slots).
    Captured(Vec<String>),
    /// The named tool has completed successfully at least once.
    CalledOk(String),
    /// The named step is done.
    Done(String),
    /// Conjunction (true for an empty list).
    All(Vec<Pred>),
    /// Disjunction (false for an empty list).
    Any(Vec<Pred>),
    /// Negation.
    Not(Box<Pred>),
}
65
66impl Pred {
67    fn eval(&self, ctx: &FlowCtx) -> bool {
68        match self {
69            Pred::Always => true,
70            Pred::IsTrue(k) => ctx.state.get::<bool>(k) == Some(true),
71            Pred::IsSet(k) => ctx.state.contains(k),
72            Pred::Eq(k, v) => ctx.state.get::<Value>(k).as_ref() == Some(v),
73            Pred::Captured(fields) => fields.iter().all(|f| ctx.state.contains(f)),
74            Pred::CalledOk(t) => ctx.marking.tool_ok.contains_key(t),
75            Pred::Done(s) => ctx.marking.done.contains(s),
76            Pred::All(ps) => ps.iter().all(|p| p.eval(ctx)),
77            Pred::Any(ps) => ps.iter().any(|p| p.eval(ctx)),
78            Pred::Not(p) => !p.eval(ctx),
79        }
80    }
81
82    /// Step ids referenced by `Done(..)` atoms (for validation).
83    fn referenced_steps(&self, out: &mut Vec<String>) {
84        match self {
85            Pred::Done(s) => out.push(s.clone()),
86            Pred::All(ps) | Pred::Any(ps) => ps.iter().for_each(|p| p.referenced_steps(out)),
87            Pred::Not(p) => p.referenced_steps(out),
88            _ => {}
89        }
90    }
91}
92
93/// Render a grounding template against `state`.
94///
95/// - `{key}` interpolates the value at `key` (strings bare, other JSON compact);
96///   an absent key renders empty.
97/// - `{key?yes:no}` renders `yes` when `key` is *truthy* (present and not
98///   `false`/`null`/`0`/`""`), else `no`.
99///
100/// This is the realization of `Effect::ground`: a deterministic projection of
101/// known `State` into a steering line, so the model restates facts rather than
102/// inventing them.
103pub fn render_ground(template: &str, state: &State) -> String {
104    let mut out = String::with_capacity(template.len());
105    let mut rest = template;
106    while let Some(open) = rest.find('{') {
107        out.push_str(&rest[..open]);
108        let after = &rest[open + 1..];
109        let Some(close) = after.find('}') else {
110            // Unbalanced brace: emit the remainder verbatim.
111            out.push_str(&rest[open..]);
112            return out;
113        };
114        let expr = &after[..close];
115        out.push_str(&render_expr(expr, state));
116        rest = &after[close + 1..];
117    }
118    out.push_str(rest);
119    out
120}
121
122fn render_expr(expr: &str, state: &State) -> String {
123    if let Some((cond, arms)) = expr.split_once('?') {
124        let (yes, no) = arms.split_once(':').unwrap_or((arms, ""));
125        if is_truthy(state, cond.trim()) {
126            yes.to_string()
127        } else {
128            no.to_string()
129        }
130    } else {
131        match state.get::<Value>(expr.trim()) {
132            Some(Value::String(s)) => s,
133            Some(v) => v.to_string(),
134            None => String::new(),
135        }
136    }
137}
138
139fn is_truthy(state: &State, key: &str) -> bool {
140    match state.get::<Value>(key) {
141        None | Some(Value::Null) => false,
142        Some(Value::Bool(b)) => b,
143        Some(Value::Number(n)) => n.as_f64().map(|f| f != 0.0).unwrap_or(true),
144        Some(Value::String(s)) => !s.is_empty(),
145        Some(_) => true,
146    }
147}
148
/// The closure type behind [`Guard::Custom`]: a reference-counted predicate
/// over a [`FlowCtx`] that is `Send + Sync`, so guards holding one can be
/// cloned and shared across threads.
type CustomFn = Arc<dyn Fn(&FlowCtx) -> bool + Send + Sync>;
150
/// A boolean predicate over `(state, marking)` — the *only* predicate type.
///
/// Use the constructors ([`Guard::is_true`], [`Guard::captured`],
/// [`Guard::called_ok`], …) for the serializable closed atoms, or
/// [`Guard::custom`] for a bespoke closure (not serializable).
///
/// Cloning is cheap for the `Custom` variant: the closure lives behind an
/// `Arc`, so a clone shares it rather than copying it.
#[derive(Clone)]
pub enum Guard {
    /// A serializable predicate built from the closed atom set.
    Spec(Pred),
    /// A code-only escape hatch. Not serializable.
    Custom(CustomFn),
}
163
164impl Guard {
165    /// Always true.
166    pub fn always() -> Self {
167        Guard::Spec(Pred::Always)
168    }
169    /// State key holds boolean `true`.
170    pub fn is_true(key: impl Into<String>) -> Self {
171        Guard::Spec(Pred::IsTrue(key.into()))
172    }
173    /// State key is present.
174    pub fn is_set(key: impl Into<String>) -> Self {
175        Guard::Spec(Pred::IsSet(key.into()))
176    }
177    /// State key equals the given JSON value.
178    pub fn eq(key: impl Into<String>, value: impl Into<Value>) -> Self {
179        Guard::Spec(Pred::Eq(key.into(), value.into()))
180    }
181    /// All of the given state keys are present (extracted slots).
182    pub fn captured<I, S>(fields: I) -> Self
183    where
184        I: IntoIterator<Item = S>,
185        S: Into<String>,
186    {
187        Guard::Spec(Pred::Captured(fields.into_iter().map(Into::into).collect()))
188    }
189    /// The named tool has completed successfully.
190    pub fn called_ok(tool: impl Into<String>) -> Self {
191        Guard::Spec(Pred::CalledOk(tool.into()))
192    }
193    /// The named step is done.
194    pub fn done(step: impl Into<String>) -> Self {
195        Guard::Spec(Pred::Done(step.into()))
196    }
197    /// True once an orchestrated agent named `name` has produced a result
198    /// (its `{name}:result` state key is set). Pairs with the
199    /// [`orchestration`](crate::orchestration) `call`/`dispatch`/`background`.
200    pub fn resolved(name: impl AsRef<str>) -> Self {
201        Guard::Spec(Pred::IsSet(format!("{}:result", name.as_ref())))
202    }
203    /// Conjunction.
204    ///
205    /// If every input is a serializable atom, the result is a serializable
206    /// `Pred::All`. If any input is a [`Guard::custom`], the result is itself a
207    /// custom guard that evaluates the conjunction at runtime — the custom guard
208    /// is **never silently dropped** (it merely makes the combinator
209    /// non-serializable, which surfaces as an error only if you try to serialize
210    /// the flow).
211    pub fn all(guards: impl IntoIterator<Item = Guard>) -> Self {
212        let guards: Vec<Guard> = guards.into_iter().collect();
213        if guards.iter().all(|g| matches!(g, Guard::Spec(_))) {
214            Guard::Spec(Pred::All(specs_unchecked(guards)))
215        } else {
216            Guard::Custom(Arc::new(move |ctx| guards.iter().all(|g| g.eval(ctx))))
217        }
218    }
219    /// Disjunction.
220    ///
221    /// Mirrors [`Guard::all`]: custom inputs are preserved as a runtime closure
222    /// rather than erased.
223    pub fn any(guards: impl IntoIterator<Item = Guard>) -> Self {
224        let guards: Vec<Guard> = guards.into_iter().collect();
225        if guards.iter().all(|g| matches!(g, Guard::Spec(_))) {
226            Guard::Spec(Pred::Any(specs_unchecked(guards)))
227        } else {
228            Guard::Custom(Arc::new(move |ctx| guards.iter().any(|g| g.eval(ctx))))
229        }
230    }
231    /// Negation of a serializable atom.
232    #[allow(clippy::should_implement_trait)]
233    pub fn not(guard: Guard) -> Self {
234        match guard {
235            Guard::Spec(p) => Guard::Spec(Pred::Not(Box::new(p))),
236            // Negating a custom guard yields a custom guard.
237            Guard::Custom(f) => Guard::Custom(Arc::new(move |ctx| !f(ctx))),
238        }
239    }
240    /// A bespoke closure over `(state, marking)`. Not serializable.
241    pub fn custom(f: impl Fn(&FlowCtx) -> bool + Send + Sync + 'static) -> Self {
242        Guard::Custom(Arc::new(f))
243    }
244
245    /// Evaluate the guard.
246    pub fn eval(&self, ctx: &FlowCtx) -> bool {
247        match self {
248            Guard::Spec(p) => p.eval(ctx),
249            Guard::Custom(f) => f(ctx),
250        }
251    }
252
253    fn referenced_steps(&self, out: &mut Vec<String>) {
254        if let Guard::Spec(p) = self {
255            p.referenced_steps(out);
256        }
257    }
258}
259
260/// Unwrap a list of guards known to be all `Spec` into their predicates.
261///
262/// The caller (`Guard::all`/`Guard::any`) only invokes this after verifying every
263/// guard is a `Spec`, so the `Custom` arm is unreachable.
264fn specs_unchecked(guards: Vec<Guard>) -> Vec<Pred> {
265    guards
266        .into_iter()
267        .map(|g| match g {
268            Guard::Spec(p) => p,
269            Guard::Custom(_) => unreachable!("specs_unchecked called with a custom guard"),
270        })
271        .collect()
272}
273
274impl Serialize for Guard {
275    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
276        match self {
277            Guard::Spec(p) => p.serialize(s),
278            Guard::Custom(_) => Err(serde::ser::Error::custom(
279                "custom guards are not serializable; use Guard atoms for data-driven flows",
280            )),
281        }
282    }
283}
284
285impl<'de> Deserialize<'de> for Guard {
286    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
287        Ok(Guard::Spec(Pred::deserialize(d)?))
288    }
289}
290
291impl std::fmt::Debug for Guard {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        match self {
294            Guard::Spec(p) => write!(f, "{p:?}"),
295            Guard::Custom(_) => write!(f, "Custom(<fn>)"),
296        }
297    }
298}
299
/// A node in the flow DAG — the only node type.
///
/// Only `id` is mandatory in serialized form: every other field falls back to
/// its default when absent and is omitted on output when empty/unset
/// (`terminal` is omitted when `false`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Step {
    /// Unique step id.
    pub id: String,
    /// Dependency step ids; this step is only eligible once all are done.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub after: Vec<String>,
    /// Extra eligibility predicate beyond dependencies.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub gate: Option<Guard>,
    /// Completion condition. Required for non-terminal steps
    /// ([`Flow::validate`] reports a non-terminal step without one).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub done: Option<Guard>,
    /// Instruction imposed while this step is active (projected as steering).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub posture: Option<String>,
    /// A grounding template projected while active: a curated, `State`-interpolated
    /// fact line that pins the model to known values (anti-hallucination). See
    /// [`render_ground`]. Serializable, like `posture`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ground: Option<String>,
    /// Tools available while this step is active (whitelist; empty = no restriction).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub allow: Vec<String>,
    /// Tools forbidden while this step is active.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub deny: Vec<String>,
    /// A terminal step — reaching it (deps + gate) marks it done with no milestone.
    // `std::ops::Not::not` on a `&bool` is `!value`, i.e. skip when `false`.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub terminal: bool,
}
332
/// A cross-cutting flow constraint.
///
/// Variant names are serialized in `snake_case` (`once`, `before`,
/// `never_until`, `require`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Constraint {
    /// A tool may complete at most once.
    Once(String),
    /// Step `0` must be done before step `1` starts (both are step ids).
    Before(String, String),
    /// A tool is forbidden until the guard holds.
    NeverUntil {
        /// The gated tool.
        tool: String,
        /// The guard that must hold to permit it.
        until: Guard,
    },
    /// These steps must be done for the flow to be complete.
    Require(Vec<String>),
}
351
/// A governed conversation/tool DAG.
///
/// `steps` is always serialized; `constraints` and `confirm_tools` are
/// omitted when empty and default to empty when absent.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct Flow {
    /// The steps (DAG nodes).
    pub steps: Vec<Step>,
    /// Cross-cutting constraints.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub constraints: Vec<Constraint>,
    /// Tools that require confirmation when reached (set by `commit`).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub confirm_tools: Vec<String>,
}
364
365impl Flow {
    /// Start building a flow.
    ///
    /// Note that this returns a default-initialized [`FlowBuilder`], not a
    /// `Flow`.
    #[allow(
        clippy::new_ret_no_self,
        reason = "Flow::new() is the builder entry point; a Flow comes from FlowBuilder::build/compile"
    )]
    pub fn new() -> FlowBuilder {
        FlowBuilder::default()
    }
374
375    fn step(&self, id: &str) -> Option<&Step> {
376        self.steps.iter().find(|s| s.id == id)
377    }
378
379    /// Validate referential integrity and acyclicity.
380    pub fn validate(&self) -> Result<(), Vec<String>> {
381        let mut errs = Vec::new();
382        let ids: BTreeSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
383        if ids.len() != self.steps.len() {
384            errs.push("duplicate step ids".into());
385        }
386        for s in &self.steps {
387            for d in &s.after {
388                if !ids.contains(d.as_str()) {
389                    errs.push(format!("step '{}' depends on unknown step '{}'", s.id, d));
390                }
391            }
392            if !s.terminal && s.done.is_none() {
393                errs.push(format!(
394                    "non-terminal step '{}' has no `done` condition (it can never complete)",
395                    s.id
396                ));
397            }
398            for g in s.gate.iter().chain(s.done.iter()) {
399                let mut refs = Vec::new();
400                g.referenced_steps(&mut refs);
401                for r in refs {
402                    if !ids.contains(r.as_str()) {
403                        errs.push(format!(
404                            "step '{}' guard references unknown step '{r}'",
405                            s.id
406                        ));
407                    }
408                }
409            }
410        }
411        for c in &self.constraints {
412            match c {
413                Constraint::Before(a, b) => {
414                    for x in [a, b] {
415                        if !ids.contains(x.as_str()) {
416                            errs.push(format!("constraint `before` references unknown step '{x}'"));
417                        }
418                    }
419                }
420                Constraint::Require(rs) => {
421                    for r in rs {
422                        if !ids.contains(r.as_str()) {
423                            errs.push(format!(
424                                "constraint `require` references unknown step '{r}'"
425                            ));
426                        }
427                    }
428                }
429                _ => {}
430            }
431        }
432        if self.has_cycle() {
433            errs.push("flow dependency graph has a cycle (must be a DAG)".into());
434        }
435        if errs.is_empty() {
436            Ok(())
437        } else {
438            Err(errs)
439        }
440    }
441
442    /// Every tool name referenced anywhere in the flow (allow/deny/once/
443    /// never_until/confirm). The universe over which [`ToolPolicy`] reasons.
444    fn tool_universe(&self) -> BTreeSet<String> {
445        let mut tools = BTreeSet::new();
446        for s in &self.steps {
447            tools.extend(s.allow.iter().cloned());
448            tools.extend(s.deny.iter().cloned());
449        }
450        for c in &self.constraints {
451            match c {
452                Constraint::Once(t) => {
453                    tools.insert(t.clone());
454                }
455                Constraint::NeverUntil { tool, .. } => {
456                    tools.insert(tool.clone());
457                }
458                _ => {}
459            }
460        }
461        tools.extend(self.confirm_tools.iter().cloned());
462        tools
463    }
464
465    /// Steps reachable from a root (a step with no `after` deps), following both
466    /// `after` edges and `Before(a, b)` ordering edges.
467    fn reachable_steps(&self) -> BTreeSet<String> {
468        let ids: BTreeSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
469        // Forward edges: a -> b when b.after contains a, or Before(a, b).
470        let mut succ: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
471        for s in &self.steps {
472            for d in &s.after {
473                if ids.contains(d.as_str()) {
474                    succ.entry(d.as_str()).or_default().push(s.id.as_str());
475                }
476            }
477        }
478        for c in &self.constraints {
479            if let Constraint::Before(a, b) = c {
480                if ids.contains(a.as_str()) && ids.contains(b.as_str()) {
481                    succ.entry(a.as_str()).or_default().push(b.as_str());
482                }
483            }
484        }
485        let roots: Vec<&str> = self
486            .steps
487            .iter()
488            .filter(|s| s.after.is_empty())
489            .map(|s| s.id.as_str())
490            .collect();
491        let mut seen = BTreeSet::new();
492        let mut stack = roots;
493        while let Some(id) = stack.pop() {
494            if seen.insert(id.to_string()) {
495                if let Some(next) = succ.get(id) {
496                    stack.extend(next.iter().copied());
497                }
498            }
499        }
500        seen
501    }
502
    /// Compile and validate the flow into a [`CompiledFlow`], turning a class of
    /// runtime surprises into load-time errors.
    ///
    /// On top of [`validate`](Self::validate)'s referential/acyclicity checks this
    /// reports: unreachable steps, commit tools guarded by an always-true
    /// condition (an effectively *unguarded* commit, which defeats the
    /// confirm-before-commit contract), `never…until` guards whose `done(step)`
    /// atoms reference unknown steps (unsatisfiable — the tool would be forbidden
    /// forever), and ordering cycles across the combined `after` + `before` edges
    /// (which deadlock every step on the cycle). Precomputes the [`ToolPolicy`]
    /// universe.
    ///
    /// To additionally validate tool names against a known registry, use
    /// [`compile_with_tools`](Self::compile_with_tools).
    ///
    /// # Errors
    ///
    /// Returns [`FlowErrors`] holding every problem found. The checks do not
    /// stop at the first failure, except that the graph-shape checks
    /// (unreachable steps, ordering cycles) run only when
    /// [`validate`](Self::validate) reported nothing.
    pub fn compile(self) -> Result<CompiledFlow, FlowErrors> {
        // `None` = no tool registry, so tool names are not cross-checked.
        self.compile_internal(None)
    }
520
    /// Compile like [`compile`](Self::compile), additionally validating every
    /// tool name the flow references (step `allow`/`deny`, `once`,
    /// `never…until`, and commit/confirm tools) against the given registry of
    /// known tool names.
    ///
    /// A referenced tool missing from `tools` is reported as
    /// [`FlowError::UnknownTool`] — catching typos and drift between a flow
    /// script and the tools actually registered on the session.
    ///
    /// ```ignore
    /// let compiled = flow.compile_with_tools(&["lookup_account", "charge_card"])?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`FlowErrors`] holding every problem found: everything
    /// [`compile`](Self::compile) reports, plus one
    /// [`FlowError::UnknownTool`] per referenced tool absent from `tools`.
    pub fn compile_with_tools(self, tools: &[&str]) -> Result<CompiledFlow, FlowErrors> {
        self.compile_internal(Some(tools))
    }
536
537    fn compile_internal(self, registry: Option<&[&str]>) -> Result<CompiledFlow, FlowErrors> {
538        let mut errors = Vec::new();
539        if let Err(errs) = self.validate() {
540            errors.extend(errs.into_iter().map(FlowError::Invalid));
541        }
542
543        // Graph-shape checks (only meaningful once the graph is acyclic/valid).
544        if errors.is_empty() {
545            // Unreachable steps.
546            let reachable = self.reachable_steps();
547            for s in &self.steps {
548                if !reachable.contains(&s.id) {
549                    errors.push(FlowError::UnreachableStep(s.id.clone()));
550                }
551            }
552            // Ordering cycles across the combined `after` + `before(a, b)` edges.
553            // `validate()` only walks `after`; a cycle closed by a `Before`
554            // constraint deadlocks every step on it (none can become eligible).
555            if let Some(cycle) = self.ordering_cycle() {
556                errors.push(FlowError::OrderingCycle(cycle));
557            }
558        }
559
560        // A commit tool guarded by an always-true condition is effectively
561        // unguarded — the confirm-before-commit contract would never gate it.
562        for tool in &self.confirm_tools {
563            let guard = self.constraints.iter().find_map(|c| match c {
564                Constraint::NeverUntil { tool: t, until } if t == tool => Some(until),
565                _ => None,
566            });
567            let unguarded = matches!(guard, None | Some(Guard::Spec(Pred::Always)));
568            if unguarded {
569                errors.push(FlowError::UnguardedCommitTool(tool.clone()));
570            }
571        }
572
573        // An unsatisfiable `never(tool).until(guard)`: the guard's `done(step)`
574        // atom references a step that doesn't exist, so it can never latch and
575        // the tool is forbidden forever. (Step gate/done guards are already
576        // covered by `validate()`; constraints were not.)
577        let ids: BTreeSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
578        for c in &self.constraints {
579            if let Constraint::NeverUntil { tool, until } = c {
580                let mut refs = Vec::new();
581                until.referenced_steps(&mut refs);
582                for r in refs {
583                    if !ids.contains(r.as_str()) {
584                        errors.push(FlowError::UnsatisfiableGuard {
585                            tool: tool.clone(),
586                            step: r,
587                        });
588                    }
589                }
590            }
591        }
592
593        // Dangling tool names vs a known registry (opt-in).
594        if let Some(known) = registry {
595            for tool in self.tool_universe() {
596                if !known.contains(&tool.as_str()) {
597                    errors.push(FlowError::UnknownTool(tool));
598                }
599            }
600        }
601
602        if errors.is_empty() {
603            let policy = ToolPolicy {
604                tools: self.tool_universe(),
605            };
606            Ok(CompiledFlow { flow: self, policy })
607        } else {
608            Err(FlowErrors(errors))
609        }
610    }
611
612    /// Find a cycle over the combined dependency edges (`after` plus
613    /// `before(a, b)` ordering constraints), if any. Returns the step ids on
614    /// the cycle path. `None` when the combined graph is acyclic.
615    fn ordering_cycle(&self) -> Option<Vec<String>> {
616        // Predecessor edges: step -> everything that must be done before it.
617        let mut deps: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
618        for s in &self.steps {
619            let entry = deps.entry(s.id.as_str()).or_default();
620            entry.extend(s.after.iter().map(String::as_str));
621        }
622        for c in &self.constraints {
623            if let Constraint::Before(a, b) = c {
624                deps.entry(b.as_str()).or_default().push(a.as_str());
625            }
626        }
627        // DFS with colors; on a back-edge, report the current path suffix.
628        fn dfs<'a>(
629            id: &'a str,
630            deps: &BTreeMap<&'a str, Vec<&'a str>>,
631            color: &mut BTreeMap<&'a str, u8>,
632            path: &mut Vec<&'a str>,
633        ) -> Option<Vec<String>> {
634            color.insert(id, 1);
635            path.push(id);
636            for d in deps.get(id).into_iter().flatten() {
637                match color.get(d).copied() {
638                    Some(1) => {
639                        let start = path.iter().position(|p| p == d).unwrap_or(0);
640                        return Some(path[start..].iter().map(|s| s.to_string()).collect());
641                    }
642                    Some(2) => {}
643                    _ => {
644                        if let Some(cycle) = dfs(d, deps, color, path) {
645                            return Some(cycle);
646                        }
647                    }
648                }
649            }
650            path.pop();
651            color.insert(id, 2);
652            None
653        }
654        let mut color: BTreeMap<&str, u8> = BTreeMap::new();
655        for s in &self.steps {
656            if color.get(s.id.as_str()).copied().unwrap_or(0) == 0 {
657                let mut path = Vec::new();
658                if let Some(cycle) = dfs(&s.id, &deps, &mut color, &mut path) {
659                    return Some(cycle);
660                }
661            }
662        }
663        None
664    }
665
666    fn has_cycle(&self) -> bool {
667        // DFS with colors over the `after` dependency edges.
668        let mut color: BTreeMap<&str, u8> = BTreeMap::new();
669        fn dfs<'a>(flow: &'a Flow, id: &'a str, color: &mut BTreeMap<&'a str, u8>) -> bool {
670            color.insert(id, 1);
671            if let Some(step) = flow.step(id) {
672                for d in &step.after {
673                    match color.get(d.as_str()).copied() {
674                        Some(1) => return true,
675                        Some(2) => {}
676                        _ => {
677                            if dfs(flow, d, color) {
678                                return true;
679                            }
680                        }
681                    }
682                }
683            }
684            color.insert(id, 2);
685            false
686        }
687        for s in &self.steps {
688            if color.get(s.id.as_str()).copied().unwrap_or(0) == 0 && dfs(self, &s.id, &mut color) {
689                return true;
690            }
691        }
692        false
693    }
694
695    /// Render the flow as a Mermaid `flowchart` — the spec *is* the diagram.
696    pub fn to_mermaid(&self) -> String {
697        let mut out = String::from("flowchart TD\n");
698        for s in &self.steps {
699            let shape = if s.terminal {
700                format!("    {}([{}])\n", s.id, s.id)
701            } else {
702                format!("    {}[{}]\n", s.id, s.id)
703            };
704            out.push_str(&shape);
705        }
706        for s in &self.steps {
707            for d in &s.after {
708                out.push_str(&format!("    {d} --> {}\n", s.id));
709            }
710        }
711        out
712    }
713}
714
/// The runtime position in a flow: which steps are done and how often each
/// tool has succeeded.
///
/// The derived `Default` is the starting position: no steps done, no tool
/// successes, zero turns.
#[derive(Clone, Debug, Default)]
pub struct Marking {
    /// Steps that have latched done.
    pub done: BTreeSet<String>,
    /// Per-tool successful-completion counts, keyed by tool name.
    pub tool_ok: BTreeMap<String, u32>,
    /// Turns observed.
    pub turns: u32,
}
726
/// The conformance status of a step.
///
/// Serialized in `snake_case` (`pending`, `active`, `done`, `skipped`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Verdict {
    /// Not yet eligible.
    Pending,
    /// Eligible and awaiting completion.
    Active,
    /// Completed.
    Done,
    /// A successor completed while this never did (an out-of-order deviation).
    Skipped,
}
740
/// A recorded conformance deviation (observe mode) or denial (enforce mode).
///
/// Both fields are free-form text intended for audit output.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Violation {
    /// What was attempted (e.g. a tool name).
    pub subject: String,
    /// Why it was a violation.
    pub reason: String,
}
749
/// How a [`FlowMonitor`] treats off-path activity — enforcement vs observation.
///
/// Renamed from `Mode` to remove the collision with
/// [`orchestration::Mode`](crate::orchestration::Mode) (`Call`/`Dispatch`/
/// `Background`), which is the unrelated *resolver execution discipline*.
///
/// The default is [`Enforcement::Enforce`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum Enforcement {
    /// Block inadmissible tool calls and steer back on-path.
    #[default]
    Enforce,
    /// Allow everything, but record deviations for audit/analytics.
    Observe,
}
763
/// Deprecated alias for [`Enforcement`], kept for one release.
///
/// New code should name [`Enforcement`] directly; using this alias emits a
/// deprecation warning.
#[deprecated(note = "renamed to `Enforcement` to avoid colliding with orchestration::Mode")]
pub type Mode = Enforcement;
767
/// An action fired the first time a step becomes active: run an agent in an
/// [`AgentMode`]. Built with [`run`]. The result lands in `{name}:result` (the
/// name defaults to the step id), so a *downstream* step can complete on it via
/// [`Guard::resolved`] — this is how a flow drives orchestration in-session.
#[derive(Clone)]
pub struct StepAction {
    /// Explicit result name; `None` means "use the id of the step this action
    /// is attached to" (resolved when the action fires).
    name: Option<String>,
    /// The agent to run; shared, so cloning the action does not copy it.
    agent: Arc<dyn TextAgent>,
    /// Execution discipline: awaited inline (`Call`) or spawned detached
    /// (`Dispatch`/`Background`).
    mode: AgentMode,
}
778
779/// Build a step-enter action that runs `agent` in `mode` when the step first
780/// activates. Pair with [`FlowMonitor::on_enter`].
781///
782/// ```ignore
783/// let mon = FlowMonitor::new(flow, Enforcement::Enforce)
784///     .on_enter("check", run(availability_agent, AgentMode::Dispatch));
785/// ```
786pub fn run(agent: Arc<dyn TextAgent>, mode: AgentMode) -> StepAction {
787    StepAction {
788        name: None,
789        agent,
790        mode,
791    }
792}
793
794impl StepAction {
795    /// Override the result name (defaults to the step id it is attached to).
796    pub fn named(mut self, name: impl Into<String>) -> Self {
797        self.name = Some(name.into());
798        self
799    }
800
801    /// Run the action. `Call` awaits inline; `Dispatch`/`Background` spawn it
802    /// detached so the turn is never blocked.
803    pub(crate) async fn fire(&self, step_id: &str, state: &State) {
804        let name = self.name.clone().unwrap_or_else(|| step_id.to_string());
805        match self.mode {
806            AgentMode::Call => {
807                let _ = call(&name, self.agent.clone(), state).await;
808            }
809            AgentMode::Dispatch | AgentMode::Background => {
810                let agent = self.agent.clone();
811                let state = state.clone();
812                tokio::spawn(async move {
813                    let _ = call(&name, agent, &state).await;
814                });
815            }
816        }
817    }
818}
819
/// A shared, lock-protected [`FlowMonitor`] — the form in which the Live
/// control plane owns a governed flow, so runtime surfaces (e.g.
/// [`LiveHandle::why_blocked`](crate::live::LiveHandle::why_blocked)) can
/// snapshot it concurrently. Every monitor method except
/// [`FlowMonitor::fire_enter_actions`] is synchronous: lock briefly and never
/// hold the guard across an `await`.
///
/// NOTE(review): `fire_enter_actions` is `async` and takes `&mut self`, so
/// calling it through this mutex would keep the guard alive across its
/// `await`. Under the lock, prefer [`FlowMonitor::take_newly_active`] plus a
/// cloned [`FlowMonitor::enter_action`], and fire after unlocking.
pub type SharedFlowMonitor = Arc<parking_lot::Mutex<FlowMonitor>>;
826
/// Observes the session trace, maintains the [`Marking`], answers tool
/// admissibility, and projects active postures.
///
/// The monitor owns its [`Flow`] and all bookkeeping; session [`State`] is
/// passed into each method rather than stored.
pub struct FlowMonitor {
    /// The flow being monitored.
    flow: Flow,
    /// Whether inadmissible tool calls are to be blocked or only recorded.
    mode: Enforcement,
    /// Done steps plus the turn and successful-tool-call counters.
    marking: Marking,
    /// Deviations recorded by `observe_tool` while in observe mode.
    violations: Vec<Violation>,
    /// Per-step actions fired the first time the step becomes active.
    enter_actions: HashMap<String, StepAction>,
    /// Steps whose `on_enter` action has already fired (fire-once).
    announced: BTreeSet<String>,
}
839
840impl FlowMonitor {
841    /// Create a monitor for a (presumed-valid) flow.
842    ///
843    /// Prefer [`FlowMonitor::compiled`] or [`FlowMonitor::try_new`], which carry
844    /// proof of compilation; this convenience skips compilation for flows already
845    /// known valid (e.g. built in-process by trusted code).
846    pub fn new(flow: Flow, mode: Enforcement) -> Self {
847        Self {
848            flow,
849            mode,
850            marking: Marking::default(),
851            violations: Vec::new(),
852            enter_actions: HashMap::new(),
853            announced: BTreeSet::new(),
854        }
855    }
856
857    /// Create a monitor from a [`CompiledFlow`] — the validated path.
858    pub fn compiled(flow: CompiledFlow, mode: Enforcement) -> Self {
859        Self::new(flow.into_flow(), mode)
860    }
861
862    /// Compile `flow` and create a monitor, surfacing structural errors instead
863    /// of trusting the caller.
864    pub fn try_new(flow: Flow, mode: Enforcement) -> Result<Self, FlowErrors> {
865        Ok(Self::compiled(flow.compile()?, mode))
866    }
867
868    /// Wrap this monitor in a [`SharedFlowMonitor`] for shared ownership
869    /// between the control lane (which advances it) and runtime accessors
870    /// (which snapshot it, e.g.
871    /// [`LiveHandle::explain`](crate::live::LiveHandle::explain)).
872    pub fn into_shared(self) -> SharedFlowMonitor {
873        Arc::new(parking_lot::Mutex::new(self))
874    }
875
876    /// Explain the current control-plane state: active steps, which tools are
877    /// admitted vs blocked (with reasons), and unmet requirements.
878    ///
879    /// This is the deterministic answer to "why did the assistant ask that?" —
880    /// model-readable, without the model driving control flow.
881    pub fn explain(&self, state: &State) -> FlowExplanation {
882        let active = self
883            .active_steps(state)
884            .iter()
885            .map(|s| s.id.clone())
886            .collect();
887        let mut allowed_tools = Vec::new();
888        let mut blocked_tools = BTreeMap::new();
889        for tool in self.flow.tool_universe() {
890            match self.admits_tool(&tool, state) {
891                Ok(()) => allowed_tools.push(tool),
892                Err(reason) => {
893                    blocked_tools.insert(tool, reason);
894                }
895            }
896        }
897        FlowExplanation {
898            active,
899            allowed_tools,
900            blocked_tools,
901            missing_requirements: self.unmet_requirements(),
902        }
903    }
904
905    /// Why the flow is blocked right now — alias of [`explain`](Self::explain),
906    /// named for the common debugging question.
907    pub fn why_blocked(&self, state: &State) -> FlowExplanation {
908        self.explain(state)
909    }
910
911    /// Attach an action fired the first time `step` becomes active (see
912    /// [`run`]). Chainable at construction time.
913    pub fn on_enter(mut self, step: impl Into<String>, action: StepAction) -> Self {
914        self.enter_actions.insert(step.into(), action);
915        self
916    }
917
918    /// Steps that became active since the last call — each reported exactly once
919    /// over the session. Drives [`on_enter`](Self::on_enter) firing.
920    pub fn take_newly_active(&mut self, state: &State) -> Vec<String> {
921        let mut fresh = Vec::new();
922        for s in self.active_steps(state) {
923            if !self.announced.contains(&s.id) {
924                fresh.push(s.id.clone());
925            }
926        }
927        for id in &fresh {
928            self.announced.insert(id.clone());
929        }
930        fresh
931    }
932
933    /// The enter-action registered for a step, if any.
934    pub fn enter_action(&self, step: &str) -> Option<&StepAction> {
935        self.enter_actions.get(step)
936    }
937
938    /// Fire enter-actions for every step that just became active. Convenience
939    /// over [`take_newly_active`](Self::take_newly_active) + [`enter_action`](Self::enter_action);
940    /// call it right after [`on_turn`](Self::on_turn).
941    pub async fn fire_enter_actions(&mut self, state: &State) {
942        for id in self.take_newly_active(state) {
943            if let Some(action) = self.enter_actions.get(&id) {
944                action.fire(&id, state).await;
945            }
946        }
947    }
948
949    /// The enforcement mode this monitor runs in.
950    pub fn mode(&self) -> Enforcement {
951        self.mode
952    }
953
954    /// Evaluate a [`Guard`] against this monitor's current context (the given
955    /// `state` plus the monitor's marking). Used to test overlay/digression
956    /// triggers without exposing the internal context.
957    pub fn eval(&self, guard: &Guard, state: &State) -> bool {
958        guard.eval(&self.ctx(state))
959    }
960    /// The current marking.
961    pub fn marking(&self) -> &Marking {
962        &self.marking
963    }
964    /// Recorded violations.
965    pub fn violations(&self) -> &[Violation] {
966        &self.violations
967    }
968    /// The underlying flow.
969    pub fn flow(&self) -> &Flow {
970        &self.flow
971    }
972
973    fn ctx<'a>(&'a self, state: &'a State) -> FlowCtx<'a> {
974        FlowCtx {
975            state,
976            marking: &self.marking,
977        }
978    }
979
980    fn eligible(&self, step: &Step, state: &State) -> bool {
981        let deps_done = step.after.iter().all(|d| self.marking.done.contains(d));
982        // Enforce `Constraint::Before(a, step)`: `a` must be done before this
983        // step may start (an ordering constraint declared outside `after`).
984        let before_ok = self.flow.constraints.iter().all(|c| match c {
985            Constraint::Before(a, b) if *b == step.id => self.marking.done.contains(a),
986            _ => true,
987        });
988        let gate_ok = step
989            .gate
990            .as_ref()
991            .map(|g| g.eval(&self.ctx(state)))
992            .unwrap_or(true);
993        deps_done && before_ok && gate_ok
994    }
995
996    /// Re-evaluate completion latches to a fixpoint. Call after any event that
997    /// can change state or the marking (turn boundary, tool completion).
998    pub fn relatch(&mut self, state: &State) {
999        loop {
1000            let mut newly_done: Vec<String> = Vec::new();
1001            for s in &self.flow.steps {
1002                if self.marking.done.contains(&s.id) {
1003                    continue;
1004                }
1005                if !self.eligible(s, state) {
1006                    continue;
1007                }
1008                let complete = if s.terminal {
1009                    true
1010                } else {
1011                    s.done
1012                        .as_ref()
1013                        .map(|g| g.eval(&self.ctx(state)))
1014                        .unwrap_or(false)
1015                };
1016                if complete {
1017                    newly_done.push(s.id.clone());
1018                }
1019            }
1020            if newly_done.is_empty() {
1021                break;
1022            }
1023            for id in newly_done {
1024                self.marking.done.insert(id);
1025            }
1026        }
1027    }
1028
1029    /// Record a turn boundary, then re-latch.
1030    pub fn on_turn(&mut self, state: &State) {
1031        self.marking.turns += 1;
1032        self.relatch(state);
1033    }
1034
1035    /// Record a successful tool call, then re-latch.
1036    pub fn on_tool_ok(&mut self, tool: &str, state: &State) {
1037        *self.marking.tool_ok.entry(tool.to_string()).or_insert(0) += 1;
1038        self.relatch(state);
1039    }
1040
1041    /// Steps that are eligible but not yet done.
1042    pub fn active_steps(&self, state: &State) -> Vec<&Step> {
1043        self.flow
1044            .steps
1045            .iter()
1046            .filter(|s| !self.marking.done.contains(&s.id) && self.eligible(s, state))
1047            .collect()
1048    }
1049
1050    /// Postures of the active steps — to inject as turn-boundary steering.
1051    pub fn active_postures(&self, state: &State) -> Vec<String> {
1052        self.active_steps(state)
1053            .into_iter()
1054            .filter_map(|s| s.posture.clone())
1055            .collect()
1056    }
1057
1058    /// Rendered grounding lines of the active steps — curated, `State`-
1059    /// interpolated facts to inject as turn-boundary steering (anti-hallucination).
1060    pub fn active_grounds(&self, state: &State) -> Vec<String> {
1061        self.active_steps(state)
1062            .into_iter()
1063            .filter_map(|s| s.ground.as_ref().map(|t| render_ground(t, state)))
1064            .filter(|s| !s.trim().is_empty())
1065            .collect()
1066    }
1067
1068    /// Required steps not yet done (drives repair).
1069    pub fn unmet_requirements(&self) -> Vec<String> {
1070        self.flow
1071            .constraints
1072            .iter()
1073            .flat_map(|c| match c {
1074                Constraint::Require(rs) => rs.clone(),
1075                _ => Vec::new(),
1076            })
1077            .filter(|r| !self.marking.done.contains(r))
1078            .collect()
1079    }
1080
1081    /// Whether all required steps are done.
1082    pub fn is_complete(&self) -> bool {
1083        self.unmet_requirements().is_empty()
1084    }
1085
1086    /// The conformance verdict for a step.
1087    pub fn verdict(&self, step_id: &str, state: &State) -> Verdict {
1088        if self.marking.done.contains(step_id) {
1089            return Verdict::Done;
1090        }
1091        if let Some(step) = self.flow.step(step_id) {
1092            if self.eligible(step, state) {
1093                return Verdict::Active;
1094            }
1095        }
1096        // Skipped: a successor is done but this step never completed.
1097        let bypassed = self
1098            .flow
1099            .steps
1100            .iter()
1101            .any(|s| s.after.iter().any(|d| d == step_id) && self.marking.done.contains(&s.id));
1102        if bypassed {
1103            Verdict::Skipped
1104        } else {
1105            Verdict::Pending
1106        }
1107    }
1108
1109    /// Decide whether a tool call may proceed. `Ok(())` admits it; `Err(reason)`
1110    /// denies it (the caller blocks in Enforce mode, or records in Observe).
1111    pub fn admits_tool(&self, tool: &str, state: &State) -> Result<(), String> {
1112        // 1. once(tool)
1113        for c in &self.flow.constraints {
1114            if let Constraint::Once(t) = c {
1115                if t == tool && self.marking.tool_ok.contains_key(tool) {
1116                    return Err(format!("'{tool}' may run at most once"));
1117                }
1118            }
1119        }
1120        // 2. never(tool).until(guard)
1121        for c in &self.flow.constraints {
1122            if let Constraint::NeverUntil { tool: t, until } = c {
1123                if t == tool && !until.eval(&self.ctx(state)) {
1124                    return Err(format!("'{tool}' is not permitted yet"));
1125                }
1126            }
1127        }
1128        // 3. active allow/deny (whitelist while any active step restricts).
1129        let active = self.active_steps(state);
1130        if active.iter().any(|s| s.deny.iter().any(|d| d == tool)) {
1131            return Err(format!("'{tool}' is not available in the current step"));
1132        }
1133        let restricting: Vec<&&Step> = active.iter().filter(|s| !s.allow.is_empty()).collect();
1134        if !restricting.is_empty()
1135            && !restricting
1136                .iter()
1137                .any(|s| s.allow.iter().any(|a| a == tool))
1138        {
1139            return Err(format!("'{tool}' is not available in the current step"));
1140        }
1141        Ok(())
1142    }
1143
1144    /// Observe a tool call for conformance. In Enforce mode the caller has
1145    /// already gated via [`admits_tool`](Self::admits_tool); this records the
1146    /// call and, in Observe mode, logs a deviation if it was inadmissible.
1147    pub fn observe_tool(&mut self, tool: &str, ok: bool, state: &State) {
1148        if self.mode == Enforcement::Observe {
1149            if let Err(reason) = self.admits_tool(tool, state) {
1150                self.violations.push(Violation {
1151                    subject: tool.to_string(),
1152                    reason,
1153                });
1154            }
1155        }
1156        if ok {
1157            self.on_tool_ok(tool, state);
1158        }
1159    }
1160}
1161
/// One problem reported while compiling a [`Flow`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FlowError {
    /// A referential-integrity or acyclicity error from [`Flow::validate`];
    /// carries the validator's message verbatim.
    Invalid(String),
    /// The id of a step that no path from a root can ever reach.
    UnreachableStep(String),
    /// A commit (confirm) tool whose gate is always true — effectively
    /// unguarded, defeating confirm-before-commit.
    UnguardedCommitTool(String),
    /// A tool referenced by the flow (step `allow`/`deny`, `once`,
    /// `never…until`, confirm) that is not in the registry given to
    /// [`Flow::compile_with_tools`].
    UnknownTool(String),
    /// A `never(tool).until(guard)` whose guard references a step id that
    /// doesn't exist — the guard can never latch, so the tool would be
    /// forbidden forever.
    UnsatisfiableGuard {
        /// The tool the constraint gates.
        tool: String,
        /// The unknown step id the guard's `done(..)` atom references.
        step: String,
    },
    /// A cycle over the combined `after` + `before(a, b)` ordering edges —
    /// every step on the cycle waits on another, so none can ever become
    /// eligible. Contains the step ids on the cycle.
    OrderingCycle(Vec<String>),
}

/// Human-readable, single-line rendering of each error.
impl std::fmt::Display for FlowError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Validator messages are already complete sentences; pass through.
            Self::Invalid(m) => f.write_str(m),
            Self::UnreachableStep(id) => {
                write!(f, "step '{id}' is unreachable from any root")
            }
            Self::UnguardedCommitTool(t) => {
                write!(
                    f,
                    "commit tool '{t}' is guarded by an always-true condition (effectively unguarded)"
                )
            }
            Self::UnknownTool(t) => {
                write!(
                    f,
                    "flow references tool '{t}' which is not in the provided tool registry"
                )
            }
            Self::UnsatisfiableGuard { tool, step } => {
                write!(
                    f,
                    "`never('{tool}').until(..)` references unknown step '{step}' — the guard can \
                     never hold, so '{tool}' would be forbidden forever"
                )
            }
            Self::OrderingCycle(steps) => {
                // The cycle is shown as its step ids joined by arrows.
                let path = steps.join(" -> ");
                write!(
                    f,
                    "ordering cycle across `after`/`before` edges: {} (no step on it can ever start)",
                    path
                )
            }
        }
    }
}
1219
1220/// All problems found while compiling a [`Flow`]; non-empty on failure.
1221#[derive(Debug, Clone, PartialEq, Eq)]
1222pub struct FlowErrors(pub Vec<FlowError>);
1223
1224impl std::fmt::Display for FlowErrors {
1225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1226        writeln!(f, "flow failed to compile ({} error(s)):", self.0.len())?;
1227        for e in &self.0 {
1228            writeln!(f, "  - {e}")?;
1229        }
1230        Ok(())
1231    }
1232}
1233
1234impl std::error::Error for FlowErrors {}
1235
/// The precomputed tool-gating surface of a compiled flow: every tool name the
/// flow reasons about, so introspection can enumerate and explain decisions.
///
/// The default value is an empty policy (no tools).
#[derive(Debug, Clone, Default)]
pub struct ToolPolicy {
    /// Every tool referenced anywhere in the flow, kept as an ordered set so
    /// iteration order is deterministic.
    pub tools: BTreeSet<String>,
}
1243
/// A validated [`Flow`] plus its precomputed [`ToolPolicy`].
///
/// Produced by [`Flow::compile`]. Holding one is proof the flow passed
/// compilation, so the runtime never re-discovers structural errors. This is the
/// IR the conversation compiler targets and the type richer surfaces build on.
///
/// Both fields are private, so code outside this module cannot build a
/// `CompiledFlow` by hand.
#[derive(Debug, Clone)]
pub struct CompiledFlow {
    /// The flow that passed compilation.
    flow: Flow,
    /// The tool names collected from that flow.
    policy: ToolPolicy,
}
1254
1255impl CompiledFlow {
1256    /// The underlying validated flow.
1257    pub fn flow(&self) -> &Flow {
1258        &self.flow
1259    }
1260    /// The precomputed tool policy.
1261    pub fn tool_policy(&self) -> &ToolPolicy {
1262        &self.policy
1263    }
1264    /// Render the flow as a Mermaid diagram.
1265    pub fn to_mermaid(&self) -> String {
1266        self.flow.to_mermaid()
1267    }
1268    /// Consume into the inner flow.
1269    pub fn into_flow(self) -> Flow {
1270        self.flow
1271    }
1272}
1273
/// A model-readable explanation of the current control-plane state — the
/// foundation of `why did the assistant ask that?`.
///
/// Produced by [`FlowMonitor::explain`]. `Serialize` so it can be surfaced to a
/// model, a devtool, or a log without the model driving control flow.
///
/// This is a snapshot: it holds owned strings and does not track later changes
/// to the monitor.
#[derive(Debug, Clone, Serialize)]
pub struct FlowExplanation {
    /// Steps eligible-but-not-done right now.
    pub active: Vec<String>,
    /// Tools currently admitted.
    pub allowed_tools: Vec<String>,
    /// Tools currently blocked, mapped to the reason.
    pub blocked_tools: BTreeMap<String, String>,
    /// Required steps not yet done (drives repair).
    pub missing_requirements: Vec<String>,
}
1290
/// Builder for a [`Flow`] using the cemented verbs.
///
/// Step-level verbs (`after`, `gate`, `done`, `posture`, `ground`, `allow`,
/// `deny`, `terminal`) configure the most recently declared step; flow-level
/// verbs (`once`, `before`, `require`, `never`, `commit`) add constraints.
#[derive(Default)]
pub struct FlowBuilder {
    /// Steps in declaration order; the last one is the step being configured.
    steps: Vec<Step>,
    /// Flow-level constraints in the order they were added.
    constraints: Vec<Constraint>,
    /// Tools flagged for confirmation by `commit`.
    confirm_tools: Vec<String>,
}
1298
1299impl FlowBuilder {
1300    fn current(&mut self) -> &mut Step {
1301        self.steps
1302            .last_mut()
1303            .expect("call `.step(id)` before configuring a step")
1304    }
1305
1306    /// Declare a new step.
1307    pub fn step(mut self, id: impl Into<String>) -> Self {
1308        self.steps.push(Step {
1309            id: id.into(),
1310            after: Vec::new(),
1311            gate: None,
1312            done: None,
1313            posture: None,
1314            ground: None,
1315            allow: Vec::new(),
1316            deny: Vec::new(),
1317            terminal: false,
1318        });
1319        self
1320    }
1321    /// Add a dependency (call multiple times for multiple deps).
1322    pub fn after(mut self, dep: impl Into<String>) -> Self {
1323        self.current().after.push(dep.into());
1324        self
1325    }
1326    /// Extra eligibility guard beyond dependencies.
1327    pub fn gate(mut self, g: Guard) -> Self {
1328        self.current().gate = Some(g);
1329        self
1330    }
1331    /// Completion condition.
1332    pub fn done(mut self, g: Guard) -> Self {
1333        self.current().done = Some(g);
1334        self
1335    }
1336    /// Instruction imposed while active.
1337    pub fn posture(mut self, text: impl Into<String>) -> Self {
1338        self.current().posture = Some(text.into());
1339        self
1340    }
1341    /// A grounding template projected while active — a curated, `State`-
1342    /// interpolated fact line that pins the model to known values. `{key}`
1343    /// interpolates a value; `{key?yes:no}` picks by truthiness. See
1344    /// [`render_ground`].
1345    pub fn ground(mut self, template: impl Into<String>) -> Self {
1346        self.current().ground = Some(template.into());
1347        self
1348    }
1349    /// Tools available while active (whitelist).
1350    pub fn allow<I, S>(mut self, tools: I) -> Self
1351    where
1352        I: IntoIterator<Item = S>,
1353        S: Into<String>,
1354    {
1355        self.current()
1356            .allow
1357            .extend(tools.into_iter().map(Into::into));
1358        self
1359    }
1360    /// Tools forbidden while active.
1361    pub fn deny<I, S>(mut self, tools: I) -> Self
1362    where
1363        I: IntoIterator<Item = S>,
1364        S: Into<String>,
1365    {
1366        self.current()
1367            .deny
1368            .extend(tools.into_iter().map(Into::into));
1369        self
1370    }
1371    /// Mark the current step terminal.
1372    pub fn terminal(mut self) -> Self {
1373        self.current().terminal = true;
1374        self
1375    }
1376
1377    /// A tool may run at most once.
1378    pub fn once(mut self, tool: impl Into<String>) -> Self {
1379        self.constraints.push(Constraint::Once(tool.into()));
1380        self
1381    }
1382    /// Ordering invariant: `a` before `b`.
1383    pub fn before(mut self, a: impl Into<String>, b: impl Into<String>) -> Self {
1384        self.constraints
1385            .push(Constraint::Before(a.into(), b.into()));
1386        self
1387    }
1388    /// Required terminal steps for completion.
1389    pub fn require<I, S>(mut self, steps: I) -> Self
1390    where
1391        I: IntoIterator<Item = S>,
1392        S: Into<String>,
1393    {
1394        self.constraints.push(Constraint::Require(
1395            steps.into_iter().map(Into::into).collect(),
1396        ));
1397        self
1398    }
1399    /// Forbid a tool until a guard holds (`never(tool).until(guard)`).
1400    pub fn never(self, tool: impl Into<String>) -> NeverBuilder {
1401        NeverBuilder {
1402            fb: self,
1403            tool: tool.into(),
1404        }
1405    }
1406    /// Commit-tool sugar: at most once, gated until `until`, and flagged for
1407    /// confirmation. Composes `once` + `never…until` + the confirmation seam.
1408    pub fn commit(mut self, tool: impl Into<String>, until: Guard) -> Self {
1409        let tool = tool.into();
1410        self.constraints.push(Constraint::Once(tool.clone()));
1411        self.constraints.push(Constraint::NeverUntil {
1412            tool: tool.clone(),
1413            until,
1414        });
1415        self.confirm_tools.push(tool);
1416        self
1417    }
1418
1419    /// Finalize and validate the flow.
1420    pub fn build(self) -> Result<Flow, Vec<String>> {
1421        let flow = Flow {
1422            steps: self.steps,
1423            constraints: self.constraints,
1424            confirm_tools: self.confirm_tools,
1425        };
1426        flow.validate()?;
1427        Ok(flow)
1428    }
1429}
1430
/// Sub-builder for `never(tool).until(guard)`.
///
/// Created by [`FlowBuilder::never`]; it holds the builder hostage until
/// [`NeverBuilder::until`] supplies the guard and returns it.
pub struct NeverBuilder {
    /// The flow builder to hand back once the constraint is complete.
    fb: FlowBuilder,
    /// The tool being forbidden.
    tool: String,
}
1436
1437impl NeverBuilder {
1438    /// Permit the tool once the guard holds.
1439    pub fn until(mut self, guard: Guard) -> FlowBuilder {
1440        self.fb.constraints.push(Constraint::NeverUntil {
1441            tool: self.tool,
1442            until: guard,
1443        });
1444        self.fb
1445    }
1446}
1447
1448#[cfg(test)]
1449mod tests {
1450    use super::*;
1451    use serde_json::json;
1452
    /// Shared fixture: a debt-collection flow.
    ///
    /// Steps form the chain `verify -> disclose -> capture_ptp`, after which
    /// `take_payment` and the terminal `close` both become eligible. Flow-level
    /// constraints: `charge_card` is forbidden until `ptp_confirmed` is true,
    /// may succeed at most once, and only `close` is required for completion.
    fn debt_flow() -> Flow {
        Flow::new()
            .step("verify")
            .posture("Verify the caller's identity.")
            .allow(["lookup_account"])
            .done(Guard::is_true("identity_verified"))
            .step("disclose")
            .after("verify")
            .posture("Give the disclosure.")
            .done(Guard::is_true("disclosure_given"))
            .step("capture_ptp")
            .after("disclose")
            .done(Guard::captured(["ptp_amount", "ptp_date"]))
            .step("take_payment")
            .after("capture_ptp")
            .allow(["charge_card"])
            .done(Guard::called_ok("charge_card"))
            .step("close")
            .after("capture_ptp")
            .terminal()
            // Flow-level constraints follow; they are not tied to `close`.
            .never("charge_card")
            .until(Guard::is_true("ptp_confirmed"))
            .once("charge_card")
            .require(["close"])
            .build()
            .expect("valid flow")
    }
1480
    /// `build()` must reject a step whose `after` names a step that was never
    /// declared.
    #[test]
    fn validates_and_detects_unknown_dep() {
        let bad = Flow::new()
            .step("a")
            .done(Guard::is_true("x"))
            .step("b")
            // "missing" is not a declared step id.
            .after("missing")
            .terminal()
            .build();
        assert!(bad.is_err());
    }
1492
    /// `validate()` must reject a two-step dependency cycle. The flow is built
    /// from struct literals and validated directly, without going through the
    /// builder.
    #[test]
    fn detects_cycle() {
        // a after b, b after a — validate() should reject.
        let steps = vec![
            Step {
                id: "a".into(),
                after: vec!["b".into()],
                gate: None,
                done: Some(Guard::always()),
                posture: None,
                ground: None,
                allow: vec![],
                deny: vec![],
                terminal: false,
            },
            Step {
                id: "b".into(),
                after: vec!["a".into()],
                gate: None,
                done: Some(Guard::always()),
                posture: None,
                ground: None,
                allow: vec![],
                deny: vec![],
                terminal: false,
            },
        ];
        let flow = Flow {
            steps,
            ..Flow::default()
        };
        assert!(flow.validate().is_err());
    }
1526
    /// Steps latch only once their dependencies are done, and a single
    /// `on_turn` latches a whole chain when several guards already hold.
    #[test]
    fn marking_latches_in_order() {
        let flow = debt_flow();
        let mut mon = FlowMonitor::new(flow, Enforcement::Enforce);
        let state = State::new();

        // Nothing done; only `verify` is active.
        assert_eq!(
            mon.active_steps(&state)
                .iter()
                .map(|s| s.id.as_str())
                .collect::<Vec<_>>(),
            vec!["verify"]
        );

        let _ = state.set("identity_verified", true);
        mon.on_turn(&state);
        assert!(mon.marking().done.contains("verify"));
        assert_eq!(mon.verdict("verify", &state), Verdict::Done);
        assert_eq!(mon.verdict("disclose", &state), Verdict::Active);

        let _ = state.set("disclosure_given", true);
        let _ = state.set("ptp_amount", 200);
        let _ = state.set("ptp_date", "2026-06-05");
        mon.on_turn(&state);
        // disclose + capture_ptp latch; close is terminal+eligible -> done.
        // `take_payment` is not done, but only `close` is required, so the
        // flow still counts as complete.
        assert!(mon.marking().done.contains("capture_ptp"));
        assert!(mon.marking().done.contains("close"));
        assert!(mon.is_complete());
    }
1557
    /// `never(..).until(..)` blocks `charge_card` until its guard holds, and
    /// `once(..)` blocks it again after the first recorded success.
    #[test]
    fn enforces_never_until_and_once() {
        let flow = debt_flow();
        let mut mon = FlowMonitor::new(flow, Enforcement::Enforce);
        let state = State::new();
        // get to take_payment being active
        let _ = state.set("identity_verified", true);
        let _ = state.set("disclosure_given", true);
        let _ = state.set("ptp_amount", 200);
        let _ = state.set("ptp_date", "x");
        mon.on_turn(&state);

        // charge_card blocked until ptp_confirmed.
        assert!(mon.admits_tool("charge_card", &state).is_err());
        // No on_turn needed: admits_tool evaluates the guard against live state.
        let _ = state.set("ptp_confirmed", true);
        assert!(mon.admits_tool("charge_card", &state).is_ok());

        // after it succeeds once, `once` blocks a second call.
        mon.on_tool_ok("charge_card", &state);
        assert!(mon.admits_tool("charge_card", &state).is_err());
    }
1579
    /// With a fresh monitor only `verify` is active, so its whitelist decides
    /// which tools are admitted.
    #[test]
    fn whitelist_scopes_tools_to_active_step() {
        let flow = debt_flow();
        let mon = FlowMonitor::new(flow, Enforcement::Enforce);
        let state = State::new();
        // In `verify`, only lookup_account is allowed.
        assert!(mon.admits_tool("lookup_account", &state).is_ok());
        assert!(mon.admits_tool("charge_card", &state).is_err());
    }
1589
    /// In Observe mode an inadmissible call is recorded as exactly one
    /// violation naming the tool; nothing is blocked.
    #[test]
    fn observe_mode_records_violations_not_blocks() {
        let flow = debt_flow();
        let mut mon = FlowMonitor::new(flow, Enforcement::Observe);
        let state = State::new();
        // charge_card out of order in observe mode -> recorded, still "runs".
        mon.observe_tool("charge_card", true, &state);
        assert_eq!(mon.violations().len(), 1);
        assert_eq!(mon.violations()[0].subject, "charge_card");
    }
1600
    /// The fixture flow compiles, its tool policy lists both tools it
    /// mentions, and the compiled flow can be handed to a monitor.
    #[test]
    fn compile_accepts_valid_flow_and_collects_tool_universe() {
        let compiled = debt_flow().compile().expect("valid flow compiles");
        // Tool universe spans allow/deny/once/never_until/confirm.
        assert!(compiled.tool_policy().tools.contains("charge_card"));
        assert!(compiled.tool_policy().tools.contains("lookup_account"));
        // Construction from a CompiledFlow must type-check and not panic.
        let _ = FlowMonitor::compiled(compiled, Enforcement::Enforce);
    }
1609
    #[test]
    fn compile_rejects_unreachable_step() {
        // NOTE(review): despite its name, this test asserts that compilation
        // SUCCEEDS. Every step is reachable through `after` edges
        // (a -> b -> island); `island` is additionally gated on a state flag
        // named "never". The assertion pins that such a gate does not make the
        // step count as unreachable at compile time.
        // Consider renaming this test, or adding a case that really produces
        // `FlowError::UnreachableStep`.
        let flow = Flow::new()
            .step("a")
            .done(Guard::is_true("a_done"))
            .step("b")
            .after("a")
            .done(Guard::is_true("b_done"))
            .step("island")
            .after("b")
            .gate(Guard::is_true("never"))
            .terminal()
            .build()
            .expect("structurally valid");
        // island is reachable via a->b->island, so this compiles; assert it does.
        assert!(flow.compile().is_ok());
    }
1631
1632    #[test]
1633    fn compile_rejects_unguarded_commit_tool() {
1634        // commit tool gated by an always-true guard is effectively unguarded.
1635        let flow = Flow::new()
1636            .step("s")
1637            .allow(["pay"])
1638            .done(Guard::called_ok("pay"))
1639            .terminal()
1640            .commit("pay", Guard::always())
1641            .build()
1642            .expect("structurally valid");
1643        let err = flow
1644            .compile()
1645            .expect_err("unguarded commit must fail to compile");
1646        assert!(err
1647            .0
1648            .iter()
1649            .any(|e| matches!(e, FlowError::UnguardedCommitTool(t) if t == "pay")));
1650    }
1651
1652    #[test]
1653    fn compile_with_tools_accepts_a_covering_registry() {
1654        let compiled = debt_flow()
1655            .compile_with_tools(&["lookup_account", "charge_card", "unrelated_extra"])
1656            .expect("registry covers the flow's tool universe");
1657        assert!(compiled.tool_policy().tools.contains("charge_card"));
1658    }
1659
1660    #[test]
1661    fn compile_with_tools_reports_dangling_tool_names() {
1662        // `charge_card` is referenced (allow/once/never_until) but missing from
1663        // the registry — a typo/drift the compiler must catch.
1664        let err = debt_flow()
1665            .compile_with_tools(&["lookup_account"])
1666            .expect_err("dangling tool must fail to compile");
1667        assert!(err
1668            .0
1669            .iter()
1670            .any(|e| matches!(e, FlowError::UnknownTool(t) if t == "charge_card")));
1671        // Plain compile() stays registry-agnostic.
1672        assert!(debt_flow().compile().is_ok());
1673    }
1674
1675    #[test]
1676    fn compile_rejects_never_until_guard_on_unknown_step() {
1677        // `never(pay).until(done("missing"))` can never latch — `pay` would be
1678        // forbidden forever. validate() doesn't check constraint guards; compile must.
1679        let flow = Flow::new()
1680            .step("s")
1681            .allow(["pay"])
1682            .done(Guard::called_ok("pay"))
1683            .never("pay")
1684            .until(Guard::done("missing"))
1685            .build()
1686            .expect("structurally valid for build()");
1687        let err = flow.compile().expect_err("unsatisfiable guard must fail");
1688        assert!(err.0.iter().any(|e| matches!(
1689            e,
1690            FlowError::UnsatisfiableGuard { tool, step } if tool == "pay" && step == "missing"
1691        )));
1692    }
1693
1694    #[test]
1695    fn compile_rejects_before_cycle() {
1696        // `after` edges are acyclic, but before(a, b) + before(b, a) closes an
1697        // ordering cycle: neither step can ever become eligible. validate()'s
1698        // cycle check only walks `after`, so compile must catch this.
1699        let flow = Flow::new()
1700            .step("a")
1701            .done(Guard::is_true("a_done"))
1702            .step("b")
1703            .done(Guard::is_true("b_done"))
1704            .before("a", "b")
1705            .before("b", "a")
1706            .build()
1707            .expect("build() only checks `after` cycles");
1708        let err = flow
1709            .compile()
1710            .expect_err("before-cycle must fail to compile");
1711        assert!(err.0.iter().any(|e| matches!(
1712            e,
1713            FlowError::OrderingCycle(steps)
1714                if steps.contains(&"a".to_string()) && steps.contains(&"b".to_string())
1715        )));
1716    }
1717
1718    #[test]
1719    fn explain_reports_blocked_tools_and_reasons() {
1720        let flow = debt_flow();
1721        let mon = FlowMonitor::new(flow, Enforcement::Enforce);
1722        let state = State::new();
1723        let ex = mon.explain(&state);
1724        // In the initial `verify` step, charge_card is blocked; explain says so.
1725        assert!(ex.blocked_tools.contains_key("charge_card"));
1726        assert!(ex.active.contains(&"verify".to_string()));
1727        // why_blocked is the same view.
1728        assert_eq!(mon.why_blocked(&state).blocked_tools, ex.blocked_tools);
1729    }
1730
1731    #[test]
1732    fn before_constraint_gates_step_eligibility() {
1733        // Regression: `before(a, b)` was validated but never enforced — `b` could
1734        // start before `a` was done. `a` and `b` have no `after` edge, so only the
1735        // Before constraint orders them.
1736        let flow = Flow::new()
1737            .step("a")
1738            .done(Guard::is_true("a_done"))
1739            .step("b")
1740            .done(Guard::is_true("b_done"))
1741            .before("a", "b")
1742            .build()
1743            .expect("valid flow");
1744        let mut mon = FlowMonitor::new(flow, Enforcement::Enforce);
1745        let state = State::new();
1746
1747        // `b` is NOT active until `a` is done, even though its own gate is open.
1748        let active: Vec<String> = mon
1749            .active_steps(&state)
1750            .iter()
1751            .map(|s| s.id.clone())
1752            .collect();
1753        assert!(active.contains(&"a".to_string()));
1754        assert!(
1755            !active.contains(&"b".to_string()),
1756            "b must wait for a (Before)"
1757        );
1758
1759        let _ = state.set("a_done", true);
1760        mon.on_turn(&state);
1761        let active: Vec<String> = mon
1762            .active_steps(&state)
1763            .iter()
1764            .map(|s| s.id.clone())
1765            .collect();
1766        assert!(active.contains(&"b".to_string()), "b active once a is done");
1767    }
1768
1769    #[test]
1770    fn custom_guard_in_combinator_is_not_erased() {
1771        // Regression: a custom guard nested in all()/any() was lowered to
1772        // Pred::Always, silently deleting it. It must still evaluate.
1773        let always_false = Guard::all([Guard::is_true("present"), Guard::custom(|_| false)]);
1774        // Mixed combinator is a Custom guard (non-serializable), not a Spec.
1775        assert!(matches!(always_false, Guard::Custom(_)));
1776
1777        let state = State::new();
1778        let _ = state.set("present", true);
1779        let marking = Marking::default();
1780        let ctx = FlowCtx {
1781            state: &state,
1782            marking: &marking,
1783        };
1784        // Would be `true` if the custom guard had been erased to Always.
1785        assert!(!always_false.eval(&ctx), "custom guard must still veto");
1786
1787        // all-spec combinator stays serializable.
1788        let serializable = Guard::all([Guard::is_true("a"), Guard::is_set("b")]);
1789        assert!(matches!(serializable, Guard::Spec(_)));
1790    }
1791
1792    #[test]
1793    fn serde_round_trips_data_driven_flow() {
1794        let flow = debt_flow();
1795        let jsonv = serde_json::to_value(&flow).expect("serialize");
1796        let back: Flow = serde_json::from_value(jsonv).expect("deserialize");
1797        back.validate().expect("round-tripped flow is valid");
1798        assert_eq!(back.steps.len(), flow.steps.len());
1799    }
1800
1801    #[test]
1802    fn custom_guard_is_not_serializable() {
1803        let flow = Flow::new()
1804            .step("a")
1805            .done(Guard::custom(|ctx| ctx.state.contains("ready")))
1806            .terminal()
1807            .build()
1808            .unwrap();
1809        assert!(serde_json::to_value(&flow).is_err());
1810    }
1811
1812    #[test]
1813    fn mermaid_export_has_nodes_and_edges() {
1814        let m = debt_flow().to_mermaid();
1815        assert!(m.contains("flowchart TD"));
1816        assert!(m.contains("verify --> disclose"));
1817        assert!(m.contains("close([close])")); // terminal shape
1818    }
1819
1820    struct WriteAgent;
1821    #[async_trait::async_trait]
1822    impl TextAgent for WriteAgent {
1823        fn name(&self) -> &str {
1824            "writer"
1825        }
1826        async fn run(&self, _state: &State) -> Result<String, crate::error::AgentError> {
1827            Ok("available".to_string())
1828        }
1829    }
1830
1831    #[tokio::test]
1832    async fn on_enter_fires_once_when_step_activates() {
1833        // collect -> check ; `check` runs an agent on enter whose result
1834        // (`check:result`) then completes a downstream `book` step.
1835        let flow = Flow::new()
1836            .step("collect")
1837            .done(Guard::is_true("collected"))
1838            .step("check")
1839            .after("collect")
1840            .done(Guard::resolved("check"))
1841            .step("book")
1842            .after("check")
1843            .terminal()
1844            .require(["book"])
1845            .build()
1846            .expect("valid flow");
1847
1848        let mut mon = FlowMonitor::new(flow, Enforcement::Enforce)
1849            .on_enter("check", run(Arc::new(WriteAgent), AgentMode::Call));
1850        let state = State::new();
1851
1852        // Only `collect` is active at the start.
1853        assert_eq!(mon.take_newly_active(&state), vec!["collect".to_string()]);
1854        // Re-asking yields nothing — fire-once.
1855        assert!(mon.take_newly_active(&state).is_empty());
1856
1857        // Complete `collect`; `check` becomes active and its on_enter fires.
1858        let _ = state.set("collected", true);
1859        mon.on_turn(&state);
1860        mon.fire_enter_actions(&state).await;
1861        assert_eq!(
1862            state.get::<String>("check:result").as_deref(),
1863            Some("available")
1864        );
1865
1866        // The resolved result completes `check`, then terminal `book`.
1867        mon.on_turn(&state);
1868        assert!(mon.marking().done.contains("check"));
1869        assert!(mon.is_complete());
1870        // No further newly-active steps to announce.
1871        assert!(mon.take_newly_active(&state).is_empty());
1872    }
1873
1874    #[test]
1875    fn ground_template_interpolates_and_branches() {
1876        let state = State::new();
1877        let _ = state.set("when", "3pm");
1878        let _ = state.set("available", true);
1879        let _ = state.set("prior_visits", 2);
1880        assert_eq!(
1881            render_ground(
1882                "{when} is {available?open:taken}; {prior_visits} prior visits.",
1883                &state
1884            ),
1885            "3pm is open; 2 prior visits."
1886        );
1887        // Falsy branch + absent key renders empty.
1888        let _ = state.set("available", false);
1889        assert_eq!(
1890            render_ground("slot {missing}is {available?free:full}", &state),
1891            "slot is full"
1892        );
1893    }
1894
1895    #[test]
1896    fn active_grounds_projects_only_active_steps() {
1897        let flow = Flow::new()
1898            .step("collect")
1899            .ground("Known time: {when}.")
1900            .done(Guard::is_set("when"))
1901            .step("done")
1902            .after("collect")
1903            .terminal()
1904            .build()
1905            .expect("valid flow");
1906        let mut mon = FlowMonitor::new(flow, Enforcement::Enforce);
1907        let state = State::new();
1908        let _ = state.set("when", "3pm");
1909        assert_eq!(
1910            mon.active_grounds(&state),
1911            vec!["Known time: 3pm.".to_string()]
1912        );
1913        // Once collect completes, its ground no longer projects.
1914        mon.on_turn(&state);
1915        assert!(mon.active_grounds(&state).is_empty());
1916    }
1917
1918    #[test]
1919    fn eq_guard_matches_state_value() {
1920        let g = Guard::eq("status", json!("active"));
1921        let state = State::new();
1922        let marking = Marking::default();
1923        assert!(!g.eval(&FlowCtx {
1924            state: &state,
1925            marking: &marking
1926        }));
1927        let _ = state.set("status", "active");
1928        assert!(g.eval(&FlowCtx {
1929            state: &state,
1930            marking: &marking
1931        }));
1932    }
1933}