Source code for kodeagent.tracer

"""Interfaces for hierarchical tracing and agent observability.

Supports multiple observability backends (e.g., Langfuse and LangSmith) with a unified API for
creating traces, spans, and generations. Provides no-op implementations when tracing is disabled.
"""

import logging
from abc import ABC, abstractmethod
from typing import Any, Literal

TRACING_TYPES = Literal['langfuse', 'langsmith']


[docs] def create_tracer_manager(tracing_type: TRACING_TYPES | None = None) -> 'AbstractTracerManager': """Factory function to create a tracer manager based on the specified type. Args: tracing_type: The type of tracing backend to use. Defaults to None for no-op tracing. Returns: An instance of LangfuseTracerManager, LangSmithTracerManager, or NoOpTracerManager for the specified tracing backend. """ if tracing_type == 'langfuse': return LangfuseTracerManager() if tracing_type == 'langsmith': return LangSmithTracerManager() return NoOpTracerManager()
[docs] class AbstractObservation(ABC): """Abstract interface for trace observations. Represents a single node in a hierarchical trace tree. Can be a top-level trace, a nested span, or an LLM generation. Implements context manager protocol for use with 'with' statements. """
[docs] @abstractmethod def update(self, **kwargs: Any) -> None: """Update observation properties during execution. Used to log intermediate states like partial outputs, status, or metadata. Args: **kwargs: Provider-specific properties (output, status, level, etc). """
[docs] @abstractmethod def end(self, **kwargs: Any) -> None: """Explicitly signal the end of the observation. Records final state and duration. Called automatically when using context manager protocol. Args: **kwargs: Provider-specific properties (output, result, error, etc). """
def __enter__(self) -> 'AbstractObservation': """Context manager entry: return self for 'with' statement.""" return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Context manager exit: automatically end observation.""" self.end()
[docs] class AbstractTracerManager(ABC): """Abstract interface for tracer management. Factory for creating hierarchical observations. Handles initialization and backend-specific configuration. Implementations should support at least one tracing backend (Langfuse, LangSmith, etc.) or be a no-op when tracing is disabled. """
[docs] @abstractmethod def start_trace(self, name: str, input_data: Any) -> AbstractObservation: """Start a new top-level trace. Args: name: Identifier for the trace operation. input_data: Input data to log for the trace. Returns: An observation object for the trace root. """
[docs] @abstractmethod def start_span( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """Start a nested span under a parent observation. Used for logical sub-operations within a trace or parent span. Args: parent: Parent observation (trace or span). name: Identifier for the span operation. input_data: Input data to log for the span. Returns: An observation object for the span. """
[docs] @abstractmethod def start_generation( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """Start a nested LLM generation under a parent observation. Used specifically for LLM calls within a trace or span. Args: parent: Parent observation (trace or span). name: Identifier for the generation operation. input_data: Input data (e.g., prompt) to log for the generation. Returns: An observation object for the generation. """
[docs] @abstractmethod def flush(self) -> None: """Flush any buffered traces to the backend. Ensures that all recorded traces and spans are sent to the observability platform before the application exits. """
[docs] class NoOpObservation(AbstractObservation): """No-op observation implementation. Used when tracing is disabled. All methods are no-ops and return self to support hierarchical nesting without side effects. """
[docs] def update(self, **kwargs: Any) -> None: """No-op: ignore all property updates."""
[docs] def end(self, **kwargs: Any) -> None: """No-op: ignore end signal."""
[docs] class NoOpTracerManager(AbstractTracerManager): """No-op tracer manager implementation. Used when no observability backend is enabled. Provides a complete no-op implementation of the TracerManager interface that satisfies the contract while performing no actual tracing operations. """
[docs] def start_trace(self, name: str, input_data: Any) -> AbstractObservation: """No-op: return a no-op observation.""" return NoOpObservation()
[docs] def start_span( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """No-op: return a no-op observation.""" return NoOpObservation()
[docs] def start_generation( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """No-op: return a no-op observation.""" return NoOpObservation()
[docs] def flush(self) -> None: """No-op: do nothing."""
[docs] class LangfuseObservation(AbstractObservation): """Langfuse implementation of observation. Wraps Langfuse Trace, Span, or Generation objects to provide a consistent interface. """
[docs] def __init__(self, obj: Any) -> None: """Initialize Langfuse observation. Args: obj: The Langfuse Trace, Span, or Generation object. """ self.obj = obj
[docs] def update(self, **kwargs: Any) -> None: """Update observation properties. Args: **kwargs: Properties to update. """ if hasattr(self.obj, 'update'): self.obj.update(**kwargs)
[docs] def end(self, **kwargs: Any) -> None: """End the observation. Maps 'result' to 'output' for compatibility with Langfuse. Calls end() if available (Spans/Generations), else update() (Traces). Args: **kwargs: Final state data. """ # Map 'result' to 'output' if present if 'result' in kwargs and 'output' not in kwargs: kwargs['output'] = kwargs.pop('result') if hasattr(self.obj, 'end'): self.obj.end(**kwargs) elif hasattr(self.obj, 'update'): self.obj.update(**kwargs)
[docs] class LangfuseTracerManager(AbstractTracerManager): """Langfuse implementation of TracerManager. Integrates with Langfuse observability platform to create and manage hierarchical traces, spans, and generations. Tracing is disabled if the Langfuse package is not installed. """
[docs] def __init__(self) -> None: """Initialize the Langfuse client.""" try: from langfuse.client import Langfuse self.client: Any = Langfuse() except ImportError: logging.error( 'Langfuse package is not installed. Please install langfuse to use' ' LangfuseTracerManager. Tracing will be disabled.' ) self.client = None
[docs] def start_trace(self, name: str, input_data: Any) -> AbstractObservation: """Start a new trace with Langfuse. Args: name: Identifier for the trace operation. input_data: Input data to log for the trace. Returns: A Langfuse trace object wrapped as AbstractObservation. """ if not self.client: return NoOpObservation() return LangfuseObservation(self.client.trace(name=name, input=input_data))
[docs] def start_span( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """Start a nested span under a parent observation with Langfuse. Args: parent: Parent observation (assumes LangfuseObservation). name: Identifier for the span operation. input_data: Input data to log for the span. Returns: A Langfuse span object wrapped as AbstractObservation. """ if not self.client: return NoOpObservation() # Unwrap if it's our LangfuseObservation parent_obj = parent.obj if isinstance(parent, LangfuseObservation) else parent if hasattr(parent_obj, 'span'): return LangfuseObservation(parent_obj.span(name=name, input=input_data)) return NoOpObservation()
[docs] def start_generation( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """Start a nested LLM generation under a parent observation with Langfuse. Args: parent: Parent observation (assumes LangfuseObservation). name: Identifier for the generation operation. input_data: Input data (e.g., prompt) to log for the generation. Returns: A Langfuse generation object wrapped as AbstractObservation. """ if not self.client: return NoOpObservation() # Unwrap if it's our LangfuseObservation parent_obj = parent.obj if isinstance(parent, LangfuseObservation) else parent if hasattr(parent_obj, 'generation'): return LangfuseObservation(parent_obj.generation(name=name, input=input_data)) return NoOpObservation()
[docs] def flush(self) -> None: """Flush Langfuse traces.""" if self.client: self.client.flush()
[docs] class LangSmithObservation(AbstractObservation): """LangSmith implementation of observation. Wraps a LangSmith RunTree object to manage hierarchical runs. """
[docs] def __init__(self, run_tree: Any) -> None: """Initialize LangSmith observation. Args: run_tree: The LangSmith RunTree object. """ self.run_tree = run_tree self.ended = False # Post the run to LangSmith immediately try: self.run_tree.post() except Exception as e: logging.error('Error creating run in LangSmith: %s', e)
[docs] def update(self, **kwargs: Any) -> None: """Update observation outputs during execution. Args: **kwargs: Output data to accumulate. """ if not self.ended: if 'error' in kwargs: self.run_tree.error = kwargs['error']
# We don't need to manually verify other updates as RunTree handles finalization on end() # or we can patch if strictly needed, but post() is usually sufficient for start.
[docs] def end(self, **kwargs: Any) -> None: """End the observation and send final data to LangSmith. Args: **kwargs: Final output/result data. """ if not self.ended: self.ended = True outputs = kwargs.get('output') error = kwargs.get('error') metadata = kwargs.get('metadata') # Ensure outputs is a dict for RunTree.patch() which calls outputs.copy() if outputs is not None and not isinstance(outputs, dict): outputs = {'output': outputs} # Map 'result' to 'outputs' if provided in result keyword if 'result' in kwargs and outputs is None: res = kwargs.pop('result') outputs = res if isinstance(res, dict) else {'output': res} try: if metadata: self.run_tree.add_metadata(metadata) self.run_tree.end(outputs=outputs, error=error) self.run_tree.patch() except Exception as e: logging.exception('Error updating run in LangSmith: %s', e)
[docs] class LangSmithTracerManager(AbstractTracerManager): """LangSmith implementation of TracerManager. Uses LangSmith RunTree to manage hierarchical runs. """
[docs] def __init__(self) -> None: """Initialize the LangSmith client.""" try: from langsmith import Client self.client: Any = Client() except ImportError: logging.error( 'LangSmith package is not installed. Please install langsmith to use' ' LangSmithTracerManager. Tracing will be disabled.' ) self.client = None
[docs] def start_trace(self, name: str, input_data: Any) -> AbstractObservation: """Start a new trace with LangSmith.""" if not self.client: return NoOpObservation() try: from langsmith.run_trees import RunTree run_tree = RunTree(name=name, run_type='chain', inputs=input_data, client=self.client) return LangSmithObservation(run_tree) except ImportError: return NoOpObservation()
[docs] def start_span( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """Start a nested span under a parent observation. Args: parent: Parent observation (assumes LangSmithObservation). name: Identifier for the span operation. input_data: Input data to log for the span. Returns: A LangSmith span object wrapped as AbstractObservation. """ if not self.client or not isinstance(parent, LangSmithObservation): return NoOpObservation() child_run = parent.run_tree.create_child(name=name, run_type='tool', inputs=input_data) return LangSmithObservation(child_run)
[docs] def start_generation( self, parent: AbstractObservation, name: str, input_data: Any ) -> AbstractObservation: """Start a nested LLM generation. Args: parent: Parent observation (assumes LangSmithObservation). name: Identifier for the generation operation. input_data: Input data (e.g., prompt) to log for the generation. Returns: A LangSmith generation object wrapped as AbstractObservation. """ if not self.client or not isinstance(parent, LangSmithObservation): return NoOpObservation() child_run = parent.run_tree.create_child(name=name, run_type='llm', inputs=input_data) return LangSmithObservation(child_run)
[docs] def flush(self) -> None: """Flush LangSmith runs.""" if self.client and hasattr(self.client, 'flush'): self.client.flush()