Skip to content

SumoKernel Reference

sumospace.kernel.SumoKernel

SumoKernel(
    config: KernelConfig | None = None,
    settings: "SumoSettings | None" = None,
    hooks: "HookRegistry | None" = None,
)

The main orchestration engine.

Lifecycle

async with SumoKernel() as kernel:
    trace = await kernel.run("your task")

Source code in sumospace/kernel.py
def __init__(
    self,
    config: KernelConfig | None = None,
    settings: "SumoSettings | None" = None,
    hooks: "HookRegistry | None" = None,
):
    """
    Build a kernel from settings. Subsystems are only created by ``boot()``.

    Args:
        config:   Deprecated ``KernelConfig``. Used only when ``settings`` is
                  not given: it is converted to ``SumoSettings`` and a
                  ``DeprecationWarning`` is emitted.
        settings: ``SumoSettings`` instance. Takes precedence over ``config``.
        hooks:    Hook registry to use. A fresh ``HookRegistry`` is created
                  only when this is ``None``.
    """
    if config is not None and settings is None:
        import warnings
        warnings.warn(
            "Passing KernelConfig directly is deprecated. "
            "Use SumoSettings instead: SumoKernel(settings=SumoSettings(...)). "
            "KernelConfig support will be removed in v1.0.",
            DeprecationWarning,
            stacklevel=2,
        )
        import dataclasses
        from sumospace.settings import SumoSettings
        data = dataclasses.asdict(config)
        # Map deprecated flags to new settings
        if "require_consensus" in data:
            data["committee_enabled"] = data["require_consensus"]
        if "dry_run" in data:
            data["execution_enabled"] = not data["dry_run"]

        self.settings = SumoSettings(**data)
    elif settings is not None:
        self.settings = settings
    else:
        from sumospace.settings import SumoSettings
        self.settings = SumoSettings()

    # Subsystem slots; all of them are populated by boot().
    self._provider: ProviderRouter | None = None
    self._secondary_provider: ProviderRouter | None = None
    self._classifier: SumoClassifier | None = None
    self._committee: Committee | None = None
    self._tools: ToolRegistry | None = None
    self._memory: MemoryManager | None = None
    self._ingestor: UniversalIngestor | None = None
    self._rag: RAGPipeline | None = None
    self._initialized = False

    from sumospace.audit import AuditLogger
    self._audit_logger: AuditLogger | None = AuditLogger(self.settings)

    from sumospace.hooks import HookRegistry
    # Compare against None rather than relying on truthiness: a registry the
    # caller passed in must be kept even if it evaluates falsy (e.g. empty).
    self.hooks: HookRegistry = (
        hooks if hooks is not None else HookRegistry(verbose=self.settings.verbose)
    )

    from sumospace.templates import TemplateManager
    self.templates = TemplateManager(
        template_path=self.settings.prompt_template_path
    )

    # Auto-load hooks from workspace if enabled
    self._auto_load_hooks()

    from sumospace.cache import PlanCache
    self._cache = PlanCache(
        cache_dir=str(Path(self.settings.workspace) / ".sumo_cache")
    )

    self.telemetry = SumoTelemetry(
        enabled=self.settings.telemetry_enabled,
        endpoint=self.settings.telemetry_endpoint
    )

boot async

boot()

Initialise all subsystems. Called automatically by async context manager.

