// signal-processing.ts - Convolution-based signal processing functions import { convolve1D, convolve2D, ConvolutionKernels, ConvolutionOptions } from './convolution'; export interface SmoothingOptions { method?: 'gaussian' | 'moving_average'; windowSize?: number; sigma?: number; } export interface EdgeDetectionOptions { method?: 'sobel' | 'laplacian' | 'canny'; threshold?: number; } export interface FilterOptions { type: 'lowpass' | 'highpass' | 'bandpass' | 'bandstop'; cutoffLow?: number; cutoffHigh?: number; order?: number; } export interface DerivativeOptions { order?: 1 | 2; method?: 'gradient' | 'laplacian'; } /** * Convolution-Based Signal Processing Library * Functions that leverage convolution operations for signal processing */ export class SignalProcessor { /** * Smooth a 1D signal using convolution-based methods */ static smooth(signal: number[], options: SmoothingOptions = {}): number[] { const { method = 'gaussian', windowSize = 5, sigma = 1.0 } = options; if (signal.length === 0) { throw new Error('Signal cannot be empty'); } let kernel: number[]; switch (method) { case 'gaussian': kernel = ConvolutionKernels.gaussian1D(windowSize, sigma); break; case 'moving_average': kernel = ConvolutionKernels.average1D(windowSize); break; default: throw new Error(`Unsupported smoothing method: ${method}`); } return convolve1D(signal, kernel, { mode: 'same' }).values; } /** * Detect edges in 2D image data using convolution-based methods */ static detectEdges2D(image: number[][], options: EdgeDetectionOptions = {}): number[][] { const { method = 'sobel', threshold = 0.1 } = options; let kernelX: number[][]; let kernelY: number[][]; switch (method) { case 'sobel': kernelX = ConvolutionKernels.sobel("x"); kernelY = ConvolutionKernels.sobel("y"); break; case 'laplacian': const laplacianKernel = ConvolutionKernels.laplacian(); return convolve2D(image, laplacianKernel, { mode: 'same' }).matrix.map(row => row.map(val => Math.abs(val) > threshold ? Math.abs(val) : 0) ); default: throw new Error(`Unsupported edge detection method: ${method}`); } // Apply both kernels and combine results const edgesX = convolve2D(image, kernelX, { mode: 'same' }).matrix; const edgesY = convolve2D(image, kernelY, { mode: 'same' }).matrix; // Calculate gradient magnitude const result: number[][] = []; for (let i = 0; i < edgesX.length; i++) { result[i] = []; for (let j = 0; j < edgesX[i].length; j++) { const magnitude = Math.sqrt(edgesX[i][j] ** 2 + edgesY[i][j] ** 2); result[i][j] = magnitude > threshold ? magnitude : 0; } } return result; } /** * Apply digital filters using convolution */ static filter(signal: number[], options: FilterOptions): number[] { const { type, cutoffLow = 0.1, cutoffHigh = 0.5, order = 4 } = options; let kernel: number[]; switch (type) { case 'lowpass': // Low-pass filter using Gaussian kernel kernel = ConvolutionKernels.gaussian1D(order * 4 + 1, order / 2); return convolve1D(signal, kernel, { mode: 'same' }).values; case 'highpass': // High-pass filter using difference of Gaussians const lpKernel = ConvolutionKernels.gaussian1D(order * 4 + 1, order / 2); const smoothed = convolve1D(signal, lpKernel, { mode: 'same' }).values; return signal.map((val, i) => val - smoothed[i]); case 'bandpass': // Band-pass as combination of high-pass and low-pass const hp = this.filter(signal, { type: 'highpass', cutoffLow, order }); return this.filter(hp, { type: 'lowpass', cutoffLow: cutoffHigh, order }); case 'bandstop': // Band-stop as original minus band-pass const bp = this.filter(signal, { type: 'bandpass', cutoffLow, cutoffHigh, order }); return signal.map((val, i) => val - bp[i]); default: throw new Error(`Unsupported filter type: ${type}`); } } /** * Calculate derivatives using convolution with derivative kernels */ static derivative(signal: number[], options: DerivativeOptions = {}): number[] { const { order = 1, method = 'gradient' } = options; let kernel: number[]; if (method === 'gradient') { switch (order) { case 1: // First derivative using gradient kernel kernel = [-0.5, 0, 0.5]; // Simple gradient break; case 2: // Second derivative using Laplacian-like kernel kernel = [1, -2, 1]; // Simple second derivative break; default: throw new Error(`Unsupported derivative order: ${order}`); } } else if (method === 'laplacian' && order === 2) { // 1D Laplacian kernel = [1, -2, 1]; } else { throw new Error(`Unsupported derivative method: ${method}`); } return convolve1D(signal, kernel, { mode: 'same' }).values; } /** * Blur 2D image using Gaussian convolution */ static blur2D(image: number[][], sigma: number = 1.0, kernelSize?: number): number[][] { const size = kernelSize || Math.ceil(sigma * 6) | 1; // Ensure odd size const kernel = ConvolutionKernels.gaussian(size, sigma); return convolve2D(image, kernel, { mode: 'same' }).matrix; } /** * Sharpen 2D image using unsharp masking (convolution-based) */ static sharpen2D(image: number[][], strength: number = 1.0): number[][] { const sharpenKernel = [ [0, -strength, 0], [-strength, 1 + 4 * strength, -strength], [0, -strength, 0] ]; return convolve2D(image, sharpenKernel, { mode: 'same' }).matrix; } /** * Apply emboss effect using convolution */ static emboss2D(image: number[][], direction: 'ne' | 'nw' | 'se' | 'sw' = 'ne'): number[][] { const embossKernels = { ne: [[-2, -1, 0], [-1, 1, 1], [0, 1, 2]], nw: [[0, -1, -2], [1, 1, -1], [2, 1, 0]], se: [[0, 1, 2], [-1, 1, 1], [-2, -1, 0]], sw: [[2, 1, 0], [1, 1, -1], [0, -1, -2]] }; const kernel = embossKernels[direction]; return convolve2D(image, kernel, { mode: 'same' }).matrix; } /** * Apply motion blur using directional convolution kernel */ static motionBlur(signal: number[], direction: number, length: number = 9): number[] { // Create motion blur kernel const kernel = new Array(length).fill(1 / length); return convolve1D(signal, kernel, { mode: 'same' }).values; } /** * Detect impulse response using convolution with known impulse */ static matchedFilter(signal: number[], template: number[]): number[] { // Matched filter using cross-correlation (convolution with reversed template) const reversedTemplate = [...template].reverse(); return convolve1D(signal, reversedTemplate, { mode: 'same' }).values; } /** * Create Savitzky-Golay smoothing kernel */ private static createSavitzkyGolayKernel(windowSize: number, polyOrder: number): number[] { // Simplified Savitzky-Golay kernel generation // For a more complete implementation, you'd solve the least squares problem const halfWindow = Math.floor(windowSize / 2); const kernel: number[] = new Array(windowSize); // For simplicity, use predetermined coefficients for common cases if (windowSize === 5 && polyOrder === 2) { return [-3, 12, 17, 12, -3].map(x => x / 35); } else if (windowSize === 7 && polyOrder === 2) { return [-2, 3, 6, 7, 6, 3, -2].map(x => x / 21); } else { // Fallback to simple moving average return new Array(windowSize).fill(1 / windowSize); } } /** * Apply median filtering (note: not convolution-based, but commonly used with other filters) */ static medianFilter(signal: number[], windowSize: number = 3): number[] { const result: number[] = []; const halfWindow = Math.floor(windowSize / 2); for (let i = 0; i < signal.length; i++) { const window: number[] = []; for (let j = Math.max(0, i - halfWindow); j <= Math.min(signal.length - 1, i + halfWindow); j++) { window.push(signal[j]); } window.sort((a, b) => a - b); const median = window[Math.floor(window.length / 2)]; result.push(median); } return result; } /** * Cross-correlation using convolution */ static crossCorrelate(signal1: number[], signal2: number[]): number[] { // Cross-correlation is convolution with one signal reversed const reversedSignal2 = [...signal2].reverse(); return convolve1D(signal1, reversedSignal2, { mode: 'full' }).values; } /** * Auto-correlation using convolution */ static autoCorrelate(signal: number[]): number[] { return this.crossCorrelate(signal, signal); } /** * Detect peaks using convolution-based edge detection */ static detectPeaksConvolution(signal: number[], options: { method?: 'gradient' | 'laplacian' | 'dog'; threshold?: number; minDistance?: number; } = {}): { index: number; value: number; strength: number }[] { const { method = 'gradient', threshold = 0.1, minDistance = 1 } = options; let edgeResponse: number[]; switch (method) { case 'gradient': // First derivative to detect edges (peaks are positive edges) const gradientKernel = [-1, 0, 1]; // Simple gradient edgeResponse = convolve1D(signal, gradientKernel, { mode: 'same' }).values; break; case 'laplacian': // Second derivative to detect peaks (zero crossings) const laplacianKernel = [1, -2, 1]; // 1D Laplacian edgeResponse = convolve1D(signal, laplacianKernel, { mode: 'same' }).values; break; case 'dog': // Difference of Gaussians for multi-scale peak detection const sigma1 = 1.0; const sigma2 = 1.6; const size = 9; const gauss1 = ConvolutionKernels.gaussian1D(size, sigma1); const gauss2 = ConvolutionKernels.gaussian1D(size, sigma2); const dogKernel = gauss1.map((g1, i) => g1 - gauss2[i]); edgeResponse = convolve1D(signal, dogKernel, { mode: 'same' }).values; break; default: throw new Error(`Unsupported peak detection method: ${method}`); } // Find local maxima in edge response const peaks: { index: number; value: number; strength: number }[] = []; for (let i = 1; i < edgeResponse.length - 1; i++) { const current = edgeResponse[i]; const left = edgeResponse[i - 1]; const right = edgeResponse[i + 1]; // For gradient method, look for positive peaks // For Laplacian/DoG, look for zero crossings with positive slope let isPeak = false; let strength = 0; if (method === 'gradient') { isPeak = current > left && current > right && current > threshold; strength = current; } else { // Zero crossing detection for Laplacian/DoG isPeak = left < 0 && right > 0 && Math.abs(current) < threshold; strength = Math.abs(current); } if (isPeak) { peaks.push({ index: i, value: signal[i], strength: strength }); } } // Apply minimum distance constraint if (minDistance > 1) { return this.enforceMinDistanceConv(peaks, minDistance); } return peaks; } /** * Detect valleys using convolution (inverted peak detection) */ static detectValleysConvolution(signal: number[], options: { method?: 'gradient' | 'laplacian' | 'dog'; threshold?: number; minDistance?: number; } = {}): { index: number; value: number; strength: number }[] { // Invert signal for valley detection const invertedSignal = signal.map(x => -x); const valleys = this.detectPeaksConvolution(invertedSignal, options); // Convert back to original scale return valleys.map(valley => ({ ...valley, value: -valley.value })); } /** * Detect outliers using convolution-based methods */ static detectOutliersConvolution(signal: number[], options: { method?: 'gradient_variance' | 'median_diff' | 'local_deviation'; windowSize?: number; threshold?: number; } = {}): { index: number; value: number; outlierScore: number }[] { const { method = 'gradient_variance', windowSize = 7, threshold = 2.0 } = options; let outlierScores: number[]; switch (method) { case 'gradient_variance': // Detect outliers using gradient variance const gradientKernel = [-1, 0, 1]; const gradient = convolve1D(signal, gradientKernel, { mode: 'same' }).values; // Convolve gradient with variance-detecting kernel const varianceKernel = new Array(windowSize).fill(1).map((_, i) => { const center = Math.floor(windowSize / 2); return (i - center) ** 2; }); const normalizedVarianceKernel = varianceKernel.map(v => v / varianceKernel.reduce((s, x) => s + x, 0)); outlierScores = convolve1D(gradient.map(g => g * g), normalizedVarianceKernel, { mode: 'same' }).values; break; case 'median_diff': // Detect outliers by difference from local median (approximated with convolution) const medianApproxKernel = ConvolutionKernels.gaussian1D(windowSize, windowSize / 6); const smoothed = convolve1D(signal, medianApproxKernel, { mode: 'same' }).values; outlierScores = signal.map((val, i) => Math.abs(val - smoothed[i])); break; case 'local_deviation': // Detect outliers using local standard deviation approximation const avgKernel = ConvolutionKernels.average1D(windowSize); const localMean = convolve1D(signal, avgKernel, { mode: 'same' }).values; const squaredDiffs = signal.map((val, i) => (val - localMean[i]) ** 2); const localVar = convolve1D(squaredDiffs, avgKernel, { mode: 'same' }).values; outlierScores = signal.map((val, i) => { const std = Math.sqrt(localVar[i]); return std > 0 ? Math.abs(val - localMean[i]) / std : 0; }); break; default: throw new Error(`Unsupported outlier detection method: ${method}`); } // Find points exceeding threshold const outliers: { index: number; value: number; outlierScore: number }[] = []; outlierScores.forEach((score, i) => { if (score > threshold) { outliers.push({ index: i, value: signal[i], outlierScore: score }); } }); return outliers; } /** * Detect trend vertices (turning points) using convolution */ static detectTrendVertices(signal: number[], options: { method?: 'curvature' | 'sign_change' | 'momentum'; smoothingWindow?: number; threshold?: number; minDistance?: number; } = {}): { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] { const { method = 'curvature', smoothingWindow = 5, threshold = 0.001, minDistance = 3 } = options; // First, smooth the signal to reduce noise in trend detection const smoothed = this.smooth(signal, { method: 'gaussian', windowSize: smoothingWindow }); let vertices: { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] = []; switch (method) { case 'curvature': vertices = this.detectCurvatureVertices(smoothed, threshold); break; case 'sign_change': vertices = this.detectSignChangeVertices(smoothed, threshold); break; case 'momentum': vertices = this.detectMomentumVertices(smoothed, threshold); break; default: throw new Error(`Unsupported vertex detection method: ${method}`); } // Apply minimum distance constraint if (minDistance > 1) { vertices = this.enforceMinDistanceVertices(vertices, minDistance); } // Map back to original signal values return vertices.map(v => ({ ...v, value: signal[v.index] // Use original signal value, not smoothed })); } /** * Detect vertices using curvature (second derivative) */ private static detectCurvatureVertices( signal: number[], threshold: number ): { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] { // Use second derivative kernel for curvature const curvatureKernel = [1, -2, 1]; // Discrete Laplacian const curvature = convolve1D(signal, curvatureKernel, { mode: 'same' }).values; const vertices: { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] = []; // Find zero crossings in curvature with sufficient magnitude for (let i = 1; i < curvature.length - 1; i++) { const prev = curvature[i - 1]; const curr = curvature[i]; const next = curvature[i + 1]; // Zero crossing detection if ((prev > 0 && next < 0) || (prev < 0 && next > 0)) { const curvatureMagnitude = Math.abs(curr); if (curvatureMagnitude > threshold) { const type: 'peak' | 'valley' = prev > 0 ? 'peak' : 'valley'; vertices.push({ index: i, value: signal[i], type, curvature: curr }); } } } return vertices; } /** * Detect vertices using gradient sign changes */ private static detectSignChangeVertices( signal: number[], threshold: number ): { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] { // First derivative for gradient const gradientKernel = [-0.5, 0, 0.5]; // Central difference const gradient = convolve1D(signal, gradientKernel, { mode: 'same' }).values; // Second derivative for curvature const curvatureKernel = [1, -2, 1]; const curvature = convolve1D(signal, curvatureKernel, { mode: 'same' }).values; const vertices: { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] = []; // Find gradient sign changes for (let i = 1; i < gradient.length - 1; i++) { const prevGrad = gradient[i - 1]; const nextGrad = gradient[i + 1]; // Check for sign change with sufficient gradient magnitude if (Math.abs(prevGrad) > threshold && Math.abs(nextGrad) > threshold) { if ((prevGrad > 0 && nextGrad < 0) || (prevGrad < 0 && nextGrad > 0)) { const type: 'peak' | 'valley' = prevGrad > 0 ? 'peak' : 'valley'; vertices.push({ index: i, value: signal[i], type, curvature: curvature[i] }); } } } return vertices; } /** * Detect vertices using momentum changes */ private static detectMomentumVertices( signal: number[], threshold: number ): { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] { // Create momentum kernel (difference over larger window) const momentumKernel = [-1, 0, 0, 0, 1]; // 4-point difference const momentum = convolve1D(signal, momentumKernel, { mode: 'same' }).values; // Detect momentum reversals const momentumGradient = convolve1D(momentum, [-0.5, 0, 0.5], { mode: 'same' }).values; const curvature = convolve1D(signal, [1, -2, 1], { mode: 'same' }).values; const vertices: { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] = []; for (let i = 2; i < momentum.length - 2; i++) { const prevMomentum = momentum[i - 1]; const currMomentum = momentum[i]; const nextMomentum = momentum[i + 1]; // Check for momentum reversal if (Math.abs(momentumGradient[i]) > threshold) { if ((prevMomentum > 0 && nextMomentum < 0) || (prevMomentum < 0 && nextMomentum > 0)) { const type: 'peak' | 'valley' = prevMomentum > 0 ? 'peak' : 'valley'; vertices.push({ index: i, value: signal[i], type, curvature: curvature[i] }); } } } return vertices; } /** * Detect trend direction changes using convolution */ static detectTrendChanges(signal: number[], options: { windowSize?: number; threshold?: number; minTrendLength?: number; } = {}): { index: number; fromTrend: 'up' | 'down' | 'flat'; toTrend: 'up' | 'down' | 'flat'; strength: number }[] { const { windowSize = 10, threshold = 0.01, minTrendLength = 5 } = options; // Calculate local trends using convolution with trend-detecting kernel const trendKernel = new Array(windowSize).fill(0).map((_, i) => { const center = windowSize / 2; return (i - center) / (windowSize * windowSize / 12); // Linear trend kernel }); const trends = convolve1D(signal, trendKernel, { mode: 'same' }).values; // Classify trends const trendDirection = trends.map(t => { if (t > threshold) return 'up'; if (t < -threshold) return 'down'; return 'flat'; }); // Find trend changes const changes: { index: number; fromTrend: 'up' | 'down' | 'flat'; toTrend: 'up' | 'down' | 'flat'; strength: number }[] = []; let currentTrend: 'up' | 'down' | 'flat' = trendDirection[0]; let trendStart = 0; for (let i = 1; i < trendDirection.length; i++) { if (trendDirection[i] !== currentTrend) { const trendLength = i - trendStart; if (trendLength >= minTrendLength) { changes.push({ index: i, fromTrend: currentTrend, toTrend: trendDirection[i] as 'up' | 'down' | 'flat', strength: Math.abs(trends[i] - trends[trendStart]) }); } currentTrend = trendDirection[i] as 'up' | 'down' | 'flat'; trendStart = i; } } return changes; } /** * Enforce minimum distance between vertices */ private static enforceMinDistanceVertices( vertices: { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[], minDistance: number ): { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] { if (vertices.length <= 1) return vertices; // Sort by curvature magnitude (stronger vertices first) const sorted = [...vertices].sort((a, b) => Math.abs(b.curvature) - Math.abs(a.curvature)); const result: { index: number; value: number; type: 'peak' | 'valley'; curvature: number }[] = []; for (const vertex of sorted) { let tooClose = false; for (const accepted of result) { if (Math.abs(vertex.index - accepted.index) < minDistance) { tooClose = true; break; } } if (!tooClose) { result.push(vertex); } } // Sort result by index return result.sort((a, b) => a.index - b.index); } /** * Enforce minimum distance between detected features */ private static enforceMinDistanceConv( features: { index: number; value: number; strength: number }[], minDistance: number ): { index: number; value: number; strength: number }[] { if (features.length <= 1) return features; // Sort by strength (descending) const sorted = [...features].sort((a, b) => b.strength - a.strength); const result: { index: number; value: number; strength: number }[] = []; for (const feature of sorted) { let tooClose = false; for (const accepted of result) { if (Math.abs(feature.index - accepted.index) < minDistance) { tooClose = true; break; } } if (!tooClose) { result.push(feature); } } // Sort result by index return result.sort((a, b) => a.index - b.index); } }