"""
Helper utilities for changepoint detection in streaming applications.
"""
import math
from typing import List, Optional, Tuple, Any
from dataclasses import dataclass
[docs]
@dataclass
class Changepoint:
"""Record of a detected changepoint"""
index: int
"""Time index where changepoint occurred"""
prev_run_length: int = 0
"""Estimated length of the segment that just ended (from MAP estimate)"""
cp_prob: float = 0.0
"""Changepoint probability P(r_t=0 | x_1:t)"""
map_run_length: int = 0
"""MAP run length estimate at detection time"""
map_confidence: float = 0.0
"""Confidence in MAP estimate (max posterior probability)"""
observation: float = 0.0
"""Observation value at changepoint"""
metadata: Optional[Any] = None
"""Optional user-provided metadata (e.g., timestamp, label)"""
@property
def confidence(self) -> float:
"""Alias for cp_prob (backward compatibility)"""
return self.cp_prob
def __str__(self):
meta_str = f" ({self.metadata})" if self.metadata else ""
return (f"Changepoint at t={self.index}{meta_str}: "
f"previous segment lasted {self.prev_run_length} steps "
f"(P(CP)={self.cp_prob:.1%}, MAP r={self.map_run_length})")
[docs]
class OnlineChangeDetector:
"""
Wrapper for BOCPD optimized for online/streaming detection.
Automatically detects changepoints using:
1. Changepoint probability threshold (P(r_t=0) >= min_cp_prob)
2. MAP run length reset heuristic (sharp drop in estimated segment length)
Features:
- Automatic changepoint detection with configurable sensitivity
- History tracking
- Confidence scoring
- Optional metadata attachment
- Debouncing to prevent duplicate detections
Example:
>>> from fast_bocpd import BOCPD, GaussianNIG, ConstantHazard
>>> from fast_bocpd.utils import OnlineChangeDetector
>>>
>>> bocpd = BOCPD(GaussianNIG(...), ConstantHazard(100))
>>> detector = OnlineChangeDetector(bocpd)
>>>
>>> # Process streaming data
>>> for observation in data_stream:
... cp = detector.update(observation)
... if cp:
... print(f"Changepoint detected: {cp}")
"""
@staticmethod
def _compute_default_min_cp_prob(lambda_: float, bayes_factor: float = 5.0) -> float:
"""
Compute principled default for min_cp_prob using Bayes factor.
Args:
lambda_: Expected run length (from ConstantHazard)
bayes_factor: Evidence threshold (5=moderate, 10=strong, 3=weak)
Returns:
Minimum changepoint probability threshold
"""
H = 1.0 / float(lambda_)
return (bayes_factor * H) / ((1.0 - H) + bayes_factor * H)
[docs]
def __init__(self, bocpd, min_cp_prob: Optional[float] = None,
reset_r: int = 2, drop_prev_min: Optional[int] = None,
cooldown: Optional[int] = None, bayes_factor: float = 5.0):
"""
Initialize online detector.
Args:
bocpd: BOCPD instance
min_cp_prob: Minimum P(r_t=0) to report changepoint
If None, computed from hazard using Bayes factor
Lower = more sensitive, more false positives
Higher = less sensitive, fewer false positives
reset_r: MAP run length threshold for "reset" detection (default: 2)
drop_prev_min: Minimum previous run length for "sharp drop"
If None, set to max(10, 0.25*lambda_)
cooldown: Minimum timesteps between changepoint reports
If None, set to max(3, 0.1*lambda_)
bayes_factor: Evidence threshold for auto-computing min_cp_prob (default: 5.0)
5=moderate evidence, 10=strong, 3=weak
Only used if min_cp_prob is None and hazard is ConstantHazard
"""
self.bocpd = bocpd
self.bayes_factor = bayes_factor
if not math.isfinite(self.bayes_factor) or self.bayes_factor <= 0.0:
raise ValueError(f"bayes_factor must be finite and > 0, got {self.bayes_factor}")
# Auto-compute defaults from hazard (duck-typing to avoid imports)
# If hazard exposes .lambda_ (expected run length), use it for principled defaults
hazard = getattr(bocpd, "hazard", None)
lambda_ = getattr(hazard, "lambda_", None)
needs_lambda = (min_cp_prob is None) or (drop_prev_min is None) or (cooldown is None)
if lambda_ is not None and needs_lambda:
try:
lam = float(lambda_)
except (TypeError, ValueError):
raise ValueError(f"hazard.lambda_ must be a positive number, got {lambda_!r}")
if not math.isfinite(lam) or lam <= 0.0:
raise ValueError(f"hazard.lambda_ must be finite and > 0, got {lam}")
if min_cp_prob is None:
min_cp_prob = self._compute_default_min_cp_prob(lam, bayes_factor)
if drop_prev_min is None:
drop_prev_min = max(10, int(round(0.25 * lam)))
if cooldown is None:
cooldown = max(3, int(round(0.1 * lam)))
# Fallback defaults (softer threshold for unknown hazards)
self.min_cp_prob = min_cp_prob if min_cp_prob is not None else 0.1
self.reset_r = reset_r
self.drop_prev_min = drop_prev_min if drop_prev_min is not None else 10
self.cooldown = cooldown if cooldown is not None else 5
# Validate probability threshold
if not (0.0 < self.min_cp_prob < 1.0):
raise ValueError(f"min_cp_prob must be in (0, 1), got {self.min_cp_prob}")
# Validate other parameters
if self.cooldown < 0:
raise ValueError(f"cooldown must be >= 0, got {self.cooldown}")
if self.reset_r < 0:
raise ValueError(f"reset_r must be >= 0, got {self.reset_r}")
if self.drop_prev_min < 0:
raise ValueError(f"drop_prev_min must be >= 0, got {self.drop_prev_min}")
self._t = 0
self._prev_map_r = None
self._prev_cp_prob = 0.0
self._has_emitted = False # Track if we've ever emitted a CP
self._last_cp_t = -self.cooldown # Start before time 0 so can_emit is True initially
self._pending_reset = False # Latch reset detection during cooldown
self._changepoints: List[Changepoint] = []
self._map_history: List[int] = []
[docs]
def update(self, x: float, metadata: Optional[Any] = None) -> Optional[Changepoint]:
"""
Process new observation and detect changepoints.
Args:
x: New observation
metadata: Optional metadata to attach (e.g., timestamp, sample ID)
Returns:
Changepoint if detected, None otherwise
"""
# Update BOCPD
_, cp_prob = self.bocpd.update(x)
map_r = self.bocpd.get_map_run_length()
map_conf = self.bocpd.get_map_confidence()
# Track history
self._map_history.append(map_r)
cp = None
# Check if we're in cooldown
can_emit = (self._t - self._last_cp_t >= self.cooldown)
# Primary signal: probabilistic changepoint probability
# (crossing threshold from below, only checked when we can emit)
detect_cp_prob = False
if can_emit:
detect_cp_prob = (cp_prob >= self.min_cp_prob) and (self._prev_cp_prob < self.min_cp_prob)
# Secondary signal: run-length reset heuristic
# (MAP dropped from long run to very short run)
detect_reset = False
if self._prev_map_r is not None:
detect_reset = (map_r <= self.reset_r) and (self._prev_map_r >= self.drop_prev_min)
# Latch pending reset during cooldown so we don't miss it
if detect_reset and not can_emit:
self._pending_reset = True
# Check for pending reset that was latched during cooldown
# Only trigger if we still look "reset-like" now (prevent false positives)
if can_emit and self._pending_reset:
if map_r <= self.reset_r:
detect_reset = True
self._pending_reset = False # Always clear latch when cooldown ends
detect_cp = detect_cp_prob or detect_reset
# Emit changepoint if detected and not in cooldown
if detect_cp:
cp = Changepoint(
index=self._t,
prev_run_length=self._prev_map_r if self._prev_map_r is not None else 0,
cp_prob=cp_prob,
map_run_length=map_r,
map_confidence=map_conf,
observation=x,
metadata=metadata
)
self._changepoints.append(cp)
self._last_cp_t = self._t
self._has_emitted = True # Mark that we've emitted at least one CP
self._pending_reset = False # Clear latch after emission
self._prev_map_r = map_r
self._prev_cp_prob = cp_prob
self._t += 1
return cp
[docs]
def get_current_run_length(self) -> int:
"""
Get time since last detected changepoint.
Returns:
Number of observations since last changepoint detected by this wrapper
Returns 0 if no changepoint has been detected yet
"""
if not self._has_emitted:
return 0
return self._t - self._last_cp_t
[docs]
def get_current_map_run_length(self) -> int:
"""
Get BOCPD's current MAP run length estimate.
Returns:
Most likely run length (segment length) according to BOCPD posterior
"""
return 0 if self._prev_map_r is None else self._prev_map_r
[docs]
def get_changepoints(self) -> List[Changepoint]:
"""Get all detected changepoints"""
return self._changepoints.copy()
[docs]
def get_map_history(self) -> List[int]:
"""
Get complete history of MAP run length estimates.
Returns:
List where element i is the MAP run length at time i
"""
return self._map_history.copy()
[docs]
def get_segments(self) -> List[Tuple[int, int]]:
"""
Get segments between changepoints as (start, end) indices.
Note: Segment boundaries are based on detection time (first sample after change).
Returns:
List of (start_idx, end_idx) tuples for each segment
Example:
>>> segments = detector.get_segments()
>>> for start, end in segments:
... print(f"Segment from {start} to {end} (length: {end-start})")
"""
if not self._changepoints:
return [(0, self._t)]
segments = []
prev_end = 0
for cp in self._changepoints:
segments.append((prev_end, cp.index))
prev_end = cp.index
# Add final segment
if prev_end < self._t:
segments.append((prev_end, self._t))
return segments
[docs]
def reset(self):
"""Reset detector to initial state"""
self.bocpd.reset()
self._t = 0
self._prev_map_r = None
self._prev_cp_prob = 0.0
self._has_emitted = False # Reset emission flag
self._last_cp_t = -self.cooldown # Reset to allow immediate detection
self._pending_reset = False
self._changepoints = []
self._map_history = []
[docs]
def __repr__(self) -> str:
"""String representation for debugging"""
return (f"OnlineChangeDetector(min_cp_prob={self.min_cp_prob:.4f}, "
f"reset_r={self.reset_r}, drop_prev_min={self.drop_prev_min}, "
f"cooldown={self.cooldown}, bayes_factor={self.bayes_factor})")