Source code in sumospace/kernel.py
async def boot(self):
    """
    Initialise all subsystems. Called automatically by async context manager.

    Does nothing when the kernel is already initialised. Subsystems are
    created in order: provider(s), tool registry, scope resolution, memory,
    ingestor + RAG, classifier, committee. Finally the ``on_kernel_boot``
    hook is triggered.

    Raises:
        KernelBootError: wraps any exception raised during the sequence
            above, including one raised by the ``on_kernel_boot`` hook.
    """
    if self._initialized:
        return

    cfg = self.settings
    try:
        if cfg.verbose:
            console.print(Panel(
                f"[bold cyan]SumoKernel booting[/bold cyan]\n"
                f"Provider: [green]{cfg.provider}[/green]  "
                f"Model: [green]{cfg.model}[/green]  "
                f"Embeddings: [green]{cfg.embedding_provider}[/green]\n"
                f"Workspace: [dim]{cfg.workspace}[/dim]  "
                f"Dry-run: [yellow]{cfg.dry_run}[/yellow]",
                title="SumoSpace",
                border_style="cyan",
            ))

        # 1. Provider
        #    The literal model name "default" is passed on as None.
        self._provider = ProviderRouter(
            provider=cfg.provider,
            model=cfg.model if cfg.model != "default" else None,
            load_in_4bit=cfg.hf_load_in_4bit,
        )
        await self._provider.initialize()

        # Optional second provider; the committee uses it for planning (step 7).
        self._secondary_provider = None
        if cfg.secondary_provider:
            self._secondary_provider = ProviderRouter(
                provider=cfg.secondary_provider,
                model=cfg.secondary_model,
                load_in_4bit=cfg.hf_load_in_4bit,
            )
            await self._secondary_provider.initialize()

        # 2. Tool registry
        self._tools = ToolRegistry(workspace=cfg.workspace)

        # 3. Scope resolution
        #    If user_id is set, build a ScopeManager and resolve paths.
        #    Otherwise fall back to raw chroma_base.
        scope_mgr = None
        resolved_chroma = cfg.chroma_base
        if cfg.user_id:
            from sumospace.scope import ScopeManager
            scope_mgr = ScopeManager(
                chroma_base=cfg.chroma_base,
                level=cfg.scope_level,
            )
            resolved_chroma = scope_mgr.resolve(
                user_id=cfg.user_id,
                session_id=cfg.session_id,
                project_id=cfg.project_id,
            )

        # 4. Memory
        #    Left as None when disabled, so callers must check before use.
        if cfg.memory_enabled:
            self._memory = MemoryManager(
                chroma_path=resolved_chroma,
                embedding_provider=cfg.embedding_provider,
                scope_manager=scope_mgr,
                user_id=cfg.user_id,
                session_id=cfg.session_id,
                project_id=cfg.project_id,
            )
            await self._memory.initialize()
        else:
            self._memory = None

        # 5. Ingestor + RAG
        #    NOTE(review): rag_enabled is read with a getattr default of True
        #    here but accessed directly as settings.rag_enabled in run() and
        #    stream_run() — confirm which is intended.
        if getattr(cfg, "rag_enabled", True):
            self._ingestor = UniversalIngestor(
                chroma_path=resolved_chroma,
                embedding_provider=cfg.embedding_provider,
                embedding_model=cfg.embedding_model,
                max_chunks=cfg.max_chunks_per_scope,
            )
            await self._ingestor.initialize()

            self._rag = RAGPipeline(ingestor=self._ingestor)
            await self._rag.initialize()
        else:
            self._ingestor = None
            self._rag = None

        # 6. Classifier
        self._classifier = SumoClassifier(provider=self._provider)
        await self._classifier.initialize()

        # 7. Committee
        #    Planning uses the secondary provider when one is configured.
        self._committee = Committee(
            provider=self._provider,
            planning_provider=self._secondary_provider or self._provider,
            require_consensus=cfg.require_consensus,
            templates=self.templates,
        )

        self._initialized = True

        if cfg.verbose:
            console.print("[bold green]✓ Kernel ready[/bold green]")

        # NOTE(review): the flag above is already True at this point, so a
        # failing hook raises KernelBootError while the kernel still reports
        # itself as initialised.
        await self.hooks.trigger("on_kernel_boot", self)

    except Exception as e:
        raise KernelBootError(f"Kernel boot failed: {e}") from e

chat async

chat(message: str, session_id: str | None = None) -> str

Simple conversational turn (no tool execution).

