observra
Observra — framework-agnostic telemetry for AI agents.
Observra captures what your agents do — model calls, tool use, cost, latency, and errors — normalizes every event into a common schema (CIM), and routes it to the backends you already use (JSONL, webhooks, OpenTelemetry, or a SIEM such as Exabeam). Your agent code runs unmodified: framework adapters emit events through a non-blocking background pipeline.
The public API is intentionally small:
initialize— start the global pipeline and choose a backend.create_plugin— wire a framework adapter (ADK, Claude, OpenAI, LangChain, Pydantic AI) into that pipeline.create_logging_handler— capture standard-library logs as telemetry.get_stats/get_metrics— inspect pipeline health.
Example:
import observra # 1. Start the pipeline (newline-delimited JSON by default) observra.initialize(backend="jsonl", path="telemetry.jsonl") # 2. Attach the adapter for your agent framework plugin = observra.create_plugin("adk") # ...run your agent as usual; events stream in the background... # 3. Inspect pipeline health print(observra.get_stats())
For framework-specific setup, see the Getting Started guides.
Initialize the global telemetry pipeline with a named backend.
Tears down any existing pipeline and replaces it with a new one. Adapters
created with create_plugin keep working across re-initialization — they
hold a proxy whose target is swapped to the new queue.
Arguments:
- backend: Backend type name. One of
"jsonl"(newline-delimited JSON, the default),"webhook","otel"(OpenTelemetry spans),"otel_log"(OpenTelemetry logs),"multi"(fan out to several backends at once), or"exabeam"(Exabeam SIEM). - **kwargs: Passed through to the backend constructor — e.g.
path=for"jsonl".queue_size(default1000) caps the in-memory event buffer.
Raises:
- ValueError: If
backendis not a recognized name.
Example:
import observra # Local file (default backend) observra.initialize(backend="jsonl", path="telemetry.jsonl")
Note:
Call this once at application startup, before creating adapters. The pipeline runs on a background worker, so this never blocks your agent's hot path.
Create a framework adapter connected to the global pipeline.
The adapter forwards your agent framework's callbacks into the pipeline
started by initialize. Attach the returned object the way each framework
expects — e.g. as an ADK plugin, or registered as a callback handler.
Arguments:
- framework: One of
"adk","claude","openai","langchain", or"pydantic-ai". - **kwargs: Passed to the adapter constructor (framework-specific).
Returns:
The framework-specific adapter instance.
Raises:
- ValueError: If
frameworkis not recognized.
Example:
import observra observra.initialize(backend="jsonl") # Google ADK: pass the plugin when constructing your agent plugin = observra.create_plugin("adk") # agent = Agent(..., plugins=[plugin])
Note:
Requires the matching extra to be installed, e.g.
pip install observra[adk].
Bridge Python's standard logging into the telemetry pipeline.
Returns a logging.Handler that converts each log record into a telemetry
event on the global pipeline. Attach it to any logger to capture application
logs alongside agent events.
Arguments:
- level: Minimum log level to capture (default:
logging.NOTSET— the handler inherits the logger's effective level).
Returns:
A
logging.Handlerthat emits telemetry events.
Example:
import logging, observra observra.initialize(backend="jsonl") logging.getLogger().addHandler(observra.create_logging_handler())
Return live pipeline statistics.
Returns:
A dict of queue and worker counters —
enqueued,dropped,depth(current queue size),events_processedanderrors. Returns an empty dict ifinitializehas not been called yet.
Example:
stats = observra.get_stats() print(stats.get("dropped", 0), "events dropped")
Return current self-metrics for the active pipeline.
Returns a dict with these guaranteed keys (OBS-01 success criteria):
- drop_count: int -- events dropped due to backpressure
- queue_depth: int -- current queue size
- write_latency_p50: float | None -- median write latency in seconds
- write_latency_p99: float | None -- 99th percentile write latency
- write_latency_p999: float | None -- 99.9th percentile write latency
- redaction_applied_count: int -- total redaction operations applied
- backend_write_success: int -- successful backend writes
- backend_write_failure: int -- failed backend writes
- labels: dict -- OBS-03 standard labels (framework, sink, event_type) when set
All values default to 0 (or None for latency, {} for labels) when no pipeline is active. This is safe to call at any time -- returns zeros rather than raising.
Immutable telemetry event with required tracing fields.
Required fields:
event_id: Unique event identifier (ULID) timestamp: Unix timestamp trace_id: Distributed trace identifier session_id: Session identifier span_id: Current span identifier event_type: Event type (e.g., 'model_request', 'tool_end') — see EventType
Optional fields:
agent_name: Name of the agent tool_name: Name of the tool being called model_name: Name of the model being used data: Additional event-specific data framework: Framework that generated this event
Protocol defining the storage backend interface.
All storage backends must implement these methods to be compatible with the telemetry system.
Write a single event to storage.
May buffer internally for performance. Call flush() to ensure durability.
Arguments:
- event: TelemetryEvent to write
Flush any buffered writes to durable storage.
Ensures all previously written events are persisted.
Close the backend and release all resources.
Should call flush() internally to ensure no data loss.
Get backend statistics.
Returns:
BackendStats with bytes_written, event_count, backend_type, oldest_event_ts, newest_event_ts.
Query stored events with optional filters.
All parameters are keyword-only to prevent accidental positional misuse.
Arguments:
- event_type: Filter by event type (e.g., 'after_model', 'before_tool')
- agent_id: Filter by agent name
- from_ts: Start of time range (Unix timestamp). Use datetime.timestamp() to convert.
- to_ts: End of time range (Unix timestamp). Use datetime.timestamp() to convert.
- limit: Maximum number of results (default: 1000). Required to prevent accidentally loading millions of rows.
Yields:
TelemetryEvent matching the filters, ordered by timestamp ascending.
Raises:
- NotImplementedError: If this backend does not support querying.