1use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use async_trait::async_trait;
30use dashmap::DashMap;
31use once_cell::sync::Lazy;
32use regex::Regex;
33use serde_json::Value;
34
35use crate::live::extractor::{ExtractionTrigger, FieldPromotion, OnComplete, TurnExtractor};
36use crate::live::transcript::TranscriptTurn;
37use crate::llm::LlmError;
38use crate::orchestration::Mode as AgentMode;
39use crate::state::State;
40use crate::text::TextAgent;
41
42static MONEY_RE: Lazy<Regex> =
43 Lazy::new(|| Regex::new(r"\$?\s?(\d{1,3}(?:,\d{3})*|\d+)(?:\.(\d{1,2}))?").unwrap());
44static INT_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"-?\d+").unwrap());
45static DATE_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\b\d{4}-\d{2}-\d{2}\b").unwrap());
46static TIME12_RE: Lazy<Regex> =
48 Lazy::new(|| Regex::new(r"(?i)\b(\d{1,2})(?::(\d{2}))?\s*([ap]m)\b").unwrap());
49static TIME24_RE: Lazy<Regex> =
51 Lazy::new(|| Regex::new(r"\b([01]?\d|2[0-3]):([0-5]\d)\b").unwrap());
52
53fn parse_time(text: &str) -> Option<String> {
55 if let Some(c) = TIME12_RE.captures(text) {
56 let mut hour: u32 = c.get(1)?.as_str().parse().ok()?;
57 let min: u32 = c.get(2).map(|m| m.as_str()).unwrap_or("00").parse().ok()?;
58 let pm = c.get(3)?.as_str().eq_ignore_ascii_case("pm");
59 if hour > 12 {
60 return None; }
62 if pm && hour < 12 {
63 hour += 12;
64 } else if !pm && hour == 12 {
65 hour = 0; }
67 return Some(format!("{hour:02}:{min:02}"));
68 }
69 let c = TIME24_RE.captures(text)?;
70 Some(format!(
71 "{:02}:{}",
72 c.get(1)?.as_str().parse::<u32>().ok()?,
73 c.get(2)?.as_str()
74 ))
75}
76
77#[derive(Clone)]
83pub enum Recognizer {
84 Integer {
87 near: Vec<String>,
89 },
90 Money,
92 Regex(Regex),
94 OneOf(Vec<String>),
96 Fuzzy {
99 options: Vec<String>,
101 min: f64,
103 },
104 YesNo,
106 DateTime,
112}
113
114impl Recognizer {
115 pub fn integer() -> Self {
117 Recognizer::Integer { near: Vec::new() }
118 }
119 pub fn integer_near<I, S>(anchors: I) -> Self
121 where
122 I: IntoIterator<Item = S>,
123 S: Into<String>,
124 {
125 Recognizer::Integer {
126 near: anchors.into_iter().map(Into::into).collect(),
127 }
128 }
129 pub fn money() -> Self {
131 Recognizer::Money
132 }
133 pub fn regex(pattern: &str) -> Self {
135 Recognizer::Regex(Regex::new(pattern).expect("invalid recognizer regex"))
136 }
137 pub fn one_of<I, S>(options: I) -> Self
139 where
140 I: IntoIterator<Item = S>,
141 S: Into<String>,
142 {
143 Recognizer::OneOf(options.into_iter().map(Into::into).collect())
144 }
145 pub fn fuzzy<I, S>(options: I) -> Self
147 where
148 I: IntoIterator<Item = S>,
149 S: Into<String>,
150 {
151 Recognizer::Fuzzy {
152 options: options.into_iter().map(Into::into).collect(),
153 min: 0.85,
154 }
155 }
156 pub fn yes_no() -> Self {
158 Recognizer::YesNo
159 }
160 pub fn datetime() -> Self {
162 Recognizer::DateTime
163 }
164
165 pub fn recognize(&self, text: &str) -> Option<(Value, f32)> {
167 let lower = text.to_lowercase();
168 match self {
169 Recognizer::Integer { near } => {
170 if !near.is_empty() && !near.iter().any(|a| lower.contains(&a.to_lowercase())) {
171 return None;
172 }
173 let m = INT_RE.find(text)?;
174 m.as_str()
175 .parse::<i64>()
176 .ok()
177 .map(|n| (Value::from(n), 1.0))
178 }
179 Recognizer::Money => {
180 let caps = MONEY_RE.captures(text)?;
181 let whole = caps.get(1)?.as_str().replace(',', "");
182 let cents = caps.get(2).map(|c| c.as_str()).unwrap_or("0");
183 let amount: f64 = format!("{whole}.{cents:0<2}").parse().ok()?;
184 Some((Value::from(amount), 1.0))
185 }
186 Recognizer::Regex(re) => {
187 let caps = re.captures(text)?;
188 let s = caps.get(1).or_else(|| caps.get(0))?.as_str().to_string();
189 Some((Value::from(s), 1.0))
190 }
191 Recognizer::OneOf(options) => options
192 .iter()
193 .find(|o| lower.contains(&o.to_lowercase()))
194 .map(|o| (Value::from(o.clone()), 1.0)),
195 Recognizer::Fuzzy { options, min } => {
196 let mut best: Option<(&String, f64)> = None;
197 for opt in options {
198 let ol = opt.to_lowercase();
199 let sim = std::iter::once(lower.as_str())
201 .chain(lower.split_whitespace())
202 .map(|w| strsim::jaro_winkler(&ol, w))
203 .fold(0.0_f64, f64::max);
204 if sim >= *min && best.map(|(_, b)| sim > b).unwrap_or(true) {
205 best = Some((opt, sim));
206 }
207 }
208 best.map(|(opt, sim)| (Value::from(opt.clone()), sim as f32))
209 }
210 Recognizer::YesNo => {
211 const NO: &[&str] = &[
215 "no",
216 "nope",
217 "nah",
218 "not",
219 "don't",
220 "dont",
221 "doesn't",
222 "isn't",
223 "won't",
224 "never",
225 "incorrect",
226 "wrong",
227 "negative",
228 ];
229 const YES: &[&str] = &[
230 "yes",
231 "yeah",
232 "yep",
233 "yup",
234 "sure",
235 "correct",
236 "confirm",
237 "confirmed",
238 "ok",
239 "okay",
240 "right",
241 "affirmative",
242 ];
243 let tokens: Vec<&str> = lower
244 .split(|c: char| !(c.is_alphanumeric() || c == '\''))
245 .filter(|t| !t.is_empty())
246 .collect();
247 if tokens.iter().any(|t| NO.contains(t)) {
248 Some((Value::Bool(false), 0.9))
249 } else if tokens.iter().any(|t| YES.contains(t)) {
250 Some((Value::Bool(true), 0.9))
251 } else {
252 None
253 }
254 }
255 Recognizer::DateTime => {
256 const WEEKDAYS: &[&str] = &[
257 "monday",
258 "tuesday",
259 "wednesday",
260 "thursday",
261 "friday",
262 "saturday",
263 "sunday",
264 ];
265 let mut obj = serde_json::Map::new();
266 if let Some(m) = DATE_RE.find(text) {
267 obj.insert("date".into(), Value::from(m.as_str().to_string()));
268 }
269 if let Some(t) = parse_time(&lower) {
270 obj.insert("time".into(), Value::from(t));
271 }
272 if let Some(d) = ["today", "tomorrow", "tonight", "yesterday"]
273 .into_iter()
274 .find(|d| lower.contains(d))
275 {
276 obj.insert("day".into(), Value::from(d));
277 }
278 if let Some(w) = WEEKDAYS.iter().find(|w| lower.contains(*w)) {
279 obj.insert("weekday".into(), Value::from(*w));
280 }
281 if let Some(p) = ["morning", "afternoon", "evening", "noon", "midnight"]
282 .into_iter()
283 .find(|p| lower.contains(p))
284 {
285 obj.insert("part".into(), Value::from(p));
286 }
287 if obj.is_empty() {
288 None
289 } else {
290 Some((Value::Object(obj), 1.0))
291 }
292 }
293 }
294 }
295}
296
297type FieldFetchFn =
300 Arc<dyn Fn(Value) -> Pin<Box<dyn Future<Output = Result<Value, String>> + Send>> + Send + Sync>;
301
302#[derive(Clone)]
304enum Source {
305 Recognize(Recognizer),
307 Resolve {
309 args: Vec<String>,
311 ttl: Option<Duration>,
313 fetch: FieldFetchFn,
315 },
316}
317
318type FieldValidator = Arc<dyn Fn(&Value) -> bool + Send + Sync>;
321
322#[derive(Clone)]
324pub struct Field {
325 name: String,
326 source: Source,
327 state_key: String,
328 overwrite: bool,
329 validate: Option<FieldValidator>,
331}
332
333#[derive(Clone)]
336pub struct Extract {
337 name: String,
338 fields: Vec<Field>,
339 window: usize,
340 trigger: ExtractionTrigger,
341 on_complete: Option<OnComplete>,
342}
343
344impl Extract {
345 pub fn record(name: impl Into<String>) -> ExtractBuilder {
347 ExtractBuilder {
348 name: name.into(),
349 fields: Vec::new(),
350 window: 3,
351 trigger: ExtractionTrigger::EveryTurn,
352 on_complete: None,
353 }
354 }
355
356 pub fn into_extractor(self) -> Arc<dyn TurnExtractor> {
358 Arc::new(RecordExtractor::new(self))
359 }
360
361 pub fn field_state_keys(&self) -> Vec<(String, String)> {
365 self.fields
366 .iter()
367 .map(|f| (f.name.clone(), f.state_key.clone()))
368 .collect()
369 }
370}
371
372pub struct ExtractBuilder {
374 name: String,
375 fields: Vec<Field>,
376 window: usize,
377 trigger: ExtractionTrigger,
378 on_complete: Option<OnComplete>,
379}
380
381impl ExtractBuilder {
382 pub fn field(mut self, name: impl Into<String>, recognizer: Recognizer) -> Self {
384 let name = name.into();
385 self.fields.push(Field {
386 state_key: name.clone(),
387 name,
388 source: Source::Recognize(recognizer),
389 overwrite: false,
390 validate: None,
391 });
392 self
393 }
394 pub fn field_to(
396 mut self,
397 name: impl Into<String>,
398 state_key: impl Into<String>,
399 recognizer: Recognizer,
400 ) -> Self {
401 self.fields.push(Field {
402 name: name.into(),
403 state_key: state_key.into(),
404 source: Source::Recognize(recognizer),
405 overwrite: false,
406 validate: None,
407 });
408 self
409 }
410 pub fn field_resolve<I, S, F, Fut>(
415 mut self,
416 name: impl Into<String>,
417 args: I,
418 ttl: Option<Duration>,
419 fetch: F,
420 ) -> Self
421 where
422 I: IntoIterator<Item = S>,
423 S: Into<String>,
424 F: Fn(Value) -> Fut + Send + Sync + 'static,
425 Fut: Future<Output = Result<Value, String>> + Send + 'static,
426 {
427 let name = name.into();
428 let fetch = Arc::new(fetch);
429 self.fields.push(Field {
430 state_key: name.clone(),
431 name,
432 source: Source::Resolve {
433 args: args.into_iter().map(Into::into).collect(),
434 ttl,
435 fetch: Arc::new(move |a| {
436 let fetch = fetch.clone();
437 Box::pin(async move { fetch(a).await })
438 }),
439 },
440 overwrite: false,
441 validate: None,
442 });
443 self
444 }
445 pub fn validate<F>(mut self, predicate: F) -> Self
449 where
450 F: Fn(&Value) -> bool + Send + Sync + 'static,
451 {
452 if let Some(field) = self.fields.last_mut() {
453 field.validate = Some(Arc::new(predicate));
454 }
455 self
456 }
457
458 pub fn window(mut self, n: usize) -> Self {
460 self.window = n;
461 self
462 }
463 pub fn trigger(mut self, trigger: ExtractionTrigger) -> Self {
465 self.trigger = trigger;
466 self
467 }
468 pub fn on_complete(mut self, agent: Arc<dyn TextAgent>, mode: AgentMode) -> Self {
471 self.on_complete = Some(OnComplete { agent, mode });
472 self
473 }
474 pub fn build(self) -> Extract {
476 Extract {
477 name: self.name,
478 fields: self.fields,
479 window: self.window,
480 trigger: self.trigger,
481 on_complete: self.on_complete,
482 }
483 }
484}
485
486pub struct RecordExtractor {
489 spec: Extract,
490 promotions: Vec<FieldPromotion>,
491 cache: Arc<DashMap<String, (Value, Instant)>>,
493}
494
495impl RecordExtractor {
496 pub fn new(spec: Extract) -> Self {
498 let promotions = spec
499 .fields
500 .iter()
501 .map(|f| {
502 let p = if f.overwrite {
503 FieldPromotion::overwrite(&f.name)
504 } else {
505 FieldPromotion::keep_known(&f.name)
506 };
507 p.to(&f.state_key)
508 })
509 .collect();
510 Self {
511 spec,
512 promotions,
513 cache: Arc::new(DashMap::new()),
514 }
515 }
516
517 async fn resolve_field(
523 &self,
524 field: &str,
525 args: &[String],
526 ttl: Option<Duration>,
527 fetch: &FieldFetchFn,
528 fresh: &serde_json::Map<String, Value>,
529 state: &State,
530 ) -> Option<Value> {
531 let mut obj = serde_json::Map::new();
533 for key in args {
534 if let Some(v) = fresh.get(key).cloned().or_else(|| state.get::<Value>(key)) {
535 obj.insert(key.clone(), v);
536 }
537 }
538 let args_value = Value::Object(obj);
539 let cache_key = format!("{field}|{args_value}");
540 if let Some(ttl) = ttl {
541 if let Some(entry) = self.cache.get(&cache_key) {
542 if entry.1.elapsed() < ttl {
543 return Some(entry.0.clone());
544 }
545 }
546 }
547 match fetch(args_value).await {
548 Ok(value) => {
549 if ttl.is_some() {
550 self.cache
551 .insert(cache_key, (value.clone(), Instant::now()));
552 }
553 Some(value)
554 }
555 Err(_e) => {
556 #[cfg(feature = "tracing-support")]
557 tracing::warn!(field, "resolver failed: {_e}");
558 None
559 }
560 }
561 }
562}
563
564#[async_trait]
565impl TurnExtractor for RecordExtractor {
566 fn name(&self) -> &str {
567 &self.spec.name
568 }
569
570 fn window_size(&self) -> usize {
571 self.spec.window
572 }
573
574 fn trigger(&self) -> ExtractionTrigger {
575 self.spec.trigger.clone()
576 }
577
578 fn promotion_rules(&self) -> &[FieldPromotion] {
579 &self.promotions
580 }
581
582 fn on_complete(&self) -> Option<OnComplete> {
583 self.spec.on_complete.clone()
584 }
585
586 async fn extract(&self, window: &[TranscriptTurn]) -> Result<Value, LlmError> {
587 let text = window
589 .iter()
590 .map(|t| t.user.as_str())
591 .collect::<Vec<_>>()
592 .join(" ");
593 let mut obj = serde_json::Map::new();
594 for field in &self.spec.fields {
595 if let Source::Recognize(rec) = &field.source {
596 if let Some((value, _confidence)) = rec.recognize(&text) {
597 if field.validate.as_ref().is_some_and(|v| !v(&value)) {
598 continue; }
600 obj.insert(field.name.clone(), value);
601 }
602 }
603 }
604 Ok(Value::Object(obj))
605 }
606
607 async fn extract_with_state(
608 &self,
609 window: &[TranscriptTurn],
610 state: &State,
611 ) -> Result<Value, LlmError> {
612 let text = window
613 .iter()
614 .map(|t| t.user.as_str())
615 .collect::<Vec<_>>()
616 .join(" ");
617 let mut obj = serde_json::Map::new();
618 let mut fresh = serde_json::Map::new();
622 for field in &self.spec.fields {
623 if let Source::Recognize(rec) = &field.source {
624 if let Some((value, confidence)) = rec.recognize(&text) {
625 if field.validate.as_ref().is_some_and(|v| !v(&value)) {
626 continue; }
628 let _ = state.set(
631 format!("state_meta:{}", field.state_key),
632 serde_json::json!({ "source": "extraction", "confidence": confidence }),
633 );
634 fresh.insert(field.state_key.clone(), value.clone());
635 obj.insert(field.name.clone(), value);
636 }
637 }
638 }
639 let resolves = self
641 .spec
642 .fields
643 .iter()
644 .filter_map(|field| match &field.source {
645 Source::Resolve { args, ttl, fetch } => Some(async {
646 self.resolve_field(&field.name, args, *ttl, fetch, &fresh, state)
647 .await
648 .map(|v| (field.name.clone(), v))
649 }),
650 Source::Recognize(_) => None,
651 });
652 for resolved in futures::future::join_all(resolves)
653 .await
654 .into_iter()
655 .flatten()
656 {
657 obj.insert(resolved.0, resolved.1);
658 }
659 Ok(Value::Object(obj))
660 }
661}
662
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a transcript turn whose only meaningful content is the user text.
    fn turn(user: &str) -> TranscriptTurn {
        TranscriptTurn {
            turn_number: 0,
            user: user.to_string(),
            model: String::new(),
            tool_calls: Vec::new(),
            timestamp: std::time::Instant::now(),
        }
    }

    #[test]
    fn recognizers_basic() {
        assert_eq!(
            Recognizer::integer()
                .recognize("I want 3 of them")
                .unwrap()
                .0,
            json!(3)
        );
        // Comma-grouped thousands plus a two-digit fraction.
        assert_eq!(
            Recognizer::money()
                .recognize("that'll be $1,250.50")
                .unwrap()
                .0,
            json!(1250.50)
        );
        // Matching is case-insensitive; the value is the option as written.
        assert_eq!(
            Recognizer::one_of(["pizza", "salad"])
                .recognize("a large PIZZA please")
                .unwrap()
                .0,
            json!("pizza")
        );
        assert_eq!(
            Recognizer::yes_no()
                .recognize("yes that's right")
                .unwrap()
                .0,
            json!(true)
        );
        assert_eq!(
            Recognizer::yes_no().recognize("no thanks").unwrap().0,
            json!(false)
        );
        assert!(Recognizer::yes_no().recognize("maybe later").is_none());
    }

    #[test]
    fn yes_no_negation_wins_and_is_word_aware() {
        let r = Recognizer::yes_no();
        // A negation token overrides an affirmative token in the same utterance.
        assert_eq!(r.recognize("not correct").unwrap().0, json!(false));
        assert_eq!(r.recognize("don't confirm that").unwrap().0, json!(false));
        // "incorrect" is its own token, so it is not mistaken for "correct".
        assert_eq!(r.recognize("that's incorrect").unwrap().0, json!(false));
        // Word-aware: "another" contains "no" as a substring but is not the token "no".
        assert!(r.recognize("another option").is_none());
        assert_eq!(r.recognize("yes please").unwrap().0, json!(true));
    }

    #[tokio::test]
    async fn resolver_binds_args_recognized_this_turn() {
        // "slot" is not in state beforehand, so the resolver can only see it through the values
        // recognized in this same call.
        let spec = Extract::record("booking")
            .field("slot", Recognizer::one_of(["morning", "afternoon"]))
            .field_resolve("availability", ["slot"], None, |args: Value| async move {
                Ok(json!({ "slot_seen": args.get("slot").cloned() }))
            })
            .build();
        let ext = RecordExtractor::new(spec);
        let state = State::new();
        let out = ext
            .extract_with_state(&[turn("afternoon works")], &state)
            .await
            .unwrap();
        assert_eq!(out["slot"], json!("afternoon"));
        assert_eq!(out["availability"], json!({ "slot_seen": "afternoon" }));
    }

    #[test]
    fn datetime_normalizes_clock_and_calendar() {
        let r = Recognizer::datetime();
        // 12-hour times are normalized to 24-hour "HH:MM".
        assert_eq!(
            r.recognize("can we meet at 3pm").unwrap().0,
            json!({"time": "15:00"})
        );
        assert_eq!(
            r.recognize("tomorrow at 9:30 am works").unwrap().0,
            json!({"time": "09:30", "day": "tomorrow"})
        );
        // 12am is midnight.
        assert_eq!(
            r.recognize("12am sharp").unwrap().0,
            json!({"time": "00:00"})
        );
        // Every component at once; "afternoon" must win over its substring "noon".
        assert_eq!(
            r.recognize("friday afternoon, 2026-06-05 at 15:00")
                .unwrap()
                .0,
            json!({"date": "2026-06-05", "time": "15:00", "weekday": "friday", "part": "afternoon"})
        );
        // A bare count is neither a date nor a time.
        assert!(r.recognize("a table for 4 people").is_none());
        // An out-of-range 12-hour time is rejected.
        assert!(r.recognize("at 13pm").is_none());
    }

    #[test]
    fn integer_near_anchors() {
        let r = Recognizer::integer_near(["quantity", "want"]);
        assert_eq!(r.recognize("I want 5").unwrap().0, json!(5));
        // No anchor word present, so the number is ignored.
        assert!(r.recognize("call me at 5").is_none());
    }

    #[test]
    fn fuzzy_matches_misheard_name() {
        let r = Recognizer::fuzzy(["Johnson", "Jackson", "Jensen"]);
        let (v, conf) = r.recognize("the name is jonson").unwrap();
        assert_eq!(v, json!("Johnson"));
        // Confidence is the similarity score, which must clear the default 0.85 threshold.
        assert!(conf > 0.85);
    }

    #[tokio::test]
    async fn record_extractor_captures_fields() {
        let spec = Extract::record("order")
            .field("quantity", Recognizer::integer_near(["want", "get"]))
            .field("item", Recognizer::one_of(["pizza", "salad", "soda"]))
            .window(2)
            .build();
        let extractor = RecordExtractor::new(spec);
        assert_eq!(extractor.name(), "order");
        assert_eq!(extractor.window_size(), 2);
        // One promotion rule per field.
        assert_eq!(extractor.promotion_rules().len(), 2);

        let window = vec![turn("I want 2 large pizza")];
        let out = extractor.extract(&window).await.unwrap();
        assert_eq!(out["quantity"], json!(2));
        assert_eq!(out["item"], json!("pizza"));
    }

    #[tokio::test]
    async fn record_resolves_async_field_from_state() {
        // "slot" is present both in state and in this turn's text; the resolver sees it either way.
        let spec = Extract::record("booking")
            .field("slot", Recognizer::one_of(["morning", "afternoon"]))
            .field_resolve("availability", ["slot"], None, |args: Value| async move {
                let slot = args.get("slot").and_then(|v| v.as_str()).unwrap_or("");
                Ok(serde_json::json!({ "open": slot == "afternoon" }))
            })
            .build();
        let ext = RecordExtractor::new(spec);
        let state = State::new();
        let _ = state.set("slot", "afternoon");
        let out = ext
            .extract_with_state(&[turn("afternoon please")], &state)
            .await
            .unwrap();
        assert_eq!(out["slot"], json!("afternoon"));
        assert_eq!(out["availability"], json!({ "open": true }));
    }

    #[tokio::test]
    async fn resolver_field_caches_within_ttl() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        // Counts how many times the fetcher actually runs.
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = calls.clone();
        let spec = Extract::record("b")
            .field_resolve("v", ["k"], Some(Duration::from_secs(60)), move |_args| {
                let counter = counter.clone();
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Ok(json!("x"))
                }
            })
            .build();
        let ext = RecordExtractor::new(spec);
        let state = State::new();
        let _ = state.set("k", 1);
        let _ = ext.extract_with_state(&[turn("a")], &state).await.unwrap();
        let _ = ext.extract_with_state(&[turn("a")], &state).await.unwrap();
        // Same bound args within the TTL: the second call is served from the cache.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn on_complete_is_exposed() {
        use crate::error::AgentError;
        // Minimal agent stub; only its presence on the extractor is checked.
        struct A;
        #[async_trait]
        impl TextAgent for A {
            fn name(&self) -> &str {
                "a"
            }
            async fn run(&self, _s: &State) -> Result<String, AgentError> {
                Ok("done".into())
            }
        }
        let spec = Extract::record("x")
            .field("q", Recognizer::integer())
            .on_complete(Arc::new(A), AgentMode::Dispatch)
            .build();
        assert!(RecordExtractor::new(spec).on_complete().is_some());
    }

    #[tokio::test]
    async fn record_extractor_omits_unrecognized() {
        // Unrecognized fields are absent from the output rather than present as null.
        let spec = Extract::record("order")
            .field("item", Recognizer::one_of(["pizza"]))
            .build();
        let out = RecordExtractor::new(spec)
            .extract(&[turn("hello there")])
            .await
            .unwrap();
        assert!(out.as_object().unwrap().is_empty());
    }
}