Source code for fast_bocpd.core

"""
Python API for C-based BOCPD implementation.
"""
import numpy as np
import ctypes
from typing import Tuple

from . import _bindings
from .hazard import ConstantHazard
from .models import (
    GaussianNIG,
    StudentTNG,
    PoissonGamma,
    BernoulliBeta,
    BinomialBeta,
    GammaGamma,
)


[docs] class BOCPD: """ Bayesian Online Changepoint Detection. This is a Python wrapper around the C implementation for performance. """
[docs] def __init__(self, obs_model, hazard, max_run_length: int = 200): """ Initialize BOCPD detector. Args: obs_model: Observation model (e.g., GaussianNIG instance) hazard: Hazard function (e.g., ConstantHazard instance) max_run_length: Maximum run length to track """ if not _bindings.is_c_available(): raise RuntimeError( "C library not available. Please compile the C library first:\n" " cd fast_bocpd/_c\n" " make lib" ) self.obs_model = obs_model self.hazard = hazard self.max_run_length = int(max_run_length) if self.max_run_length <= 0: raise ValueError("max_run_length must be > 0") self._state = None self._init_c_backend()
def _init_c_backend(self) -> None: """Initialize C backend.""" if not isinstance(self.hazard, ConstantHazard): raise ValueError(f"Unsupported hazard function: {type(self.hazard)}") # Determine observation model type and create params if isinstance(self.obs_model, GaussianNIG): obs_model_type = _bindings.OBS_MODEL_GAUSSIAN_NIG obs_params = _bindings.GaussianNIGParams( mu0=self.obs_model.mu0, kappa0=self.obs_model.kappa0, alpha0=self.obs_model.alpha0, beta0=self.obs_model.beta0 ) elif isinstance(self.obs_model, StudentTNG): if self.obs_model.is_grid: # Grid Student-t mode obs_model_type = _bindings.OBS_MODEL_STUDENT_T_NG_GRID # Ensure contiguous arrays and keep them alive (C will deep-copy but needs contiguous input) self._nu_grid_arr = np.ascontiguousarray(self.obs_model.nu_grid, dtype=np.float64) self._nu_prior_arr = np.ascontiguousarray(self.obs_model.nu_prior, dtype=np.float64) obs_params = _bindings.StudentTNGGridParams( mu0=self.obs_model.mu0, kappa0=self.obs_model.kappa0, alpha0=self.obs_model.alpha0, beta0=self.obs_model.beta0, K=self.obs_model.K, nu_grid=self._nu_grid_arr.ctypes.data_as(ctypes.POINTER(ctypes.c_double)), nu_prior=self._nu_prior_arr.ctypes.data_as(ctypes.POINTER(ctypes.c_double)) ) else: # Fixed ν mode obs_model_type = _bindings.OBS_MODEL_STUDENT_T_NG obs_params = _bindings.StudentTNGParams( mu0=self.obs_model.mu0, kappa0=self.obs_model.kappa0, alpha0=self.obs_model.alpha0, beta0=self.obs_model.beta0, nu=self.obs_model.nu ) elif isinstance(self.obs_model, PoissonGamma): obs_model_type = _bindings.OBS_MODEL_POISSON_GAMMA obs_params = _bindings.PoissonGammaParams( alpha0=self.obs_model.alpha0, beta0=self.obs_model.beta0 ) elif isinstance(self.obs_model, BernoulliBeta): obs_model_type = _bindings.OBS_MODEL_BERNOULLI_BETA obs_params = _bindings.BernoulliBetaParams( alpha0=self.obs_model.alpha0, beta0=self.obs_model.beta0 ) elif isinstance(self.obs_model, BinomialBeta): obs_model_type = _bindings.OBS_MODEL_BINOMIAL_BETA obs_params = _bindings.BinomialBetaParams( alpha0=self.obs_model.alpha0, beta0=self.obs_model.beta0, N=self.obs_model.n_trials, log_N_factorial=0.0 # Will be set by C in bocpd_init ) elif isinstance(self.obs_model, GammaGamma): obs_model_type = _bindings.OBS_MODEL_GAMMA_GAMMA obs_params = _bindings.GammaGammaParams( alpha0=self.obs_model.alpha0, beta0=self.obs_model.beta0, shape=self.obs_model.shape, log_gamma_k=0.0 # Will be set by C in bocpd_init ) else: raise ValueError(f"Unsupported observation model: {type(self.obs_model)}") hazard_params = _bindings.ConstantHazardParams() ret = _bindings.constant_hazard_init(hazard_params, self.hazard.lambda_) if ret != 0: raise RuntimeError("Failed to initialize hazard function") # Initialize BOCPD state self._state = _bindings.BOCPDState() # Cast params to void* for C function (safer than implicit conversion) obs_params_ptr = ctypes.cast(ctypes.byref(obs_params), ctypes.c_void_p) haz_params_ptr = ctypes.cast(ctypes.byref(hazard_params), ctypes.c_void_p) ret = _bindings.bocpd_init( self._state, obs_model_type, obs_params_ptr, _bindings.HAZARD_CONSTANT, haz_params_ptr, self.max_run_length ) if ret != 0: # Free any partially allocated resources self.close() raise RuntimeError("Failed to initialize BOCPD") def _require_state(self) -> _bindings.BOCPDState: """Return the live state or raise if closed.""" if self._state is None: raise RuntimeError("BOCPD state is not initialized or has been closed") return self._state
[docs] def reset(self) -> None: """Reset to prior (as if no data has been seen).""" _bindings.bocpd_reset(self._require_state())
[docs] def update(self, x: float) -> Tuple[np.ndarray, float]: """ Process one new observation (online mode). Args: x: New observation Returns: posterior_r: Array of P(r_t = r | x_1:t) cp_prob: Probability of changepoint (posterior_r[0]) """ # Validate data for discrete models (if applicable) if isinstance(self.obs_model, (PoissonGamma, BernoulliBeta, BinomialBeta)): self.obs_model.validate_data(x) state = self._require_state() cp_prob = ctypes.c_double() posterior_ptr = _bindings.bocpd_update(state, float(x), cp_prob) if not posterior_ptr: raise RuntimeError("BOCPD update failed") # Copy posterior to numpy array posterior_r = np.ctypeslib.as_array( posterior_ptr, shape=(self.max_run_length + 1,) ).copy() return posterior_r, cp_prob.value
[docs] def batch_update(self, data: np.ndarray) -> np.ndarray: """ Process multiple observations at once (offline mode). Args: data: Array of observations Returns: cp_probs: Array of changepoint probabilities for each time step """ # Validate and convert data for discrete models (if applicable) if isinstance(self.obs_model, (PoissonGamma, BernoulliBeta, BinomialBeta)): data = self.obs_model.validate_batch(data) else: data = np.ascontiguousarray(data, dtype=np.float64) cp_probs = np.zeros(len(data), dtype=np.float64) state = self._require_state() ret = _bindings.bocpd_batch_update( state, data.ctypes.data_as(ctypes.POINTER(ctypes.c_double)), len(data), cp_probs.ctypes.data_as(ctypes.POINTER(ctypes.c_double)) ) if ret != 0: raise RuntimeError("Batch update failed") return cp_probs
[docs] def get_map_run_length(self) -> int: """ Get the most likely (MAP) run length at current time. Returns: Most likely run length (0 means changepoint just occurred) Example: >>> map_r = bocpd.get_map_run_length() >>> if map_r == 0: ... print("Changepoint detected!") >>> else: ... print(f"Current regime is {map_r} observations old") """ r_map = _bindings.bocpd_get_map_run_length(self._require_state()) if r_map < 0: raise RuntimeError("Failed to get MAP run length") return r_map
[docs] def get_map_confidence(self) -> float: """ Get confidence in the MAP run length estimate. Returns: Probability mass at MAP run length (higher = more confident) Example: >>> map_r = bocpd.get_map_run_length() >>> confidence = bocpd.get_map_confidence() >>> print(f"MAP estimate: r={map_r} (confidence: {confidence:.1%})") """ posterior = self.get_posterior() map_r = self.get_map_run_length() return posterior[map_r]
[docs] def get_posterior(self) -> np.ndarray: """ Get current posterior distribution over run lengths. Returns: posterior_r: Array of P(r_t = r | data) for r in [0, max_run_length] """ posterior = np.zeros(self.max_run_length + 1, dtype=np.float64) ret = _bindings.bocpd_get_posterior( self._require_state(), posterior.ctypes.data_as(ctypes.POINTER(ctypes.c_double)) ) if ret != 0: raise RuntimeError("Failed to get posterior") return posterior
[docs] def close(self): """Explicitly free C resources (recommended for deterministic cleanup).""" if getattr(self, "_state", None) is not None: try: _bindings.bocpd_free(self._state) finally: self._state = None self._state = None # Clean up grid arrays if present if hasattr(self, '_nu_grid_arr'): del self._nu_grid_arr if hasattr(self, '_nu_prior_arr'): del self._nu_prior_arr
[docs] def __enter__(self): """Context manager entry.""" return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.close() return False
[docs] def __del__(self): """Cleanup C resources (fallback if close() not called).""" try: self.close() except Exception: pass
def is_available() -> bool: """Check if C library is available.""" return _bindings.is_c_available()