Source code for bssunfold.core.unfold_qpsolvers

"""QP solvers-based unfolding method with regularization selection.

This module provides the core solve_qpsolvers solver and the unfold_qpsolvers
wrapper with various regularization selection methods.
"""

import warnings
import numpy as np
from typing import Dict, Optional, Any, List

from scipy.sparse import csc_matrix

from ._matrix_utils import create_derivative_matrix
from .regularization import select_regularization_parameter
from ._base_unfolder import run_unfolding

__all__ = ["solve_qpsolvers", "unfold_qpsolvers"]


[docs] def solve_qpsolvers( A: np.ndarray, b: np.ndarray, alpha: float, norm: int = 2, solver: str = "osqp", x0: Optional[np.ndarray] = None, smoothness_order: int = 0, smoothness_weight: float = 1.0, ) -> Optional[np.ndarray]: """Solve unfolding problem using qpsolvers. Parameters ---------- A : np.ndarray Response matrix (m x n). b : np.ndarray Measurement vector (m,). alpha : float Regularization parameter. norm : int, optional Norm type (1 for L1, 2 for L2). solver : str, optional QP solver name (default: 'osqp'). x0 : np.ndarray, optional Initial values. smoothness_order : int, optional Smoothness constraint order (0, 1, or 2). smoothness_weight : float, optional Weight for smoothness term. Returns ------- Optional[np.ndarray] Unfolded spectrum or None if solving failed. """ try: from qpsolvers import available_solvers, solve_qp except ImportError as e: raise ImportError( "qpsolvers is required for unfold_qpsolvers. " "Install with: pip install qpsolvers" ) from e if solver not in available_solvers: if "osqp" in available_solvers: solver = "osqp" elif "ecos" in available_solvers: solver = "ecos" else: warnings.warn( f"Solver '{solver}' not available. Available: {available_solvers}" ) return None n = A.shape[1] P_base = csc_matrix(A.T @ A) q_base = -A.T @ b if norm == 2: P = P_base.copy() if smoothness_order == 1: L = create_derivative_matrix(n, 1) P += alpha * smoothness_weight * (L.T @ L) elif smoothness_order == 2: L = create_derivative_matrix(n, 2) P += alpha * smoothness_weight * (L.T @ L) else: P += alpha * csc_matrix(np.eye(n)) G = csc_matrix(-np.eye(n)) h = np.zeros(n) x = solve_qp( P=P, q=q_base, G=G, h=h, solver=solver, initvals=x0, verbose=False, ) elif norm == 1: n_ext = 2 * n P_ext = csc_matrix((n_ext, n_ext)) P_ext[:n, :n] = P_base if smoothness_order == 1: L = create_derivative_matrix(n, 1) P_ext[:n, :n] += alpha * smoothness_weight * (L.T @ L) elif smoothness_order == 2: L = create_derivative_matrix(n, 2) P_ext[:n, :n] += alpha * smoothness_weight * (L.T @ L) q_ext = np.zeros(n_ext) q_ext[:n] = q_base q_ext[n:] = alpha * np.ones(n) G_ext = csc_matrix((3 * n, n_ext)) h_ext = np.zeros(3 * n) G_ext[:n, :n] = -csc_matrix(np.eye(n)) G_ext[n:2 * n, n:] = -csc_matrix(np.eye(n)) G_ext[2 * n:3 * n, :n] = -csc_matrix(np.eye(n)) G_ext[2 * n:3 * n, n:] = csc_matrix(np.eye(n)) x0_ext = None if x0 is not None: x0_ext = np.concatenate([x0, np.abs(x0)]) x_ext = solve_qp( P=P_ext, q=q_ext, G=G_ext, h=h_ext, solver=solver, initvals=x0_ext, verbose=False, ) if x_ext is None: warnings.warn(f"Solver '{solver}' did not find a solution for L1 problem.") return None x = x_ext[:n] - x_ext[n:] else: raise ValueError(f"Unsupported norm type: {norm}") if x is None: warnings.warn(f"Solver '{solver}' did not find a solution.") return None return np.asarray(x)
[docs] def unfold_qpsolvers( detector_names: List[str], n_energy_bins: int, E_MeV: np.ndarray, sensitivities: Dict[str, np.ndarray], cc_icrp116: Dict[str, np.ndarray], save_result_callback, readings: Dict[str, float], initial_spectrum: Optional[np.ndarray] = None, regularization: float = 1e-4, norm: int = 2, solver: str = "osqp", calculate_errors: bool = False, noise_level: float = 0.01, n_montecarlo: int = 100, save_result: bool = True, regularization_method: str = "manual", noise_var: Optional[float] = None, smoothness_order: int = 0, smoothness_weight: float = 1.0, random_state: Optional[int] = None, ) -> Dict[str, Any]: """Unfold using qpsolvers with regularization selection. Parameters ---------- detector_names : List[str] Names of available detectors. n_energy_bins : int Number of energy bins. E_MeV : np.ndarray Energy grid. sensitivities : Dict[str, np.ndarray] Detector sensitivity arrays. cc_icrp116 : Dict[str, np.ndarray] ICRP-116 conversion coefficients. save_result_callback : callable Callback to save result to history. readings : Dict[str, float] Detector readings. initial_spectrum : np.ndarray, optional Initial spectrum guess. regularization : float, optional Regularization parameter, default: 1e-4. norm : int, optional Norm type (1 for L1, 2 for L2), default: 2. solver : str, optional QP solver name, default: 'osqp'. calculate_errors : bool, optional If True, calculate Monte-Carlo uncertainty, default: False. noise_level : float, optional Noise level for Monte-Carlo, default: 0.01. n_montecarlo : int, optional Number of Monte-Carlo samples, default: 100. save_result : bool, optional Save result to history, default: True. regularization_method : str, optional Method for selecting regularization parameter. noise_var : float, optional Noise variance for discrepancy principle ('dp' method). smoothness_order : int, optional Smoothness constraint order (0, 1, or 2), default: 0. smoothness_weight : float, optional Weight for smoothness term, default: 1.0. random_state : int, optional Random seed for reproducibility. Returns ------- Dict[str, Any] Unfolding results including spectrum, residuals, and metadata. """ selected = [name for name in detector_names if name in readings] b = np.array([readings[name] for name in selected], dtype=float) A = np.array([sensitivities[name] for name in selected], dtype=float) if regularization_method == "manual": alpha = regularization selected_lambda = alpha elif regularization_method == "cosine": if initial_spectrum is None: raise ValueError( "For 'cosine' regularization method, " "initial_spectrum must be provided." ) if norm != 2: warnings.warn( f"Cosine regularization selection method assumes L2 " f"norm, but norm={norm} was requested. Using L2 for " f"selection." ) initial_spectrum_norm = np.maximum(initial_spectrum, 0) if len(initial_spectrum_norm) != n_energy_bins: raise ValueError( f"Initial spectrum length ({len(initial_spectrum)}) " f"must match number of energy bins ({n_energy_bins})" ) alphas = np.logspace(-9, 2, 100) cosine_similarities = [] for alpha_val in alphas: x_temp = solve_qpsolvers( A, b, alpha_val, 2, solver, x0=initial_spectrum_norm, smoothness_order=smoothness_order, smoothness_weight=smoothness_weight, ) if x_temp is not None: norm_temp = np.linalg.norm(x_temp) if norm_temp > 0: cos_sim = np.dot(x_temp, initial_spectrum_norm) / (norm_temp * np.linalg.norm(initial_spectrum_norm)) cosine_similarities.append(cos_sim) else: cosine_similarities.append(-1) else: cosine_similarities.append(-1) optimal_idx = int(np.argmax(cosine_similarities)) selected_lambda = alphas[optimal_idx] alpha = selected_lambda print( f"Selected regularization (method=cosine): " f"{selected_lambda:.3e}" ) else: if norm != 2: warnings.warn( f"Automatic regularization selection methods assume L2 " f"norm, but norm={norm} was requested. Using L2 for " f"selection." ) try: selected_lambda = select_regularization_parameter( A, b, method=regularization_method, noise_var=noise_var ) except Exception as e: raise ValueError( f"Regularization selection failed: {e}. " "Consider using manual regularization." ) alpha = selected_lambda print( f"Selected regularization (method={regularization_method}): " f"{selected_lambda:.3e}" ) x0_default = np.zeros(n_energy_bins) def solve_wrapper(A, b, **kwargs): kwargs.pop('x0', None) x = solve_qpsolvers( A, b, alpha, norm, solver, smoothness_order=smoothness_order, smoothness_weight=smoothness_weight, ) if x is None: x = np.zeros(A.shape[1]) warnings.warn("Solution not found, returning zero spectrum.") return x return run_unfolding( detector_names=detector_names, n_energy_bins=n_energy_bins, E_MeV=E_MeV, sensitivities=sensitivities, cc_icrp116=cc_icrp116, save_result_callback=save_result_callback, readings=readings, initial_spectrum=initial_spectrum, default_initial=x0_default, solve_func=solve_wrapper, solve_kwargs={}, method_name=f"qpsolvers_{solver}", extra_output={ "norm": norm, "regularization": regularization, "regularization_method": regularization_method, "selected_regularization": float(selected_lambda), "smoothness_order": smoothness_order, "smoothness_weight": smoothness_weight, }, calculate_errors=calculate_errors, noise_level=noise_level, n_montecarlo=n_montecarlo, random_state=random_state, save_result=save_result, )