Source code in sumospace/kernel.py
async def chat(self, message: str, session_id: str | None = None) -> str:
    """Simple conversational turn (no tool execution)."""
    if not self._initialized:
        await self.boot()
    await self._memory.add("user", message)
    recent = self._memory.recent(10)
    history = "\n".join(f"{m['role'].upper()}: {m['content']}" for m in recent[:-1])
    system = "You are Sumo, a helpful AI assistant."
    user = f"{history}\n\nUSER: {message}" if history else message
    response = await self._provider.complete(user=user, system=system, temperature=0.1)
    await self._memory.add("assistant", response)
    return response

ingest async

ingest(path: str, recursive: bool = True) -> int

Ingest a file or directory into the RAG knowledge base.

Source code in sumospace/kernel.py
async def ingest(
    self,
    path: str,
    recursive: bool = True,
) -> int:
    """
    Ingest a file or directory into the RAG knowledge base.

    Args:
        path:      File or directory to ingest. A path that is not an
                   existing directory is treated as a file.
        recursive: NOTE(review): currently not forwarded to the ingestor —
                   confirm the ``ingest_directory`` signature before wiring
                   it through.

    Returns:
        Total number of chunks created.

    Raises:
        ValueError: if RAG is disabled, so no ingestor exists.
    """
    if not self._initialized:
        await self.boot()

    # boot() leaves _ingestor as None when rag_enabled is off; fail with an
    # actionable message instead of an AttributeError on None.
    ingestor = self._ingestor
    if ingestor is None:
        raise ValueError("RAG is disabled. Set rag_enabled=True in settings.")

    from pathlib import Path
    if Path(path).is_dir():
        results = await ingestor.ingest_directory(path)
        return sum(r.chunks_created for r in results)

    result = await ingestor.ingest_file(path)
    return result.chunks_created

ingest_media async

ingest_media(path: str, force: bool = False) -> list[Any]

Ingest text, images, audio, or video. Requires settings.media_enabled = True.

Source code in sumospace/kernel.py
async def ingest_media(self, path: str, force: bool = False) -> list[Any]:
    """
    Ingest a media path (text, images, audio, or video).

    The feature gate ``settings.media_enabled`` is checked first; the kernel
    is then booted if needed and the work is handed to a ``MediaIngestor``
    built from the current settings.

    Raises:
        ValueError: if ``settings.media_enabled`` is falsy.
    """
    media_allowed = self.settings.media_enabled
    if not media_allowed:
        raise ValueError("Media features are disabled. Set media_enabled=True in settings.")

    needs_boot = not self._initialized
    if needs_boot:
        await self.boot()

    from sumospace.media_ingest import MediaIngestor
    return MediaIngestor(self.settings).ingest_path(path, force=force)

recall async

recall(query: str, top_k: int = 5)

Direct semantic recall from memory.

Source code in sumospace/kernel.py
async def recall(self, query: str, top_k: int = 5):
    """
    Direct semantic recall from memory.

    Args:
        query: Text to search memory for.
        top_k: Maximum number of results requested from memory.

    Returns:
        Whatever the memory manager's ``recall`` returns, or an empty list
        when memory is disabled.
    """
    if not self._initialized:
        await self.boot()

    # boot() leaves _memory as None when memory is disabled: there is nothing
    # to recall, so return an empty result instead of crashing on None.
    if self._memory is None:
        return []
    return await self._memory.recall(query, top_k=top_k)

run async

run(
    task: str, session_id: str | None = None
) -> ExecutionTrace

Execute a task end-to-end, returning only once it has fully completed.

Parameters:

Name Type Description Default
task str

Natural language task description.

required
session_id str | None

Optional session identifier for memory scoping.

None

Returns:

Type Description
ExecutionTrace

ExecutionTrace with full audit trail and final answer.

Note

Prefer stream_run() over this method in any UI context. run() blocks until full completion, meaning the user will see no feedback during potentially long-running tool executions or committee deliberation.

Warning

run() does not raise ConsensusFailedError or ExecutionHaltedError to the caller: both are caught internally, and the returned trace has success=False with the error message attached to trace.error.

