observra

Observra — framework-agnostic telemetry for AI agents.

Observra captures what your agents do — model calls, tool use, cost, latency, and errors — normalizes every event into a common schema (CIM), and routes it to the backends you already use (JSONL, webhooks, OpenTelemetry, or a SIEM such as Exabeam). Your agent code runs unmodified: framework adapters emit events through a non-blocking background pipeline.

The public API is intentionally small:

Example:
import observra

# 1. Start the pipeline (newline-delimited JSON by default)
observra.initialize(backend="jsonl", path="telemetry.jsonl")

# 2. Attach the adapter for your agent framework
plugin = observra.create_plugin("adk")

# ...run your agent as usual; events stream in the background...

# 3. Inspect pipeline health
print(observra.get_stats())

For framework-specific setup, see the Getting Started guides.

__version__ = '1.0.4'
def initialize(backend: str = 'jsonl', **kwargs) -> None:

Initialize the global telemetry pipeline with a named backend.

Tears down any existing pipeline and replaces it with a new one. Adapters created with create_plugin keep working across re-initialization — they hold a proxy whose target is swapped to the new queue.

Arguments:
  • backend: Backend type name. One of "jsonl" (newline-delimited JSON, the default), "webhook", "otel" (OpenTelemetry spans), "otel_log" (OpenTelemetry logs), "multi" (fan out to several backends at once), or "exabeam" (Exabeam SIEM).
  • **kwargs: Passed through to the backend constructor — e.g. path= for "jsonl". queue_size (default 1000) caps the in-memory event buffer.
Raises:
  • ValueError: If backend is not a recognized name.
Example:
import observra

# Local file (default backend)
observra.initialize(backend="jsonl", path="telemetry.jsonl")
Note:

Call this once at application startup, before creating adapters. Events are processed on a background worker, so emitting telemetry does not block your agent's hot path.
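The re-initialization behavior described above (adapters keep working because they hold a proxy whose target is swapped) can be pictured with a small standard-library sketch. QueueProxy and its swap method are hypothetical names invented for this illustration; they are not Observra's internals, and the real implementation may differ.

```python
import queue


class QueueProxy:
    """Hypothetical stand-in: forwards put() to whichever queue is current."""

    def __init__(self, target: queue.Queue) -> None:
        self._target = target

    def swap(self, new_target: queue.Queue) -> None:
        # Re-initialization points the proxy at the new pipeline's queue.
        self._target = new_target

    def put(self, event: dict) -> None:
        # put_nowait raises queue.Full instead of waiting for free space.
        self._target.put_nowait(event)


old_queue: queue.Queue = queue.Queue(maxsize=1000)
proxy = QueueProxy(old_queue)        # what an adapter would hold on to
proxy.put({"event_type": "model_request"})

new_queue: queue.Queue = queue.Queue(maxsize=1000)
proxy.swap(new_queue)                # simulated re-initialization
proxy.put({"event_type": "tool_end"})

print(old_queue.qsize(), new_queue.qsize())
```

Because the adapter only ever sees the proxy, it needs no notification when the pipeline behind it is replaced.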

def create_plugin(framework: str = 'adk', **kwargs):

Create a framework adapter connected to the global pipeline.

The adapter forwards your agent framework's callbacks into the pipeline started by initialize. Attach the returned object the way each framework expects — e.g. as an ADK plugin, or registered as a callback handler.

Arguments:
  • framework: One of "adk", "claude", "openai", "langchain", or "pydantic-ai".
  • **kwargs: Passed to the adapter constructor (framework-specific).
Returns:

The framework-specific adapter instance.

Raises:
  • ValueError: If framework is not recognized.
Example:
import observra
observra.initialize(backend="jsonl")

# Google ADK: pass the plugin when constructing your agent
plugin = observra.create_plugin("adk")
# agent = Agent(..., plugins=[plugin])
Note:

Requires the matching extra to be installed, e.g. pip install observra[adk].

def create_logging_handler(level: int = 0):

Bridge Python's standard logging into the telemetry pipeline.

Returns a logging.Handler that converts each log record into a telemetry event on the global pipeline. Attach it to any logger to capture application logs alongside agent events.

Arguments:
  • level: Minimum log level to capture (default: logging.NOTSET, meaning the handler does no filtering of its own and captures every record its logger passes on).
Returns:

A logging.Handler that emits telemetry events.

Example:
import logging, observra
observra.initialize(backend="jsonl")
logging.getLogger().addHandler(observra.create_logging_handler())
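To show what a logging bridge of this kind does, here is a minimal standard-library sketch. ListHandler is a hypothetical stand-in that collects dict events in a list; the field names and the "log" event type are assumptions made for this illustration, not the event shape Observra actually emits.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in: turns each log record into a dict event."""

    def __init__(self, level: int = logging.NOTSET) -> None:
        super().__init__(level)
        self.events: list[dict] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.events.append({
            "event_type": "log",          # illustrative value, not from Observra
            "timestamp": record.created,  # Unix timestamp set by logging
            "data": {
                "logger": record.name,
                "level": record.levelname,
                "message": record.getMessage(),
            },
        })


logger = logging.getLogger("demo.agent")
logger.setLevel(logging.INFO)
logger.propagate = False                  # keep the demo off the root logger
handler = ListHandler()                   # NOTSET: no filtering in the handler
logger.addHandler(handler)

logger.debug("filtered out by the logger's level")
logger.info("tool call finished in %d ms", 42)

print(len(handler.events), handler.events[0]["data"]["message"])
```

With the handler left at logging.NOTSET, the logger's own level decides which records are captured, which is why only the INFO record arrives.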
def get_stats() -> dict:

Return live pipeline statistics.

Returns:

A dict of queue and worker counters — enqueued, dropped, depth (current queue size), events_processed and errors. Returns an empty dict if initialize has not been called yet.

Example:
stats = observra.get_stats()
print(stats.get("dropped", 0), "events dropped")
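The relationship between queue_size and the enqueued, dropped, and depth counters can be sketched with the standard library alone. BoundedBuffer is a hypothetical, single-threaded stand-in written for this illustration; it assumes that a full buffer discards new events rather than waiting, and it says nothing about how Observra's worker is actually built.

```python
import queue


class BoundedBuffer:
    """Hypothetical stand-in for a non-blocking, size-capped event buffer."""

    def __init__(self, queue_size: int = 1000) -> None:
        self._queue: queue.Queue = queue.Queue(maxsize=queue_size)
        self.enqueued = 0
        self.dropped = 0

    def emit(self, event: dict) -> bool:
        try:
            self._queue.put_nowait(event)  # never waits for free space
        except queue.Full:
            self.dropped += 1              # buffer full: count and discard
            return False
        self.enqueued += 1
        return True

    def stats(self) -> dict:
        return {
            "enqueued": self.enqueued,
            "dropped": self.dropped,
            "depth": self._queue.qsize(),
        }


buffer = BoundedBuffer(queue_size=3)
for seq in range(5):
    buffer.emit({"seq": seq})              # nothing drains the queue here

print(buffer.stats())
# → {'enqueued': 3, 'dropped': 2, 'depth': 3}
```

A steadily rising dropped count in a buffer like this suggests the consumer cannot keep up, or that the size cap is too small for the event rate.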
def get_metrics() -> dict:

Return current self-metrics for the active pipeline.

Returns a dict with these guaranteed keys (OBS-01 success criteria):

  • drop_count: int -- events dropped due to backpressure
  • queue_depth: int -- current queue size
  • write_latency_p50: float | None -- median write latency in seconds
  • write_latency_p99: float | None -- 99th percentile write latency in seconds
  • write_latency_p999: float | None -- 99.9th percentile write latency in seconds
  • redaction_applied_count: int -- total redaction operations applied
  • backend_write_success: int -- successful backend writes
  • backend_write_failure: int -- failed backend writes
  • labels: dict -- OBS-03 standard labels (framework, sink, event_type) when set

All values default to 0 (or None for latency, {} for labels) when no pipeline is active. This is safe to call at any time -- returns zeros rather than raising.
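The latency keys can be read as percentiles over recorded write durations. The sketch below uses the nearest-rank method and returns None when there are no samples; both the method and the helper names percentile and latency_summary are assumptions made for this illustration, since the source does not say how Observra computes these values.

```python
import math
from typing import Optional


def percentile(samples: list[float], pct: float) -> Optional[float]:
    """Nearest-rank percentile; None when there are no samples."""
    if not samples:
        return None
    ordered = sorted(samples)
    # round() guards against float noise such as 990.0000000000001.
    rank = math.ceil(round(pct * len(ordered) / 100, 9))
    rank = min(max(rank, 1), len(ordered))
    return ordered[rank - 1]


def latency_summary(samples: list[float]) -> dict:
    """Build the three latency keys from write durations in seconds."""
    return {
        "write_latency_p50": percentile(samples, 50),
        "write_latency_p99": percentile(samples, 99),
        "write_latency_p999": percentile(samples, 99.9),
    }


empty = latency_summary([])
busy = latency_summary([0.004, 0.002, 0.250, 0.003, 0.005])

print(empty)
print(busy)
```

The tail percentiles are dominated by the slowest writes, so a single slow backend call can move p99 and p999 while leaving p50 unchanged.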

@dataclass(frozen=True, slots=True)
class TelemetryEvent:

Immutable telemetry event with required tracing fields.

Required fields:

  • event_id: Unique event identifier (ULID)
  • timestamp: Unix timestamp
  • trace_id: Distributed trace identifier
  • session_id: Session identifier
  • span_id: Current span identifier
  • event_type: Event type (e.g., 'model_request', 'tool_end'); see EventType

Optional fields:

  • agent_name: Name of the agent
  • tool_name: Name of the tool being called
  • model_name: Name of the model being used
  • data: Additional event-specific data
  • framework: Framework that generated this event

TelemetryEvent( event_id: str, timestamp: float, trace_id: str, session_id: str, span_id: str, event_type: str, agent_name: Optional[str] = None, tool_name: Optional[str] = None, model_name: Optional[str] = None, data: Optional[dict[str, Any]] = None, framework: Literal['adk', 'claude', 'claude_code', 'codex_cli', 'codex_app', 'gemini_cli', 'openai', 'langgraph', 'pydantic-ai', 'copilot', 'mcp', 'openclaw', 'unknown'] = 'unknown', skill_name: Optional[str] = None, host: Optional[str] = None, user: Optional[str] = None, os: Optional[str] = None, arch: Optional[str] = None, library_version: Optional[str] = None)
event_id: str
timestamp: float
trace_id: str
session_id: str
span_id: str
event_type: str
agent_name: Optional[str]
tool_name: Optional[str]
model_name: Optional[str]
data: Optional[dict[str, Any]]
framework: Literal['adk', 'claude', 'claude_code', 'codex_cli', 'codex_app', 'gemini_cli', 'openai', 'langgraph', 'pydantic-ai', 'copilot', 'mcp', 'openclaw', 'unknown']
skill_name: Optional[str]
host: Optional[str]
user: Optional[str]
os: Optional[str]
arch: Optional[str]
library_version: Optional[str]
@runtime_checkable
class StorageBackend(typing.Protocol):

Protocol defining the storage backend interface.

All storage backends must implement these methods to be compatible with the telemetry system.

StorageBackend(*args, **kwargs)
def write(self, event: TelemetryEvent) -> None:

Write a single event to storage.

May buffer internally for performance. Call flush() to ensure durability.

Arguments:
  • event: TelemetryEvent to write
def flush(self) -> None:

Flush any buffered writes to durable storage.

Ensures all previously written events are persisted.

def close(self) -> None:

Close the backend and release all resources.

Should call flush() internally to ensure no data loss.

def get_stats(self) -> observra.core.types.BackendStats:

Get backend statistics.

Returns:

BackendStats with bytes_written, event_count, backend_type, oldest_event_ts, newest_event_ts.

def query( self, *, event_type: str | None = None, agent_id: str | None = None, from_ts: float | None = None, to_ts: float | None = None, limit: int = 1000) -> Iterator[TelemetryEvent]:

Query stored events with optional filters.

All parameters are keyword-only to prevent accidental positional misuse.

Arguments:
  • event_type: Filter by event type (e.g., 'after_model', 'before_tool')
  • agent_id: Filter by agent name
  • from_ts: Start of time range (Unix timestamp). Use datetime.timestamp() to convert.
  • to_ts: End of time range (Unix timestamp). Use datetime.timestamp() to convert.
  • limit: Maximum number of results (default: 1000). Results are always capped to avoid accidentally loading millions of rows.
Yields:

TelemetryEvent matching the filters, ordered by timestamp ascending.

Raises:
  • NotImplementedError: If this backend does not support querying.
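The five methods above can be illustrated with a toy in-memory backend. Everything in this sketch is a stand-in so that it runs without Observra installed: Event, Stats, and Backend are hypothetical, reduced versions of TelemetryEvent, BackendStats, and StorageBackend, and the inclusive treatment of from_ts and to_ts is an assumption, since the source does not specify the bounds.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterator, Optional, Protocol, runtime_checkable


@dataclass(frozen=True)
class Event:                 # hypothetical, reduced stand-in for TelemetryEvent
    timestamp: float
    event_type: str
    agent_name: Optional[str] = None


@dataclass
class Stats:                 # hypothetical stand-in for BackendStats
    bytes_written: int
    event_count: int
    backend_type: str
    oldest_event_ts: Optional[float]
    newest_event_ts: Optional[float]


@runtime_checkable
class Backend(Protocol):     # mirrors the five documented method names
    def write(self, event: Event) -> None: ...
    def flush(self) -> None: ...
    def close(self) -> None: ...
    def get_stats(self) -> Stats: ...
    def query(self, *, event_type=None, agent_id=None,
              from_ts=None, to_ts=None, limit=1000) -> Iterator[Event]: ...


class MemoryBackend:
    def __init__(self) -> None:
        self._buffer: list[Event] = []   # written but not yet flushed
        self._stored: list[Event] = []   # "durable" storage for this sketch

    def write(self, event: Event) -> None:
        self._buffer.append(event)

    def flush(self) -> None:
        self._stored.extend(self._buffer)
        self._buffer.clear()

    def close(self) -> None:
        self.flush()                     # flush on close so nothing is lost

    def get_stats(self) -> Stats:
        stamps = [e.timestamp for e in self._stored]
        # bytes_written stays 0: this sketch never serializes anything.
        return Stats(0, len(stamps), "memory",
                     min(stamps, default=None), max(stamps, default=None))

    def query(self, *, event_type=None, agent_id=None,
              from_ts=None, to_ts=None, limit=1000) -> Iterator[Event]:
        matched = 0
        for e in sorted(self._stored, key=lambda e: e.timestamp):
            if event_type is not None and e.event_type != event_type:
                continue
            if agent_id is not None and e.agent_name != agent_id:
                continue
            if from_ts is not None and e.timestamp < from_ts:
                continue
            if to_ts is not None and e.timestamp > to_ts:
                continue
            if matched >= limit:
                return
            matched += 1
            yield e


backend = MemoryBackend()
# datetime.timestamp() converts an aware datetime to a Unix timestamp.
start = datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp()
for offset, kind in enumerate(["before_tool", "after_model", "before_tool"]):
    backend.write(Event(start + offset, kind, agent_name="planner"))
backend.flush()

hits = list(backend.query(event_type="before_tool", from_ts=start, limit=10))
print(isinstance(backend, Backend), len(hits), backend.get_stats().event_count)
# → True 2 3
```

The isinstance check passes without inheritance because a runtime-checkable protocol only tests that the method names are present; it does not verify their signatures.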