Source code in sumospace/kernel.py
async def run(self, task: str, session_id: str | None = None) -> ExecutionTrace:
    """
    Execute a task end-to-end and return only once it has fully completed.

    Pipeline: classify -> optional RAG retrieval -> optional web search ->
    committee deliberation (or direct inference when the committee is
    disabled) -> plan execution -> answer synthesis -> memory persistence.

    Args:
        task:       Natural language task description.
        session_id: Optional session identifier recorded on the trace; a
                    random 12-character hex id is generated when omitted.

    Returns:
        ExecutionTrace with full audit trail and final answer.

    Note:
        Prefer `stream_run()` over this method in any UI context. `run()` blocks
        until full completion, meaning the user will see no feedback during
        potentially long-running tool executions or committee deliberation.

    Warning:
        Failures inside the pipeline are not raised to the caller.
        `ConsensusFailedError`, `ExecutionHaltedError` and any other exception
        are caught here; the returned trace then has `success=False` and the
        error message attached to `trace.error`.
    """
    if not self._initialized:
        await self.boot()

    session_id = session_id or uuid.uuid4().hex[:12]
    task_hash = hashlib.sha256(task.encode()).hexdigest()[:8]
    start = time.monotonic()
    verdict = None

    trace = ExecutionTrace(
        task=task,
        session_id=session_id,
        intent=Intent.GENERAL_QA,
        classification=None,
        plan=None,
    )

    # The root span encloses the whole pipeline so that the classify / RAG /
    # web-search / committee / execute spans are recorded inside it.
    async with self.telemetry.async_span(
        "sumospace.kernel.run",
        attributes={"task": task, "session_id": session_id, "task_hash": task_hash},
    ):
        try:
            # Step 1: Classify
            if self.settings.verbose:
                console.print(f"\n[bold]Task:[/bold] {task}")

            await self.hooks.trigger("on_task_start", task, session_id)

            recent_ctx = {}
            if self.settings.memory_enabled:
                recent_ctx["recent_messages"] = [m["content"] for m in self._memory.recent(5)]

            async with self.telemetry.async_span("sumospace.classify", attributes={"task": task}):
                classification = await self._classifier.classify(task, context=recent_ctx)
            trace.intent = classification.intent
            trace.classification = classification

            if self.settings.verbose:
                console.print(
                    f"[dim]Intent: [cyan]{classification.intent.value}[/cyan] "
                    f"({classification.confidence:.0%}) — {classification.reasoning}[/dim]"
                )

            # Step 2: RAG retrieval (if needed)
            rag_context = ""
            if self.settings.rag_enabled and classification.needs_retrieval:
                async with self.telemetry.async_span("sumospace.rag.retrieve", attributes={"task": task}):
                    try:
                        rag_result = await self._rag.retrieve(task)
                        if rag_result.chunks:
                            rag_context = rag_result.context
                            trace.rag_context = rag_context
                            if self.settings.verbose:
                                console.print(
                                    f"[dim]Retrieved {len(rag_result.chunks)} chunks "
                                    f"(reranked: {rag_result.used_reranker})[/dim]"
                                )
                    except Exception as e:
                        # Retrieval is best-effort: continue without RAG context.
                        if self.settings.verbose:
                            console.print(f"[yellow]RAG skipped: {e}[/yellow]")

            # Step 3: Web search (if needed)
            web_context = ""
            if classification.needs_web:
                async with self.telemetry.async_span("sumospace.web_search", attributes={"task": task}):
                    web_result = await self._tools.execute("web_search", query=task)
                if web_result.success:
                    web_context = web_result.output

            # Step 4: Build full context
            full_context = self._build_full_context(
                task=task,
                rag_context=rag_context,
                web_context=web_context,
                memory_str=self._memory.context_string(5) if self.settings.memory_enabled and self._memory.recent(1) else ""
            )

            # Direct Inference Bypass: no committee, no plan, no tool execution.
            if not self.settings.committee_enabled:
                if self.settings.verbose:
                    console.print("[dim]Committee disabled — direct inference[/dim]")
                prompt = f"{task}\n\nContext:\n{rag_context}" if rag_context else task
                answer = await self._provider.complete(
                    user=prompt,
                    system=self.templates.get("system_prompt"),
                    temperature=self.settings.committee_temperature,
                    max_tokens=self.settings.committee_max_tokens,
                )
                trace.final_answer = answer
                if self.settings.dry_run or not self.settings.execution_enabled:
                    prefix = "[DRY RUN]" if self.settings.dry_run else "[EXECUTION DISABLED]"
                    trace.final_answer = f"{prefix} {trace.final_answer}"
                trace.success = True
                trace.plan = None

                if self.settings.memory_enabled:
                    await self._memory.add("user", task)
                    await self._memory.add("assistant", trace.final_answer)

                trace.duration_ms = (time.monotonic() - start) * 1000
                if self._audit_logger:
                    self._audit_logger.log(trace, verdict=None)
                await self.hooks.trigger("on_task_complete", trace)
                return trace

            # Step 5: Committee deliberation (a cached plan skips deliberation)
            cached_plan = self._cache.get(task, full_context)
            if cached_plan:
                if self.settings.verbose:
                    console.print("[dim]Using cached execution plan[/dim]")
                verdict = CommitteeVerdict(
                    approved=True, plan=cached_plan, rejection_reason="",
                    planner_output="CACHED", critic_output="CACHED", resolver_output="CACHED"
                )
            else:
                if self.settings.verbose:
                    console.print("[dim]Committee deliberating...[/dim]")

                async with self.telemetry.async_span("sumospace.committee.deliberate", attributes={"task": task, "committee.mode": self.settings.committee_mode, "committee.enabled": self.settings.committee_enabled}):
                    verdict = await self._committee.deliberate(task, context=full_context, mode=self.settings.committee_mode)

                if verdict.approved:
                    self._cache.set(task, full_context, verdict.plan)

            trace.plan = verdict.plan

            if not verdict.approved:
                await self.hooks.trigger("on_plan_rejected", verdict.rejection_reason, verdict)
                raise ConsensusFailedError(f"Committee rejected plan: {verdict.rejection_reason}")

            await self.hooks.trigger("on_plan_approved", verdict.plan, verdict)

            if self.settings.verbose:
                console.print(
                    f"[green]✓ Plan approved[/green] — "
                    f"{len(verdict.plan.steps)} steps, "
                    f"~{verdict.plan.estimated_duration_s:.0f}s estimated"
                )

            # Step 6: Execute. Skipped for dry runs AND when execution is
            # disabled, matching stream_run() and the direct-inference branch;
            # otherwise execution_enabled=False would still run tools here.
            if self.settings.dry_run or not self.settings.execution_enabled:
                trace.final_answer = self._format_dry_run(verdict)
                if not self.settings.execution_enabled:
                    trace.final_answer = trace.final_answer.replace("[DRY RUN]", "[EXECUTION DISABLED]")
                trace.success = True
            else:
                async with self.telemetry.async_span("sumospace.execute", attributes={"steps": len(verdict.plan.steps)}):
                    await self._execute_plan(verdict.plan, trace)

            # Step 7: Synthesise final answer (unless one was already set)
            if not trace.final_answer:
                answer_parts = []
                async for chunk in self._synthesise(task, trace, full_context):
                    answer_parts.append(chunk)
                trace.final_answer = "".join(answer_parts)

            # Step 8: Persist to memory
            if self.settings.memory_enabled:
                await self._memory.add("user", task)
                await self._memory.add("assistant", trace.final_answer)

            trace.success = True

        except ConsensusFailedError as e:
            trace.error = str(e)
            trace.success = False
            trace.final_answer = f"Task halted: {e}"
            if self.settings.verbose:
                console.print(f"[red]✗ {e}[/red]")

        except ExecutionHaltedError as e:
            trace.error = str(e)
            trace.success = False
            trace.final_answer = f"Execution halted at critical step: {e}"
            if self.settings.verbose:
                console.print(f"[red]✗ {e}[/red]")

        except Exception as e:
            # Top-level boundary: report the failure on the trace, not by raising.
            trace.error = str(e)
            trace.success = False
            trace.final_answer = f"Unexpected error: {e}"
            if self.settings.verbose:
                console.print_exception()

    trace.duration_ms = (time.monotonic() - start) * 1000

    if self.settings.verbose:
        status = "[green]✓ Done[/green]" if trace.success else "[red]✗ Failed[/red]"
        console.print(
            f"{status} in {trace.duration_ms:.0f}ms — "
            f"{len(trace.step_traces)} steps executed"
        )

    if self._audit_logger:
        self._audit_logger.log(trace, verdict)

    if trace.success:
        await self.hooks.trigger("on_task_complete", trace)
    else:
        await self.hooks.trigger("on_task_failed", trace, trace.error)

    return trace

search_media async

search_media(query: str, top_k: int = 3) -> list[Any]

Search across all modalities. Query can be text, or path to image/audio/video. Requires settings.media_enabled = True.

Source code in sumospace/kernel.py
async def search_media(self, query: str, top_k: int = 3) -> list[Any]:
    """
    Search across all modalities.

    ``query`` can be text, or a path to an image/audio/video file. The
    feature gate ``settings.media_enabled`` is checked first; the kernel is
    then booted if needed and the search is delegated to a
    ``MediaSearchEngine`` built from the current settings.

    Raises:
        ValueError: if ``settings.media_enabled`` is falsy.
    """
    media_allowed = self.settings.media_enabled
    if not media_allowed:
        raise ValueError("Media features are disabled. Set media_enabled=True in settings.")

    needs_boot = not self._initialized
    if needs_boot:
        await self.boot()

    from sumospace.media_search import MediaSearchEngine
    return MediaSearchEngine(self.settings).search(query, top_k=top_k)

shutdown async

shutdown()

Graceful shutdown.

Source code in sumospace/kernel.py
async def shutdown(self):
    """
    Graceful shutdown.

    Triggers the ``on_kernel_shutdown`` hook, marks the kernel as not
    initialised, and makes a best-effort attempt to stop the system object
    behind the episodic memory client. Errors during that attempt are
    ignored.
    """
    await self.hooks.trigger("on_kernel_shutdown", self)
    self._initialized = False

    memory = self._memory
    if memory and hasattr(memory, 'episodic'):
        try:
            chroma_client = memory.episodic._client
            can_stop = chroma_client and hasattr(chroma_client, '_system')
            if can_stop:
                chroma_client._system.stop()
        except Exception:
            # Best-effort cleanup: a failure here must not break shutdown.
            pass

    current = self.settings
    if current and current.verbose:
        console.print("[dim]Kernel shutdown[/dim]")

stream_run async

stream_run(
    task: str, session_id: str | None = None
) -> AsyncIterator[
    StepTrace | SynthesisChunk | ExecutionTrace
]

Stream execution step-by-step incrementally.

Parameters:

Name Type Description Default
task str

Natural language task description.

required
session_id str | None

Optional session identifier for memory scoping.

None

Yields:

Type Description
AsyncIterator[StepTrace | SynthesisChunk | ExecutionTrace]

StepTrace as each tool finishes executing.

AsyncIterator[StepTrace | SynthesisChunk | ExecutionTrace]

SynthesisChunk for partial output of the final answer generation.

AsyncIterator[StepTrace | SynthesisChunk | ExecutionTrace]

ExecutionTrace exactly once at the end.

Note

Prefer this over run() in any UI context. run() blocks until full completion; stream_run() lets you show progress incrementally.

Warning

The final yielded object is ExecutionTrace, not StepTrace. Always check isinstance(event, ExecutionTrace) to detect completion and retrieve the overall success status and final_answer.

Source code in sumospace/kernel.py
async def stream_run(
    self, task: str, session_id: str | None = None
) -> AsyncIterator[StepTrace | SynthesisChunk | ExecutionTrace]:
    """
    Stream execution step-by-step incrementally.

    Args:
        task:       Natural language task description.
        session_id: Optional session identifier recorded on the trace; a
                    random 12-character hex id is generated when omitted.

    Yields:
        `StepTrace` as each tool finishes executing.
        `SynthesisChunk` for partial output of the final answer generation.
        `ExecutionTrace` exactly once at the end.

    Note:
        Prefer this over `run()` in any UI context. `run()` blocks until
        full completion; `stream_run()` lets you show progress incrementally.

    Warning:
        The final yielded object is `ExecutionTrace`, not `StepTrace`.
        Always check `isinstance(event, ExecutionTrace)` to detect completion
        and retrieve the overall `success` status and `final_answer`.
        Failures are reported on that trace (`success=False`, `error` set)
        rather than raised.
    """
    if not self._initialized:
        await self.boot()

    session_id = session_id or uuid.uuid4().hex[:12]
    task_hash = hashlib.sha256(task.encode()).hexdigest()[:8]
    start = time.monotonic()
    # Defined up front so the audit log below can always reference it, even
    # when the pipeline fails before a verdict exists.
    verdict = None

    # NOTE(review): this span only covers construction of the trace, not the
    # pipeline below. Widening it would mean yielding from inside the span in
    # an async generator — confirm the telemetry backend tolerates that first.
    async with self.telemetry.async_span(
        "sumospace.kernel.stream_run",
        attributes={"task": task, "session_id": session_id, "task_hash": task_hash}
    ):
        trace = ExecutionTrace(
            task=task,
            session_id=session_id,
            intent=Intent.GENERAL_QA,
            classification=None,
            plan=None,
        )

    try:
        await self.hooks.trigger("on_task_start", task, session_id)

        # Classify
        recent_ctx = {}
        if self.settings.memory_enabled:
            recent_ctx["recent_messages"] = [m["content"] for m in self._memory.recent(5)]
        async with self.telemetry.async_span("sumospace.classify", attributes={"task": task}):
            classification = await self._classifier.classify(task, context=recent_ctx)
        trace.intent = classification.intent
        trace.classification = classification

        # RAG retrieval (if needed)
        rag_context = ""
        if self.settings.rag_enabled and classification.needs_retrieval:
            async with self.telemetry.async_span("sumospace.rag.retrieve", attributes={"task": task}):
                try:
                    rag_result = await self._rag.retrieve(task)
                    if rag_result.chunks:
                        rag_context = rag_result.context
                        trace.rag_context = rag_context
                except Exception as e:
                    # Retrieval is best-effort: continue without RAG context,
                    # but surface the reason in verbose mode as run() does.
                    if self.settings.verbose:
                        console.print(f"[yellow]RAG skipped: {e}[/yellow]")

        # Web search (if needed)
        web_context = ""
        if classification.needs_web:
            async with self.telemetry.async_span("sumospace.web_search", attributes={"task": task}):
                web_result = await self._tools.execute("web_search", query=task)
            if web_result.success:
                web_context = web_result.output

        # Build full context
        full_context = self._build_full_context(
            task=task,
            rag_context=rag_context,
            web_context=web_context,
            memory_str=self._memory.context_string(5) if self.settings.memory_enabled and self._memory.recent(1) else ""
        )

        # Direct Inference Bypass: stream the provider output as chunks.
        if not self.settings.committee_enabled:
            prompt = f"{task}\n\nContext:\n{rag_context}" if rag_context else task
            answer_parts = []
            async for chunk in self._provider.stream(
                user=prompt,
                system=self.templates.get("system_prompt"),
                temperature=self.settings.committee_temperature,
                max_tokens=self.settings.committee_max_tokens,
            ):
                answer_parts.append(chunk)
                # Keyword form, consistent with the synthesis loop below.
                yield SynthesisChunk(delta=chunk)

            trace.final_answer = "".join(answer_parts)
            if self.settings.dry_run or not self.settings.execution_enabled:
                prefix = "[DRY RUN]" if self.settings.dry_run else "[EXECUTION DISABLED]"
                trace.final_answer = f"{prefix} {trace.final_answer}"
            trace.success = True
            trace.plan = None

            if self.settings.memory_enabled:
                await self._memory.add("user", task)
                await self._memory.add("assistant", trace.final_answer)

            trace.duration_ms = (time.monotonic() - start) * 1000
            if self._audit_logger:
                self._audit_logger.log(trace, verdict=None)
            await self.hooks.trigger("on_task_complete", trace)
            yield trace
            return

        # Committee deliberation (a cached plan skips deliberation)
        cached_plan = self._cache.get(task, full_context)
        if cached_plan:
            verdict = CommitteeVerdict(
                approved=True, plan=cached_plan, rejection_reason="",
                planner_output="CACHED", critic_output="CACHED", resolver_output="CACHED"
            )
        else:
            async with self.telemetry.async_span("sumospace.committee.deliberate", attributes={"task": task, "committee.mode": self.settings.committee_mode, "committee.enabled": self.settings.committee_enabled}):
                verdict = await self._committee.deliberate(task, context=full_context, mode=self.settings.committee_mode)
            if verdict.approved:
                self._cache.set(task, full_context, verdict.plan)

        trace.plan = verdict.plan

        # A rejected plan ends the stream with a failed trace.
        if not verdict.approved:
            await self.hooks.trigger("on_plan_rejected", verdict.rejection_reason, verdict)
            trace.error = verdict.rejection_reason
            trace.success = False
            trace.final_answer = f"Task halted: Committee rejected plan: {verdict.rejection_reason}"
            trace.duration_ms = (time.monotonic() - start) * 1000
            if self._audit_logger:
                self._audit_logger.log(trace, verdict)
            await self.hooks.trigger("on_task_failed", trace, trace.error)
            yield trace
            return

        await self.hooks.trigger("on_plan_approved", verdict.plan, verdict)

        if self.settings.dry_run or not self.settings.execution_enabled:
            trace.final_answer = self._format_dry_run(verdict)
            if not self.settings.execution_enabled:
                trace.final_answer = trace.final_answer.replace("[DRY RUN]", "[EXECUTION DISABLED]")
            trace.success = True
        else:
            # Execute the plan inline so each step can be yielded as it finishes.
            for step in verdict.plan.steps:
                await self.hooks.trigger("on_step_start", step)
                step_start = time.monotonic()
                result = await self._tools.execute(step.tool, **step.parameters)
                step_ms = (time.monotonic() - step_start) * 1000

                step_trace = StepTrace(
                    step_number=step.step_number,
                    tool=step.tool,
                    description=step.description,
                    result=result,
                    duration_ms=step_ms,
                )
                trace.step_traces.append(step_trace)

                if result.success:
                    await self.hooks.trigger("on_step_complete", step_trace)
                else:
                    await self.hooks.trigger("on_step_failed", step_trace)

                yield step_trace

                # Only a failed *critical* step aborts the remaining plan.
                if not result.success and step.critical:
                    raise ExecutionHaltedError(f"Step {step.step_number} ({step.tool}) failed")

        # Synthesise the final answer (unless one was already set)
        if not trace.final_answer:
            answer_parts = []
            async for chunk in self._synthesise(task, trace, full_context):
                answer_parts.append(chunk)
                yield SynthesisChunk(delta=chunk)
            trace.final_answer = "".join(answer_parts)

        if self.settings.memory_enabled:
            await self._memory.add("user", task)
            await self._memory.add("assistant", trace.final_answer)
        trace.success = True

    except Exception as e:
        # Top-level boundary: report the failure on the trace, not by raising.
        trace.error = str(e)
        trace.success = False
        trace.final_answer = f"Error: {e}"

    trace.duration_ms = (time.monotonic() - start) * 1000
    if self._audit_logger:
        self._audit_logger.log(trace, verdict)

    if trace.success:
        await self.hooks.trigger("on_task_complete", trace)
    else:
        await self.hooks.trigger("on_task_failed", trace, trace.error)

    yield trace