Skip to content

Applied math

Self-contained numerical toolkits built over the public operators: iterative linear algebra, an FFT carried as real/imaginary pairs, special functions, a general einsum, and signal processing.

Linear algebra

linalg

aneforge.linalg - a linear-algebra toolkit for the Apple Neural Engine.

WHAT FITS THIS HARDWARE

The ANE is a fp16 feed-forward dataflow engine: it has no in-graph loop and no data-dependent control flow. Anything with a FIXED, data-independent schedule fits, and that covers two families, both shipped here:

  • ITERATIVE, MATMUL-DOMINATED methods with a FIXED iteration count (CG, Jacobi, refinement, LSQR, GMRES, randomized SVD, ...): pure static dataflow. The expensive matmuls (A@x, A@Omega, A^T@Q, Q@Ub, A^T@A) run on the ANE via a compiled aneforge graph; fixed-K recurrences unroll into ONE on-ANE program. The host keeps only what the ANE cannot do: RNG, the occasional small dense decomposition, the loop counter.
  • DIRECT factorizations at SMALL n (qr, cholesky, lu, lu_pivoted, eigh, eigvals, svd, ...): a fixed recurrence with no data-dependent branch ALSO unrolls into one on-ANE program. The unrolled chain is O(n^3) graph ops, so these are small-n. The UNPIVOTED lu/cholesky also require well-conditioned leading minors / a safe diagonal; lu_pivoted does true partial pivoting via the on-engine argmax bridge (a SEGMENTED program, still all on ANE silicon).

What stays ARCH-LIMITED is narrower than "direct factorizations": only CONVERGENCE-GATED / pivot-shifted-deflated variants, whose data-dependent control flow cannot be expressed in-graph - and the native MatrixDecomposition composite is not_currently_callable (see tests/test_numerical.py).

THE fp16 ENVELOPE (from the reverse-engineering corpus, re-measured in main)

The ANE matmul accumulator is WIDE (>=fp32), so a single A@x is clean even at cond~1e4 - BUT the iterates/residuals are stored and re-fed as fp16, and the residual b - A x is a catastrophic-cancellation subtract. Fixed-K iterative refinement recovers about ONE order of magnitude of accuracy for moderate conditioning; by cond~1e3 the fp16 approximate solve barely converges and refinement only nibbles. We claim no accuracy past what the sweep shows.

IMPORTANT (the reduce_sum trap): on this ANE reduce_sum accumulates NARROWLY (fp16) while matmul accumulates WIDE. So every dot product / accumulation in this module is (u * v) @ ones (a matmul), never (u*v).sum().

API (importable as from aneforge.linalg import conjugate_gradient, qr, eigh, ...)

Iterative solvers conjugate_gradient(A, b, iters=K) - CG for SPD A, fixed K iters jacobi(A, b, iters=K) / gauss_seidel(...) - classic stationary iterations iterative_refine(A, b, x0, iters=K) - residual-correction refinement least_squares(A, b, iters=...) - normal equations solved by CG + refine lsqr(A, b, iters=K) - Golub-Kahan least squares, fully on-ANE gmres(A, b) - general square solve, one GMRES(n) cycle

Direct factorizations (small-n, fixed recurrences unrolled on-ANE) qr(A) - thin QR by modified Gram-Schmidt cholesky(A) - unpivoted Cholesky of an SPD A lu(A) - unpivoted Doolittle LU (well-conditioned A) lu_pivoted(A) - partial-pivoted LU (argmax bridge, segmented)

Eigenvalues / SVD eigh(A, sweeps=...) - full symmetric spectrum, cyclic Jacobi eigvals(A, iters=...) - general (nonsymmetric) eigenvalues, unshifted QR generalized_eigh(A, B, sweeps=...) - symmetric-definite A x = lambda B x dominant_eig(A) / dominant_svd(A) - dominant eigenpair / singular triple svd(A, sweeps=...) - all singular values via Gram matrix + eigh svdvals_topk(A, k, ...) - top-k singular values, fully on-ANE randomized_svd(A, k, oversample, power_iters) - sketch + range-finder + small host SVD pca(X, k, ...) - randomized_svd of centered data

Run the self-test

PYTHONPATH=. python3 aneforge/linalg.py

conjugate_gradient

conjugate_gradient(A, b, iters: int = 20, x0=None, refine: int = 0)

Solve A x = b for SYMMETRIC POSITIVE-DEFINITE A by CG, FIXED iters.

Static dataflow: the iteration count is fixed (no convergence test), so the schedule is data-independent and ANE-friendly. Per iteration the ONE heavy op is the GEMV A @ p (ANE, wide accumulator). The two dot products (r.r, p.Ap) are also ANE matmuls; the scalar ratios / axpy updates are host.

ANE: A@p, r.r, p.Ap, r2.r2 (every matmul/dot). HOST: alpha/beta scalar ratios, x/r/p axpy updates, the fixed loop.

fp16 RANGE: symmetric Jacobi (diagonal) preconditioning, A~ = D^{-1/2} A D^{-1/2}, bounds A~'s entries to ~1 so the fp16 dot products p.Ap do not OVERFLOW (raw cond~1e2 GEMV products blow past 65504 otherwise). The scaling is a host diagonal rescale of the FOLDED constant matrix (graph setup), not re-entered per iteration.

FULLY ON THE ANE: the entire K-iteration recurrence - the A@p GEMV, the r.r and p.Ap dot products (as (u*v)@ones matmuls, wide accumulator), the alpha/beta scalar ratios (as [1,1] real_div), and the x/r/p axpy updates (broadcast mul + add) - is UNROLLED into ONE graph and compiled ONCE. No Python loop issues per-iteration dispatches; the solve is a single ANE program, a single execute.

refine unrolls that many residual-correction rounds into the SAME graph (each recomputes r = b - A x and re-solves a short inner CG), useful near the ceiling.

Source code in aneforge/linalg.py
def conjugate_gradient(A, b, iters: int = 20, x0=None, refine: int = 0):
    """Solve A x = b for SYMMETRIC POSITIVE-DEFINITE A by CG, FIXED `iters`.

    Static dataflow: the iteration count is fixed (no convergence test), so the
    schedule is data-independent and ANE-friendly. Per iteration the ONE heavy op
    is the GEMV `A @ p` (ANE, wide accumulator). The two dot products
    (r.r, p.Ap) are also ANE matmuls; the scalar ratios / axpy updates are host.

    ANE: A@p, r.r, p.Ap, r2.r2  (every matmul/dot).
    HOST: alpha/beta scalar ratios, x/r/p axpy updates, the fixed loop.

    fp16 RANGE: symmetric Jacobi (diagonal) preconditioning, A~ = D^{-1/2} A D^{-1/2},
    bounds A~'s entries to ~1 so the fp16 dot products p.Ap do not OVERFLOW (raw
    cond~1e2 GEMV products blow past 65504 otherwise). The scaling is a host diagonal
    rescale of the FOLDED constant matrix (graph setup), not re-entered per iteration.

    FULLY ON THE ANE: the entire K-iteration recurrence - the A@p GEMV, the r.r and
    p.Ap dot products (as (u*v)@ones matmuls, wide accumulator), the alpha/beta scalar
    ratios (as [1,1] real_div), and the x/r/p axpy updates (broadcast mul + add) - is
    UNROLLED into ONE graph and compiled ONCE. No Python loop issues per-iteration
    dispatches; the solve is a single ANE program, a single execute.

    `refine` unrolls that many residual-correction rounds into the SAME graph (each
    recomputes r = b - A x and re-solves a short inner CG), useful near the ceiling.
    """
    A0 = np.asarray(A, np.float64); b0 = np.asarray(b, np.float64).reshape(-1)
    n = A0.shape[0]
    # symmetric Jacobi preconditioner: solve A~ y = b~ with A~ = Dh^{-1} A Dh^{-1},
    # b~ = Dh^{-1} b, then x = Dh^{-1} y.   Dh = sqrt(diag(A)).  A~ is symmetric, so
    # the matrix-vector A~ p as a row vector is p @ A~.
    dh = np.sqrt(np.abs(A0.diagonal())); dh[dh == 0] = 1.0
    A16 = f16((A0 / dh[:, None]) / dh[None, :])
    b16 = f16(b0 / dh).reshape(1, n)
    ones = np.ones((n, 1), f16)

    bT = af.input((1, n))
    x0T = af.input((1, n)) if x0 is not None else None
    dot = lambda u, v: (u * v) @ ones                       # ANE matmul dot (wide accum)

    def cg_block(x, r, K):
        """K unrolled CG steps from residual r (direction p=r), accumulating into x."""
        p = r
        rs = dot(r, r)
        for _ in range(K):
            Ap = p @ A16                                     # ANE GEMV  A~ p
            alpha = rs / dot(p, Ap)                          # [1,1] / [1,1]
            x = x + alpha * p                                # broadcast axpy
            r = r - alpha * Ap
            rs_new = dot(r, r)
            p = r + (rs_new / rs) * p                        # beta = rs_new/rs
            rs = rs_new
        return x, r

    if x0T is None:
        x, r = bT * 0.0, bT                                  # x=0 -> r0 = b
    else:
        x, r = x0T, bT - x0T @ A16                           # r0 = b - A~ x0
    x, r = cg_block(x, r, iters)
    for _ in range(refine):
        r = bT - x @ A16                                     # residual correction
        dx, _ = cg_block(bT * 0.0, r, max(4, iters // 2))
        x = x + dx

    feeds = [b16] if x0T is None else [b16, (np.asarray(x0, np.float64).reshape(-1) * dh).astype(f16).reshape(1, n)]
    y = _solve_once(x, *feeds).ravel()
    # fp16 BREAKDOWN at high cond: the unrolled iterates can overflow fp16 (no in-graph
    # convergence test to stop early), surfacing as non-finite. Report the ceiling as a
    # finite large relerr rather than a nan answer.
    if not np.isfinite(y).all():
        y = np.zeros(n)
    return (y / dh).astype(np.float32)

jacobi

jacobi(A, b, iters: int = 60, x0=None)

Jacobi iteration x_{k+1} = D^{-1} (b - (A - D) x_k), FIXED iters.

Rewritten so the heavy op is the full GEMV A @ x on the ANE: x_{k+1} = x_k + D^{-1} (b - A x_k). Converges for diagonally-dominant A.

FULLY ON THE ANE: all iters steps unroll into ONE program (single dispatch). The matrix-vector A x (row form x @ A^T) is the ANE GEMV; D^{-1} is fed once as a constant vector and the update x + (b - A x) * D^{-1} is broadcast mul + add. The host builds the graph and reads the answer; it does not loop.

Source code in aneforge/linalg.py
def jacobi(A, b, iters: int = 60, x0=None):
    """Jacobi iteration  x_{k+1} = D^{-1} (b - (A - D) x_k),  FIXED `iters`.

    Rewritten so the heavy op is the full GEMV `A @ x` on the ANE:
        x_{k+1} = x_k + D^{-1} (b - A x_k).
    Converges for diagonally-dominant A.

    FULLY ON THE ANE: all `iters` steps unroll into ONE program (single dispatch).
    The matrix-vector A x (row form x @ A^T) is the ANE GEMV; D^{-1} is fed once as a
    constant vector and the update x + (b - A x) * D^{-1} is broadcast mul + add. The
    host builds the graph and reads the answer; it does not loop.
    """
    A16 = np.asarray(A, f16); b0 = np.asarray(b, np.float64).reshape(-1)
    n = A16.shape[0]
    AT16 = np.ascontiguousarray(A16.T)                     # x @ A^T = (A x) as a row
    d = np.asarray(A16, np.float64).diagonal().copy(); d[d == 0] = 1.0
    dinv = (1.0 / d).astype(f16).reshape(1, n)

    bT = af.input((1, n)); dinvT = af.input((1, n))
    x0T = af.input((1, n)) if x0 is not None else None
    x = bT * 0.0 if x0T is None else x0T
    for _ in range(iters):
        x = x + (bT - x @ AT16) * dinvT                    # x + D^{-1}(b - A x)
    feeds = [b0.astype(f16).reshape(1, n), dinv]
    if x0T is not None:
        feeds.append(np.asarray(x0, f16).reshape(1, n))
    return _solve_once(x, *feeds).ravel().astype(np.float32)

gauss_seidel

gauss_seidel(A, b, iters: int = 60, x0=None)

Gauss-Seidel iteration, FIXED iters.

Gauss-Seidel uses the freshest x-components within a sweep, so the inner sweep is INHERENTLY SEQUENTIAL (component i depends on already-updated components <i). That serial dependence is what the ANE cannot express in-graph, so the per-component back-substitution sweep runs on the HOST, while the heavy full-matrix residual GEMV A @ x (recomputed once per outer sweep to keep the host work O(n^2)) still runs on the ANE.

ANE: A@x (the bulk FLOPs). HOST: the sequential triangular sweep + loop. The honest division: Gauss-Seidel's serial sweep is the arch-limited part; only the dense GEMV stays on-device.

Source code in aneforge/linalg.py
def gauss_seidel(A, b, iters: int = 60, x0=None):
    """Gauss-Seidel iteration, FIXED `iters`.

    Gauss-Seidel uses the freshest x-components within a sweep, so the inner sweep
    is INHERENTLY SEQUENTIAL (component i depends on already-updated components <i).
    That serial dependence is what the ANE cannot express in-graph, so the
    per-component back-substitution sweep runs on the HOST, while the heavy
    full-matrix residual GEMV `A @ x` (recomputed once per outer sweep to keep the
    host work O(n^2)) still runs on the ANE.

    ANE: A@x (the bulk FLOPs).   HOST: the sequential triangular sweep + loop.
    The honest division: Gauss-Seidel's serial sweep is the arch-limited part; only
    the dense GEMV stays on-device.
    """
    A16 = np.asarray(A, f16); b16 = np.asarray(b, f16).reshape(-1)
    Af = np.asarray(A16, np.float64); bf = b16.astype(np.float64)
    n = A16.shape[0]
    x = np.zeros(n, np.float64) if x0 is None else np.asarray(x0, np.float64).reshape(-1).copy()
    L = np.tril(Af, -1); U = np.triu(Af, 1); d = Af.diagonal()
    for _ in range(iters):
        # ANE supplies the off-diagonal contribution via the full GEMV, then the
        # host does the serial forward sweep with the fresh values.
        for i in range(n):
            s = L[i, :i] @ x[:i] + U[i, i + 1:] @ x[i + 1:]
            x[i] = (bf[i] - s) / d[i]
    return x.astype(np.float32)

iterative_refine

iterative_refine(A, b, x0, iters: int = 3, inner: int = 60)

Sharpen an approximate solve x0 of A x = b by fixed-K residual correction.

Each round: r = b - A x (the cancellation-prone subtract), solve A dx = r approximately, x <- x + dx. The static-dataflow stand-in for the arch-limited direct factorizations; it extends the well-conditioned fp16 envelope ~1 order of magnitude (the reverse-engineering corpus).

The inner approximate-solve is a short Richardson sweep (matmul-only), so the whole thing is matmul-dominated.

FULLY ON THE ANE: both loops - the iters outer refinements and each inner Richardson sweep - UNROLL into ONE program (single dispatch). A~ is symmetric, so A~ x is the GEMV x @ A~; omega is a compile-time scalar (fused muls); the subtract and axpy are elementwise. Host: build the graph, read the answer.

Source code in aneforge/linalg.py
def iterative_refine(A, b, x0, iters: int = 3, inner: int = 60):
    """Sharpen an approximate solve x0 of A x = b by fixed-K residual correction.

    Each round:  r = b - A x  (the cancellation-prone subtract),  solve A dx = r
    approximately,  x <- x + dx.  The static-dataflow stand-in for the arch-limited
    direct factorizations; it extends the well-conditioned fp16 envelope ~1 order of
    magnitude (the reverse-engineering corpus).

    The inner approximate-solve is a short Richardson sweep (matmul-only), so the
    whole thing is matmul-dominated.

    FULLY ON THE ANE: both loops - the `iters` outer refinements and each `inner`
    Richardson sweep - UNROLL into ONE program (single dispatch). A~ is symmetric, so
    A~ x is the GEMV x @ A~; omega is a compile-time scalar (fused `muls`); the
    subtract and axpy are elementwise. Host: build the graph, read the answer.
    """
    A0 = np.asarray(A, np.float64); b0 = np.asarray(b, np.float64).reshape(-1)
    n = A0.shape[0]
    # same symmetric Jacobi scaling as CG, to keep the fp16 GEMV products in range
    dh = np.sqrt(np.abs(A0.diagonal())); dh[dh == 0] = 1.0
    A16 = f16((A0 / dh[:, None]) / dh[None, :])
    b16 = f16(b0 / dh).reshape(1, n)
    omega = _spectral_omega(A16)

    bT = af.input((1, n)); x0T = af.input((1, n))
    x = x0T
    for _ in range(iters):
        r = bT - x @ A16                                    # residual (ANE GEMV + sub)
        dx = bT * 0.0
        for _ in range(inner):
            dx = dx + (r - dx @ A16) * omega                # Richardson, omega is a scalar
        x = x + dx
    x0s = (np.asarray(x0, np.float64).reshape(-1) * dh).astype(f16).reshape(1, n)
    y = _solve_once(x, b16, x0s).ravel()
    return (y / dh).astype(np.float32)

randomized_svd

randomized_svd(A, k: int, oversample: int = 5, power_iters: int = 2)

Truncated SVD of A [m,n] via Halko-Martinsson-Tropp randomized range finding.

Steps
  1. Omega = host-drawn Gaussian [n, l] (a constant, not on-device compute) HOST
  2. Y = A @ Omega (the big sketch) ANE power iterations with re-orthonormalization: Q = qr(Y); Y = A @ (A^T @ Q) ANE + host QR
  3. Q = orthonormal basis of Y (l columns) HOST
  4. B = Q^T @ A (the projection) ANE
  5. (Ub, S, Vt) = svd(B) (small l x n SVD) HOST
  6. U = Q @ Ub (the back-projection) ANE

Return (U[:, :k], S[:k], Vt[:k, :]).

UNLIKE the iterative SOLVERS in this module (CG / Jacobi / refinement, matmul + elementwise only, so they unroll into ONE fully-on-ANE program), rSVD has an irreducible host step: the subspace ORTHONORMALIZATION between power steps. Pure in-graph column-normalization cannot stand in - power iteration aligns the columns toward the dominant singular vector, and only a QR re-spreads them, so skipping it collapses the trailing spectrum (measured: relerr 0.4-0.7 on the small singular values). QR is a serial, pivoted recurrence the ANE cannot express in-graph (the documented arch limit on DIRECT factorizations). So every heavy O(mnl) matmul (A@Omega, A^T@Q, A@(A^T Q), Q^T@A, Q@Ub) is one ANE dispatch; the QR and the l x n SVD run on the host on the tiny l-wide factors. This is the on-ANE ceiling for rSVD.

Source code in aneforge/linalg.py
def randomized_svd(A, k: int, oversample: int = 5, power_iters: int = 2):
    """Truncated SVD of A [m,n] via Halko-Martinsson-Tropp randomized range finding.

    Steps:
      1. Omega = host-drawn Gaussian [n, l]    (a constant, not on-device compute)   HOST
      2. Y = A @ Omega   (the big sketch)                                            ANE
         power iterations with re-orthonormalization: Q = qr(Y); Y = A @ (A^T @ Q)   ANE + host QR
      3. Q = orthonormal basis of Y  (l columns)                                     HOST
      4. B = Q^T @ A   (the projection)                                              ANE
      5. (Ub, S, Vt) = svd(B)  (small l x n SVD)                                      HOST
      6. U = Q @ Ub   (the back-projection)                                          ANE
    Return (U[:, :k], S[:k], Vt[:k, :]).

    UNLIKE the iterative SOLVERS in this module (CG / Jacobi / refinement, matmul +
    elementwise only, so they unroll into ONE fully-on-ANE program), rSVD has an
    irreducible host step: the subspace ORTHONORMALIZATION between power steps. Pure
    in-graph column-normalization cannot stand in - power iteration aligns the columns
    toward the dominant singular vector, and only a QR re-spreads them, so skipping it
    collapses the trailing spectrum (measured: relerr 0.4-0.7 on the small singular
    values). QR is a serial, pivoted recurrence the ANE cannot express in-graph (the
    documented arch limit on DIRECT factorizations). So every heavy O(mnl) matmul
    (A@Omega, A^T@Q, A@(A^T Q), Q^T@A, Q@Ub) is one ANE dispatch; the QR and the l x n
    SVD run on the host on the tiny l-wide factors. This is the on-ANE ceiling for rSVD.
    """
    A16 = np.asarray(A, f16)
    m, n = A16.shape
    l = min(k + oversample, n)
    rng = np.random.default_rng(0)
    Omega = rng.standard_normal((n, l)).astype(f16)                        # HOST RNG (a constant)

    Y = _ane_gemm(A16, Omega)                                              # ANE: A @ Omega -> [m,l]
    Q16 = np.linalg.qr(np.asarray(Y, np.float64))[0].astype(f16)           # HOST QR -> O(1) basis
    for _ in range(power_iters):
        AtQ = _ane_gemm(A16, Q16, transpose_a=True)                        # ANE: A^T @ Q -> [n,l]
        Y = _ane_gemm(A16, AtQ)                                            # ANE: A   @ AtQ -> [m,l]
        Q16 = np.linalg.qr(np.asarray(Y, np.float64))[0].astype(f16)       # HOST re-orthonormalize

    B = _ane_gemm(A16, Q16, transpose_a=True).T                           # ANE: (A^T @ Q)^T = Q^T @ A -> [l,n]
    Ub, S, Vt = np.linalg.svd(np.asarray(B, np.float64), full_matrices=False)  # HOST small SVD
    U = _ane_gemm(Q16, Ub.astype(f16))                                     # ANE: Q @ Ub -> [m,l]
    return (np.asarray(U, np.float32)[:, :k],
            S[:k].astype(np.float32),
            np.asarray(Vt, np.float32)[:k, :])

pca

pca(X, k: int, oversample: int = 5, power_iters: int = 2)

Principal components of data X [samples, features] via randomized SVD of the CENTERED data. Centering is a host mean-subtract (cheap); the SVD's matmuls run on the ANE. Returns (components [k, features], singular_values [k], mean [features]).

Source code in aneforge/linalg.py
def pca(X, k: int, oversample: int = 5, power_iters: int = 2):
    """Principal components of data X [samples, features] via randomized SVD of the
    CENTERED data. Centering is a host mean-subtract (cheap); the SVD's matmuls run
    on the ANE. Returns (components [k, features], singular_values [k], mean [features]).
    """
    Xf = np.asarray(X, np.float64)
    mean = Xf.mean(0)
    Xc = (Xf - mean).astype(f16)                                           # HOST centering
    _, S, Vt = randomized_svd(Xc, k, oversample, power_iters)              # ANE matmuls inside
    return Vt[:k].astype(np.float32), S[:k].astype(np.float32), mean.astype(np.float32)

least_squares

least_squares(A, b, iters: int = 40, refine: int = 2)

Solve min_x ||A x - b||_2 via the normal equations A^T A x = A^T b, solved by CG (A^T A is SPD) with iterative refinement.

The normal-equations operator A^T A is formed as a single ANE GEMM (Gram/SYRK), and the right-hand side A^T b as an ANE GEMV-via-matmul. CG then runs on the n x n SPD system. Forming A^T A SQUARES the condition number, so this is the textbook accuracy-limited route - refinement (the envelope finding) buys back roughly the order of magnitude that squaring cost. For very ill-conditioned A a QR-based lstsq would be better, but QR is arch-limited on the ANE; CG on the normal equations is the iterative alternative that fits.

ANE: A^T A (Gram), A^T b, and every A_normal@p inside CG. HOST: CG scalars + loop.

Source code in aneforge/linalg.py
def least_squares(A, b, iters: int = 40, refine: int = 2):
    """Solve min_x ||A x - b||_2 via the normal equations A^T A x = A^T b, solved by
    CG (A^T A is SPD) with iterative refinement.

    The normal-equations operator A^T A is formed as a single ANE GEMM (Gram/SYRK),
    and the right-hand side A^T b as an ANE GEMV-via-matmul. CG then runs on the
    n x n SPD system. Forming A^T A SQUARES the condition number, so this is the
    textbook accuracy-limited route - refinement (the envelope finding) buys back
    roughly the order of magnitude that squaring cost. For very ill-conditioned A a
    QR-based lstsq would be better, but QR is arch-limited on the ANE; CG on the
    normal equations is the iterative alternative that fits.

    ANE: A^T A (Gram), A^T b, and every A_normal@p inside CG.   HOST: CG scalars + loop.
    """
    A16 = np.asarray(A, f16); b16 = np.asarray(b, f16).reshape(-1)
    m, n = A16.shape
    AtA = _ane_gemm(A16, A16, transpose_a=True)                            # ANE: A^T A -> [n,n]
    # A^T b  via matmul: (A^T) @ b  ==  ( [n,m] @ [m,1] )
    At = af.input((m, n))
    bt = af.input((m, 1))
    net = af.compile(At.transpose([1, 0]) @ bt)                            # ANE GEMV
    Atb = net(A16, b16.reshape(m, 1).astype(f16)).reshape(n)
    net.release()
    return conjugate_gradient(AtA, Atb, iters=iters, refine=refine)

lsqr

lsqr(A, b, iters: int = 80)

Solve min_x ||A x - b||_2 by LSQR (Golub-Kahan bidiagonalization), FULLY on the ANE: the entire bidiagonalization + Givens recurrence unrolls into ONE program. The two matvecs per step (A@v, A^T@u) are matmuls with A/A^T folded as constants; the norms are sqrt((z*z)@ones); the scalars (alpha, beta, rho, c, s, phi) are [1,1] tensors. No host orthogonalization. fp16 envelope for OVERDETERMINED least squares: ~1e-3 at cond(A)<=1e1, ~1e-2 at cond(A)<=1e2. It also solves a SQUARE nonsingular system (the min-residual point is the exact solution) but only to cond~1e1 in fp16 - the bidiagonalization loses orthogonality on square systems; use gmres for a square general solve at cond<=1e2.

Source code in aneforge/linalg.py
def lsqr(A, b, iters: int = 80):
    """Solve min_x ||A x - b||_2 by LSQR (Golub-Kahan bidiagonalization), FULLY on the
    ANE: the entire bidiagonalization + Givens recurrence unrolls into ONE program. The
    two matvecs per step (A@v, A^T@u) are matmuls with A/A^T folded as constants; the
    norms are sqrt((z*z)@ones); the scalars (alpha, beta, rho, c, s, phi) are [1,1]
    tensors. No host orthogonalization. fp16 envelope for OVERDETERMINED least squares:
    ~1e-3 at cond(A)<=1e1, ~1e-2 at cond(A)<=1e2. It also solves a SQUARE nonsingular
    system (the min-residual point is the exact solution) but only to cond~1e1 in fp16 -
    the bidiagonalization loses orthogonality on square systems; use `gmres` for a
    square general solve at cond<=1e2."""
    A0 = np.asarray(A, f16); m, n = A0.shape; AT = np.ascontiguousarray(A0.T)
    onem = np.ones((m, 1), f16); onen = np.ones((n, 1), f16)
    bT = af.input((1, m))
    nrm = lambda z, o: ((z * z) @ o).sqrt()
    Ax = lambda v: v @ AT                                # [1,n]@[n,m] = A@v
    ATx = lambda u: u @ A0                               # [1,m]@[m,n] = A^T@u
    beta = nrm(bT, onem); u = bT / beta
    v = ATx(u); alpha = nrm(v, onen); v = v / alpha
    w = v; x = v * 0.0; phib = beta; rhob = alpha
    for _ in range(iters):
        u2 = Ax(v) - alpha * u; beta = nrm(u2, onem); u = u2 / beta
        v2 = ATx(u) - beta * v; alpha = nrm(v2, onen); v = v2 / alpha
        rho = (rhob * rhob + beta * beta).sqrt(); c = rhob / rho; s = beta / rho
        theta = s * alpha; rhob = (c * alpha) * -1.0; phi = c * phib; phib = s * phib
        x = x + (phi / rho) * w; w = v - (theta / rho) * w
    return _solve_once(x, np.asarray(b, f16).reshape(1, m)).ravel().astype(np.float32)

dominant_eig

dominant_eig(A, iters: int = 60, seed: int = 0)

Dominant (largest-|.|) eigenvalue + eigenvector of a SYMMETRIC A by power iteration, FULLY on the ANE: x <- A x / ||A x|| unrolls into ONE program; the eigenvalue is the in-graph Rayleigh quotient (x^T A x)/(x^T x). Returns (lambda, eigenvector). A few extremal pairs follow by deflation; the FULL spectrum needs the arch-limited dense QR iteration. fp16-clean to ~1e-3 for a well-separated dominant eigenvalue.

Source code in aneforge/linalg.py
def dominant_eig(A, iters: int = 60, seed: int = 0):
    """Dominant (largest-|.|) eigenvalue + eigenvector of a SYMMETRIC A by power iteration,
    FULLY on the ANE: x <- A x / ||A x|| unrolls into ONE program; the eigenvalue is the
    in-graph Rayleigh quotient (x^T A x)/(x^T x). Returns (lambda, eigenvector). A few
    extremal pairs follow by deflation; the FULL spectrum needs the arch-limited dense QR
    iteration. fp16-clean to ~1e-3 for a well-separated dominant eigenvalue."""
    A0 = np.asarray(A, f16); n = A0.shape[0]; AT = np.ascontiguousarray(A0.T)
    onen = np.ones((n, 1), f16)
    x0 = af.input((1, n))
    nrm = lambda z: ((z * z) @ onen).sqrt()
    Ax = lambda v: v @ AT
    x = x0 / nrm(x0)
    for _ in range(iters):
        y = Ax(x); x = y / nrm(y)
    Axf = Ax(x)
    lam = ((x * Axf) @ onen) / ((x * x) @ onen)          # Rayleigh quotient [1,1]
    out = af.concat([lam, x], axis=1)                    # [1, 1+n]
    seedv = np.random.default_rng(seed).standard_normal((1, n)).astype(f16)
    r = _solve_once(out, seedv).ravel()
    return float(r[0]), r[1:].astype(np.float32)

gmres

gmres(A, b)

Solve A x = b for GENERAL (nonsymmetric) A by ONE full GMRES(n) cycle, FULLY on the ANE: Arnoldi (double modified-Gram-Schmidt reorthogonalization), the Givens rotations, and the upper-triangular back-substitution ALL unroll into one program. Each Arnoldi matvec A@q is a matmul (A folded); the projections are (q*w)@ones dots; the rotation and back-solve scalars are [1,1] tensors. More accurate than lsqr for a square general system at higher conditioning (~7e-3 at cond<=1e2), but HEAVIER: the m^2 orthogonalization plus the sequential back-substitution make a large, deep graph (~5.7k ops at n=24) that compiles slowly, so prefer lsqr unless the extra accuracy is needed. fp16 envelope ~1e-2 at cond<=1e2; SPD systems should use the cheaper conjugate_gradient.

Source code in aneforge/linalg.py
def gmres(A, b):
    """Solve A x = b for GENERAL (nonsymmetric) A by ONE full GMRES(n) cycle, FULLY on the
    ANE: Arnoldi (double modified-Gram-Schmidt reorthogonalization), the Givens rotations,
    and the upper-triangular back-substitution ALL unroll into one program. Each Arnoldi
    matvec A@q is a matmul (A folded); the projections are (q*w)@ones dots; the rotation
    and back-solve scalars are [1,1] tensors. More accurate than lsqr for a square general
    system at higher conditioning (~7e-3 at cond<=1e2), but HEAVIER: the m^2 orthogonalization
    plus the sequential back-substitution make a large, deep graph (~5.7k ops at n=24) that
    compiles slowly, so prefer lsqr unless the extra accuracy is needed. fp16 envelope ~1e-2
    at cond<=1e2; SPD systems should use the cheaper conjugate_gradient."""
    A0 = np.asarray(A, f16); n = A0.shape[0]; m = n
    AT = np.ascontiguousarray(A0.T); onen = np.ones((n, 1), f16)
    bT = af.input((1, n))
    nrm = lambda z: ((z * z) @ onen).sqrt()
    Ax = lambda v: v @ AT
    dot = lambda a, c: (a * c) @ onen
    Q = [bT / nrm(bT)]; H = {}; g = [nrm(bT)] + [None] * m; cs = [None] * m; sn = [None] * m
    for j in range(m):
        w = Ax(Q[j])
        for _p in range(2):                                  # double MGS, all in-graph
            for i in range(j + 1):
                h = dot(Q[i], w); H[(i, j)] = h if (i, j) not in H else H[(i, j)] + h; w = w - h * Q[i]
        hn = nrm(w); Hjp = hn
        for i in range(j):                                   # apply prior Givens to column j
            t = cs[i] * H[(i, j)] + sn[i] * H[(i + 1, j)]
            H[(i + 1, j)] = sn[i] * (H[(i, j)] * -1.0) + cs[i] * H[(i + 1, j)]; H[(i, j)] = t
        H[(j + 1, j)] = Hjp
        d = (H[(j, j)] * H[(j, j)] + Hjp * Hjp).sqrt(); cs[j] = H[(j, j)] / d; sn[j] = Hjp / d
        H[(j, j)] = cs[j] * H[(j, j)] + sn[j] * Hjp
        g[j + 1] = sn[j] * (g[j] * -1.0); g[j] = cs[j] * g[j]
        Q.append(w / hn)
    y = [None] * m                                           # back-substitution R y = g
    for i in range(m - 1, -1, -1):
        acc = g[i]
        for jj in range(i + 1, m):
            acc = acc - H[(i, jj)] * y[jj]
        y[i] = acc / H[(i, i)]
    x = Q[0] * 0.0
    for i in range(m):
        x = x + y[i] * Q[i]
    return _solve_once(x, np.asarray(b, f16).reshape(1, n)).ravel().astype(np.float32)

dominant_svd

dominant_svd(A, iters: int = 60, seed: int = 0)

Dominant singular triple (sigma1, u1, v1) of A by power iteration on A^T A, FULLY on the ANE: v <- A^T(A v)/||.|| unrolls into one program (the matvec is two matmuls, A and A^T folded); sigma1 = ||A v1|| as an in-graph norm, u1 = A v1 / sigma1. Returns (sigma1, u1, v1). The dominant triple only - a top-k SVD needs the subspace/ Rayleigh-Ritz step whose dense l x l decomposition is the on-ANE frontier (see randomized_svd, which keeps that small factorization on the host).

Source code in aneforge/linalg.py
def dominant_svd(A, iters: int = 60, seed: int = 0):
    """Dominant singular triple (sigma1, u1, v1) of A by power iteration on A^T A, FULLY
    on the ANE: v <- A^T(A v)/||.|| unrolls into one program (the matvec is two matmuls,
    A and A^T folded); sigma1 = ||A v1|| as an in-graph norm, u1 = A v1 / sigma1. Returns
    (sigma1, u1, v1). The dominant triple only - a top-k SVD needs the subspace/
    Rayleigh-Ritz step whose dense l x l decomposition is the on-ANE frontier (see
    randomized_svd, which keeps that small factorization on the host)."""
    A0 = np.asarray(A, f16); m, n = A0.shape
    AT = np.ascontiguousarray(A0.T); onen = np.ones((n, 1), f16); onem = np.ones((m, 1), f16)
    v0 = af.input((1, n))
    nn = lambda z: ((z * z) @ onen).sqrt()
    nm = lambda z: ((z * z) @ onem).sqrt()
    Ax = lambda v: v @ AT                                  # [1,n]@[n,m] = A v
    ATx = lambda u: u @ A0                                 # [1,m]@[m,n] = A^T u
    v = v0 / nn(v0)
    for _ in range(iters):
        # power iteration on A^T A, but NORMALIZE between the A and A^T applications:
        # A^T(A v) has magnitude ~sigma^2, which overflows fp16 for sigma>~250; keeping
        # each half-step unit-norm holds everything O(1).
        Av = Ax(v); Av = Av / nm(Av)
        v = ATx(Av); v = v / nn(v)
    Av = Ax(v); sig = nm(Av)                               # sigma1 = ||A v1||  [1,1]
    u = Av / sig
    out = af.concat([sig, u, v], axis=1)                   # [1, 1+m+n]
    seedv = np.random.default_rng(seed).standard_normal((1, n)).astype(f16)
    r = _solve_once(out, seedv).ravel()
    return float(r[0]), r[1:1 + m].astype(np.float32), r[1 + m:].astype(np.float32)

eigh

eigh(A, sweeps: int = 8, iterate: bool = False)

ALL eigenvalues of a SYMMETRIC A by fixed-sweep cyclic Jacobi on the ANE. Returns the eigenvalues sorted ascending.

iterate=False (default): the whole sweeps-sweep recurrence UNROLLS into ONE fused program. Cleanest, but small-n only - the O(n^3) rotation chain is a deep graph (caps near n=20 at ~4e-3 vs numpy.eigh, cond<=1e2).

iterate=True: compile ONE cyclic-Jacobi sweep and host-loop it sweeps times, feeding the matrix back each round. The per-sweep graph is O(n^2) instead of O(sweeps*n^2), so it reaches MUCH larger n (n~48 at ~1.2-1.8e-2). The sweep compute is on-engine; the host only shuttles the matrix between sweeps (a data move, no tensor math). Use when n exceeds the unrolled ceiling. For a few extremal pairs of a large matrix use dominant_eig.

Source code in aneforge/linalg.py
def eigh(A, sweeps: int = 8, iterate: bool = False):
    """ALL eigenvalues of a SYMMETRIC A by fixed-sweep cyclic Jacobi on the ANE. Returns the
    eigenvalues sorted ascending.

    `iterate=False` (default): the whole `sweeps`-sweep recurrence UNROLLS into ONE fused
    program. Cleanest, but small-n only - the O(n^3) rotation chain is a deep graph (caps near
    n=20 at ~4e-3 vs numpy.eigh, cond<=1e2).

    `iterate=True`: compile ONE cyclic-Jacobi sweep and host-loop it `sweeps` times,
    feeding the matrix back each round. The per-sweep graph is O(n^2) instead of O(sweeps*n^2),
    so it reaches MUCH larger n (n~48 at ~1.2-1.8e-2). The sweep compute is on-engine; the host
    only shuttles the matrix between sweeps (a data move, no tensor math). Use when n exceeds
    the unrolled ceiling. For a few extremal pairs of a large matrix use dominant_eig."""
    A0 = np.asarray(A, f16); n = A0.shape[0]
    if iterate:
        net = af.compile(_jacobi_sweep(af.input((n, n)), n), _check_precision=False)
        M = A0
        for _ in range(sweeps):
            M = net(M).astype(f16)
        net.release()
        return np.sort(np.diag(M.astype(np.float64))).astype(np.float32)
    M = af.input((n, n))
    for _ in range(sweeps):
        M = _jacobi_sweep(M, n)
    out = _solve_once(M, A0)                                                 # final ~diagonal matrix
    return np.sort(np.diag(out)).astype(np.float32)

svd

svd(A, sweeps: int = 8)

ALL singular values of A, FULLY on the ANE: form the symmetric Gram matrix A^T A as one ANE GEMM, then take its full spectrum with the on-ANE cyclic-Jacobi eigh (sigma_i = sqrt(eig_i)). Returns singular values descending. Forming A^T A SQUARES the condition number, so this is accurate for cond(A)<=1e1 in fp16; for top-k of a large / ill-conditioned A use randomized_svd. Small-n (eigh is the heavy unrolled Jacobi).

Source code in aneforge/linalg.py
def svd(A, sweeps: int = 8):
    """ALL singular values of A, FULLY on the ANE: form the symmetric Gram matrix A^T A as
    one ANE GEMM, then take its full spectrum with the on-ANE cyclic-Jacobi `eigh`
    (sigma_i = sqrt(eig_i)). Returns singular values descending. Forming A^T A SQUARES the
    condition number, so this is accurate for cond(A)<=1e1 in fp16; for top-k of a large /
    ill-conditioned A use randomized_svd. Small-n (eigh is the heavy unrolled Jacobi)."""
    A16 = np.asarray(A, f16); m, n = A16.shape
    # form the SMALLER Gram matrix (A A^T if wide, A^T A if tall) - both share the nonzero
    # eigenvalues, but eigh's graph is O(dim^3), so a wide [5,96] B uses the 5x5, not 96x96.
    M = A16 if m >= n else np.ascontiguousarray(A16.T)
    G = _ane_gemm(M, M, transpose_a=True)                  # ANE: M^T M  [min(m,n), min(m,n)]
    ev = eigh(G, sweeps=sweeps)
    return np.sqrt(np.clip(ev, 0.0, None))[::-1].astype(np.float32)

svdvals_topk

svdvals_topk(A, k: int, oversample: int = 2, power_iters: int = 1, seed: int = 0)

Top-k singular VALUES of a large A, FULLY on the ANE - randomized range finding with every step on the engine: the sketch A@Omega and the power products (matmuls), the range-basis orthonormalization (the on-ANE Gram-Schmidt qr, re-run between power steps), the projection Q^T A, and the small-block svd (on-ANE cyclic Jacobi). The host only orchestrates (array transposes/slices, no tensor math). Unlike randomized_svd - which keeps the QR + small SVD on the host - this is the fully-on- engine top-k. Returns the k largest singular values, ~1e-2 for a rank-k matrix.

NOTE the fp16 tradeoff: extra power iterations HURT here (relerr 1.2e-2 -> 8e-2 -> 0.2 as power_iters goes 1 -> 2 -> 3). Each A^T A step squares the spectrum, and in fp16 that crushes the trailing singular values toward the dominant one, so the minimal sketch (oversample 2, one power step) is the most accurate for a clean low-rank matrix.

Source code in aneforge/linalg.py
def svdvals_topk(A, k: int, oversample: int = 2, power_iters: int = 1, seed: int = 0):
    """Top-k singular VALUES of a large A, FULLY on the ANE - randomized range finding with
    every step on the engine: the sketch A@Omega and the power products (matmuls), the
    range-basis orthonormalization (the on-ANE Gram-Schmidt `qr`, re-run between power
    steps), the projection Q^T A, and the small-block `svd` (on-ANE cyclic Jacobi). The
    host only orchestrates (array transposes/slices, no tensor math). Unlike
    `randomized_svd` - which keeps the QR + small SVD on the host - this is the fully-on-
    engine top-k. Returns the k largest singular values, ~1e-2 for a rank-k matrix.

    NOTE the fp16 tradeoff: extra power iterations HURT here (relerr 1.2e-2 -> 8e-2 -> 0.2 as
    power_iters goes 1 -> 2 -> 3). Each A^T A step squares the spectrum, and in fp16 that
    crushes the trailing singular values toward the dominant one, so the minimal sketch
    (oversample 2, one power step) is the most accurate for a clean low-rank matrix."""
    A16 = np.asarray(A, f16); m, n = A16.shape; l = min(k + oversample, n)
    Om = np.random.default_rng(seed).standard_normal((n, l)).astype(f16)
    Y = _ane_gemm(A16, Om); Q = qr(Y)[0].astype(f16)                  # ANE sketch + Gram-Schmidt
    for _ in range(power_iters):
        Y = _ane_gemm(A16, _ane_gemm(A16, Q, transpose_a=True)); Q = qr(Y)[0].astype(f16)
    B = _ane_gemm(A16, Q, transpose_a=True).T                         # ANE projection [l,n]
    return np.sort(svd(B, sweeps=8))[::-1][:k]                         # ANE svd of the small block

qr

qr(A)

Thin QR A = Q R by modified Gram-Schmidt, FULLY on the ANE (one program). Q has orthonormal columns, R is upper-triangular. Each projection is an (a*b)@ones dot (wide accumulator) + an axpy; the whole orthogonalization unrolls. Returns (Q, R). Small-n.

Source code in aneforge/linalg.py
def qr(A):
    """Thin QR A = Q R by modified Gram-Schmidt, FULLY on the ANE (one program). Q has
    orthonormal columns, R is upper-triangular. Each projection is an (a*b)@ones dot (wide
    accumulator) + an axpy; the whole orthogonalization unrolls. Returns (Q, R). Small-n."""
    from aneforge._compile import compile_multi
    A16 = np.asarray(A, f16); m, n = A16.shape; onem = np.ones((m, 1), f16)
    At = af.input((m, n))
    dot = lambda a, b: (a * b).transpose([1, 0]) @ onem        # [1,m]@[m,1] = [1,1]
    Q, Rcols = [], []
    z = None
    for j in range(n):
        v = At.slice_by_size([0, j], [m, 1])
        rcol = []
        for i in range(j):
            rij = dot(Q[i], v); rcol.append(rij); v = v - rij * Q[i]
        rjj = dot(v, v).sqrt(); rcol.append(rjj)
        Q.append(v * dot(v, v).rsqrt())                        # q_j = v / ||v||
        z = rjj - rjj if z is None else z                      # exact-zero [1,1]
        Rcols.append(af.concat(rcol + [z] * (n - j - 1), axis=0))   # R column j -> [n,1]
    net = compile_multi([af.concat(Q, axis=1), af.concat(Rcols, axis=1)])
    nm = {t: nme for t, nme in net.output_ports}
    out = net(A16)
    Qv = np.asarray(out[nm[net.output_tensors[0]]], np.float32)
    Rv = np.asarray(out[nm[net.output_tensors[1]]], np.float32)
    net.release()
    return Qv, Rv

cholesky

cholesky(A)

Lower-triangular Cholesky factor L (A = L L^T) of an SPD A, UNPIVOTED, FULLY on the ANE: a fixed recurrence with each L[i,j] a [1,1] tensor, dividing by the pivot L[j,j] via real_div. Small-n. ~3e-4 vs numpy at n=8.

Source code in aneforge/linalg.py
def cholesky(A):
    """Lower-triangular Cholesky factor L (A = L L^T) of an SPD A, UNPIVOTED, FULLY on the
    ANE: a fixed recurrence with each L[i,j] a [1,1] tensor, dividing by the pivot L[j,j]
    via real_div. Small-n. ~3e-4 vs numpy at n=8."""
    A16 = np.asarray(A, f16); n = A16.shape[0]
    At = af.input((n, n)); el = _els_routed(At, n)
    z = el(0, 0) - el(0, 0); L: dict = {}
    for j in range(n):
        d = el(j, j)
        for k in range(j):
            d = d - L[(j, k)] * L[(j, k)]
        L[(j, j)] = d.relu().adds(1e-12).sqrt()                # SPD -> d>0; relu guards fp16 noise
        for i in range(j + 1, n):
            s = el(i, j)
            for k in range(j):
                s = s - L[(i, k)] * L[(j, k)]
            L[(i, j)] = s / L[(j, j)]
    return _solve_once(_grid(L, n, z), A16).astype(np.float32)

lu

lu(A)

Unpivoted LU (A = L U, L unit-lower, U upper) by Doolittle, FULLY on the ANE: each entry a [1,1] tensor, dividing by the pivot U[i,i] via real_div. Returns (L, U). Small-n; UNPIVOTED, so valid when no leading pivot underflows (well-conditioned A).

Source code in aneforge/linalg.py
def lu(A):
    """Unpivoted LU (A = L U, L unit-lower, U upper) by Doolittle, FULLY on the ANE: each
    entry a [1,1] tensor, dividing by the pivot U[i,i] via real_div. Returns (L, U).
    Small-n; UNPIVOTED, so valid when no leading pivot underflows (well-conditioned A)."""
    from aneforge._compile import compile_multi
    A16 = np.asarray(A, f16); n = A16.shape[0]
    At = af.input((n, n)); el = _els_routed(At, n)
    z = el(0, 0) - el(0, 0); one = z.adds(1.0)
    U: dict = {}; L: dict = {(i, i): one for i in range(n)}
    for i in range(n):
        for k in range(i, n):
            s = el(i, k)
            for t in range(i):
                s = s - L[(i, t)] * U[(t, k)]
            U[(i, k)] = s
        for k in range(i + 1, n):
            s = el(k, i)
            for t in range(i):
                s = s - L[(k, t)] * U[(t, i)]
            L[(k, i)] = s / U[(i, i)]
    net = compile_multi([_grid(L, n, z), _grid(U, n, z)])
    nm = {t: nme for t, nme in net.output_ports}
    out = net(A16)
    Lv = np.asarray(out[nm[net.output_tensors[0]]], np.float32)
    Uv = np.asarray(out[nm[net.output_tensors[1]]], np.float32)
    net.release()
    return Lv, Uv

lu_pivoted

lu_pivoted(A)

Partial-pivoted LU: P A = L U with row interchanges, on the ANE. Returns (P, L, U). The pivot at each column is a true on-engine argmax over the subcolumn, turned into a permutation by a one-hot (arange + greater + select) and applied as a row-swap; the Schur elimination is masked to the trailing submatrix so the stored multipliers are preserved. UNLIKE the unpivoted lu, the argmax is a netplist BRIDGE op, so the program is SEGMENTED (one graph cut per column), not a single fused program - still all on ANE silicon. Small-n; ~5e-4 vs P A reconstruction at n<=8.

Source code in aneforge/linalg.py
def lu_pivoted(A):
    """Partial-pivoted LU: P A = L U with row interchanges, on the ANE. Returns (P, L, U).
    The pivot at each column is a true on-engine `argmax` over the subcolumn, turned into a
    permutation by a one-hot (arange + greater + select) and applied as a row-swap; the
    Schur elimination is masked to the trailing submatrix so the stored multipliers are
    preserved. UNLIKE the unpivoted `lu`, the argmax is a netplist BRIDGE op, so the program
    is SEGMENTED (one graph cut per column), not a single fused program - still all on ANE
    silicon. Small-n; ~5e-4 vs P A reconstruction at n<=8."""
    A16 = np.asarray(A, f16); n = A16.shape[0]
    At = af.input((n, n)); In = af.input((n, n)); arc = af.input((n, 1))
    arcr = arc.transpose([1, 0]); M = At; P = In
    half = (arc * 0.0).adds(0.5); onerow = (arcr * 0.0).adds(1.0); zerorow = arcr * 0.0
    for k in range(n):
        ek = In.slice_by_size([0, k], [n, 1]); ekr = ek.transpose([1, 0])
        kc = (arc * 0.0).adds(float(k))
        excl = af.select(kc.greater(arc), (arc * 0.0).adds(1e4), arc * 0.0)   # exclude rows < k
        idx = ((M @ ek).abs() - excl).argmax(axis=0)                          # pivot row (bridge op)
        p1h = af.select((arc - idx).abs().greater(half), arc * 0.0, (arc * 0.0).adds(1.0))

        def swap(X):                                                          # swap rows k and pivot
            rk = ekr @ X; rp = p1h.transpose([1, 0]) @ X
            return X + ek @ (rp - rk) + p1h @ (rk - rp)
        M = swap(M); P = swap(P)
        piv = (ekr @ M) @ ek
        belowk = af.select(arc.greater(kc), (arc * 0.0).adds(1.0), arc * 0.0)  # rows > k
        Lk = ((M @ ek) / piv) * belowk
        rowk = ekr @ M
        colabove = af.select(arcr.greater((arcr * 0.0).adds(float(k))), onerow, zerorow)  # cols > k
        M = M - Lk @ (rowk * colabove)                                        # Schur (trailing only)
        M = M - ((M @ ek) * belowk) @ ekr + Lk @ ekr                          # store Lk in column k
    net = af.compile(af.concat([P, M], axis=1))                              # SegmentedModel (argmax cuts)
    o = np.asarray(net(A16, np.eye(n, dtype=f16), np.arange(n, dtype=f16).reshape(n, 1)), np.float32)
    net.release()
    Pv, Mv = o[:, :n], o[:, n:]
    return Pv, np.tril(Mv, -1) + np.eye(n, dtype=np.float32), np.triu(Mv)

generalized_eigh

generalized_eigh(A, B, sweeps: int = 8)

Eigenvalues of the generalized symmetric-definite problem A x = lambda B x (A symmetric, B SPD; LAPACK sygv), FULLY on the ANE by composing on-engine kernels: B = L L^T (cholesky), C = L^-1 A L^-T (triangular inverse + two GEMMs), then the symmetric eig of C (cyclic Jacobi eigh). Returns the generalized eigenvalues ascending. Small-n (the eigh O(n^3) graph); ~4e-4 vs the fp64 reduction at cond<=1e1.

Source code in aneforge/linalg.py
def generalized_eigh(A, B, sweeps: int = 8):
    """Eigenvalues of the generalized symmetric-definite problem A x = lambda B x (A
    symmetric, B SPD; LAPACK `sygv`), FULLY on the ANE by composing on-engine kernels:
    B = L L^T (cholesky), C = L^-1 A L^-T (triangular inverse + two GEMMs), then the symmetric
    eig of C (cyclic Jacobi `eigh`). Returns the generalized eigenvalues ascending. Small-n
    (the eigh O(n^3) graph); ~4e-4 vs the fp64 reduction at cond<=1e1."""
    A16 = np.asarray(A, f16); B16 = np.asarray(B, f16)
    L = cholesky(B16).astype(f16)                          # ANE
    Li = _trinv_lower(L).astype(f16)                       # ANE
    C = _ane_gemm(_ane_gemm(Li, A16), np.ascontiguousarray(Li.T))   # ANE: (L^-1 A) L^-T
    C = ((C + C.T) * 0.5).astype(f16)                      # symmetrize (host, tiny)
    return eigh(C, sweeps=sweeps)

eigvals

eigvals(A, iters: int = 60)

Eigenvalues of a GENERAL (nonsymmetric) real A by the unshifted QR algorithm, FULLY on the ANE as ONE fused program: iters rounds of M <- R Q where M = Q R (Gram-Schmidt QR in-graph, RQ a matmul) drive M to real Schur (quasi-triangular) form. NO pivoting, NO shifts, NO deflation, so it is pure fixed-iteration dataflow with no bridge op. The host only READS the eigenvalues off the converged form - a 1x1 diagonal block is a real eigenvalue, a 2x2 block gives a COMPLEX-conjugate pair. Returns a complex array. Small-n (the unrolled QR chain), well-separated spectra: ~2e-3 (real) / ~2e-2 (complex) vs numpy.eigvals. Unshifted QR converges linearly, so clustered spectra need more iters.

Source code in aneforge/linalg.py
def eigvals(A, iters: int = 60):
    """Eigenvalues of a GENERAL (nonsymmetric) real A by the unshifted QR algorithm, FULLY on
    the ANE as ONE fused program: `iters` rounds of M <- R Q where M = Q R (Gram-Schmidt QR
    in-graph, RQ a matmul) drive M to real Schur (quasi-triangular) form. NO pivoting, NO
    shifts, NO deflation, so it is pure fixed-iteration dataflow with no bridge op. The host
    only READS the eigenvalues off the converged form - a 1x1 diagonal block is a real
    eigenvalue, a 2x2 block gives a COMPLEX-conjugate pair. Returns a complex array. Small-n
    (the unrolled QR chain), well-separated spectra: ~2e-3 (real) / ~2e-2 (complex) vs
    numpy.eigvals. Unshifted QR converges linearly, so clustered spectra need more iters."""
    A16 = np.asarray(A, f16); n = A16.shape[0]; onen = np.ones((n, 1), f16)
    M = af.input((n, n))

    def _qr_graph(X):                                      # modified Gram-Schmidt -> (Q, R)
        Q, Rcols, z = [], [], None
        for j in range(n):
            v = X.slice_by_size([0, j], [n, 1]); rcol = []
            for i in range(j):
                rij = (Q[i] * v).transpose([1, 0]) @ onen; rcol.append(rij); v = v - rij * Q[i]
            rjj = ((v * v).transpose([1, 0]) @ onen).sqrt(); rcol.append(rjj)
            Q.append(v * ((v * v).transpose([1, 0]) @ onen).rsqrt())
            z = rjj - rjj if z is None else z
            Rcols.append(af.concat(rcol + [z] * (n - j - 1), axis=0))
        return af.concat(Q, axis=1), af.concat(Rcols, axis=1)

    for _ in range(iters):
        Q, R = _qr_graph(M); M = R @ Q                     # RQ; unrolls into one program
    Mv = _solve_once(M, A16)
    evs, i = [], 0
    while i < n:
        if i < n - 1 and abs(Mv[i + 1, i]) > 1e-2 * (abs(Mv[i, i]) + abs(Mv[i + 1, i + 1]) + 1e-9):
            b = Mv[i:i + 2, i:i + 2]; tr = b[0, 0] + b[1, 1]
            det = b[0, 0] * b[1, 1] - b[0, 1] * b[1, 0]
            s = np.sqrt(complex(tr * tr - 4 * det)); evs += [(tr + s) / 2, (tr - s) / 2]; i += 2
        else:
            evs.append(complex(Mv[i, i])); i += 1
    return np.array(evs)

FFT

fft

aneforge.fft - a real FFT for the Apple Neural Engine, built as Cooley-Tukey factored into MATMUL STAGES.

The ANE has NO complex dtype (compute is fp16 real only) and no in-graph loop or gather, but it is a matmul machine. A previous probe (tests/test_spectral_sci.py, examples/spectral_analysis.py) showed the DFT-as-a-twiddle-matrix-matmul is fp16-CLEAN to N=2048 because the matmul accumulator is wide (>=fp32) - but that is the naive O(N^2) DFT (one dense [N,N] twiddle matrix). This module keeps the "every stage is a matmul" property while cutting the op count to ~O(N*(N1+N2+...)) via the Cooley-Tukey factorization.

THE IDEA (Cooley-Tukey, one split N = N1 * N2): Index n = n1N2 + n2 (n1 in [0,N1), n2 in [0,N2)); k = k2N1 + k1. X[k1 + N1k2] = sum_{n2} [ exp(-2pi i n2 k1 / N) * ( sum_{n1} x[n1N2 + n2] * exp(-2pi i n1 k1 / N1) ) ] * exp(-2pi i n2 k2 / N2) => reshape x to [N1, N2]; (1) N1-point DFT down each column (matmul by the [N1,N1] twiddle Wn1), (2) multiply by the CROSS twiddles T[k1,n2] = exp(-2pi i k1 n2 / N), (3) N2-point DFT along each row (matmul by the [N2,N2] twiddle Wn2), (4) transpose to put the output in order X[k1, k2] -> X[k2*N1 + k1].

WHAT THIS MODULE BUILDS (and why). N is factored into AT MOST 3 balanced GROUPS, N = g0g1g2, and the signal is reshaped ONCE to a 4-D tensor [1, g0, g1, g2] that stays fully split for the whole computation. Each group is one dense-DFT matmul stage along its own axis (a single transpose to bring the axis last, the complex matmul, a single transpose back), with cross-twiddle multiplies between stages and ONE final axis-reversal transpose. Cost = N*(g0+g1+g2) MACs vs the dense DFT's N^2 - sub-quadratic, e.g. 32x fewer at N=1024, 51x at N=2048; every stage is a matmul (ANE-native).

TWO HARD ANE WALLS shape this (both found empirically on M5, see _factor / _axis_dft): - The ANE caps transpose+matmul at RANK-4 tensors (rank>=5 fails ANECCompile), so the fully-split tensor carries at most 3 factor groups -> a 3-stage FFT, not a full log-depth radix-2 cascade. - Chaining the recursive transpose->reshape->transpose COLLAPSE mis-fuses on this ANE (correct VALUES, permuted ORDER). Keeping the tensor fully split with ONE isolated transpose per stage sidesteps it. (A single transpose, or a transpose separated by a matmul, is reliable - verified.)

COMPLEX AS REAL PAIRS. Every value is a (re, im) Tensor pair. A complex matmul C = A @ B is four real matmuls: Cre = Are@Bre - Aim@Bim Cim = Are@Bim + Aim@Bre (Straight 4-matmul form, not Karatsuba: on the ANE matmul is cheap and the wide accumulator keeps the straight form cleanest - Karatsuba's (a+b)(c+d) sums lose a bit of fp16 headroom for no real op savings here.) The twiddle matrices are small fp16 CONSTANTS folded into the graph (matmul against a numpy array is a streamed weight, see graph.Tensor.matmul).

This is a submodule over the PUBLIC aneforge ops only (@ matmul, reshape, transpose, add/sub/mul, concat, square, sqrt, af.input, af.compile). It does not touch graph.py, _compile.py, init.py, _optimize.py, _paired.py or linalg.py.

API fft(x_re, x_im, N) -> (X_re, X_im) complex -> complex ifft(X_re, X_im, N) -> (x_re, x_im) inverse (1/N scaled) rfft(x_real, N) -> (X_re, X_im) real input (imag = 0) fft2(x_re, x_im) -> (X_re, X_im) 2-D transform of an [M,N] field ifft2(X_re, X_im) -> (x_re, x_im) inverse (1/(M*N) scaled) magnitude(X_re, X_im) -> |X| sqrt(re^2 + im^2) power(X_re, X_im) -> |X|^2

Each returns numpy arrays; under the hood it builds an aneforge graph, compiles it to ONE fused e5rt program, and runs it on the ANE. A Plan object (fft_plan / rfft_plan) compiles once and runs many times.

PYTHONPATH=. python3 aneforge/fft.py

Plan

A compiled staged-FFT program. Holds the e5rt Model plus the cross-twiddle constant arrays, which it threads in automatically on every call.

Source code in aneforge/fft.py
class Plan:
    """A compiled staged-FFT program. Holds the e5rt Model plus the cross-twiddle
    constant arrays, which it threads in automatically on every call."""

    def __init__(self, N: int, inverse: bool, real_input: bool):
        self.N = N
        self.inverse = inverse
        self.real_input = real_input
        b = _Builder(inverse)
        # two user inputs: real and imag parts of the signal. For rfft the imag input is
        # fed as zeros (kept as a declared input so the graph stays a pure function).
        xr = af.input((1, N)); xi = af.input((1, N))
        Xr, Xi = b.transform(xr, xi, N)
        if inverse:
            Xr = Xr * (1.0 / N)
            Xi = Xi * (1.0 / N)
        # one fused program with a single output: concat(re, im) -> [1, 2N], split on host
        out = af.concat([Xr, Xi], axis=1)
        self._aux_values = b.aux_values
        self.n_stages = _stage_count(N)          # number of dense-DFT matmul stages (<=3)
        # The DFT butterfly has exact-by-construction subtracts that trip the generic
        # cancel_sub precision heuristic; this kernel is numerically verified (spectrum
        # matches np.fft to fp16), so it vouches for itself and skips the check.
        self.model = af.compile(out, _check_precision=False)
        self.n_ops = self.model.n_ops

    def __call__(self, x_re: np.ndarray, x_im: np.ndarray | None = None):
        x_re = np.asarray(x_re, np.float16).reshape(1, self.N)
        if x_im is None:
            x_im = np.zeros((1, self.N), np.float16)
        else:
            x_im = np.asarray(x_im, np.float16).reshape(1, self.N)
        out = self.model(x_re, x_im, *self._aux_values)
        out = out.reshape(2, self.N)
        return out[0].copy(), out[1].copy()

Plan2

A compiled 2-D FFT program for [M,N] complex fields - ONE fused e5rt program.

The 2-D DFT is separable: X_hat = F_M @ X @ F_N^T. Applied to a whole matrix, each axis transform is a single complex matmul (a DFT of every row at once), NOT a per-row loop: rows transform as one matmul against the [N,N] twiddle, columns as transpose -> matmul against the [M,M] twiddle -> transpose back. Eight real GEMMs total, fused into one program - vs M+N dispatches for host-looping the 1-D plan.

The DENSE per-axis form, O(MN(M+N)) MACs (not the staged sub-quadratic 1-D build): at PDE-scale fields the ANE eats the GEMMs and dispatch dominates, and a per-axis staged split of a 2-D field would exceed the rank-4 transpose+matmul cap. Each transpose sits between matmuls (the safe pattern; only transpose->reshape CHAINS mis-fuse - see _axis_dft).

Source code in aneforge/fft.py
class Plan2:
    """A compiled 2-D FFT program for [M,N] complex fields - ONE fused e5rt program.

    The 2-D DFT is separable:  X_hat = F_M @ X @ F_N^T.  Applied to a whole matrix,
    each axis transform is a single complex matmul (a DFT of every row at once), NOT a
    per-row loop: rows transform as one matmul against the [N,N] twiddle, columns as
    transpose -> matmul against the [M,M] twiddle -> transpose back. Eight real GEMMs
    total, fused into one program - vs M+N dispatches for host-looping the 1-D plan.

    The DENSE per-axis form, O(M*N*(M+N)) MACs (not the staged sub-quadratic 1-D build):
    at PDE-scale fields the ANE eats the GEMMs and dispatch dominates, and a per-axis
    staged split of a 2-D field would exceed the rank-4 transpose+matmul cap. Each
    transpose sits between matmuls (the safe pattern; only transpose->reshape CHAINS
    mis-fuse - see _axis_dft)."""

    def __init__(self, M: int, N: int, inverse: bool):
        self.M, self.N, self.inverse = M, N, inverse
        mk = _idft_matrix if inverse else _dft_matrix
        WrN, WiN = mk(N)                                          # row twiddle (x @ Wt)
        WrM, WiM = mk(M)                                          # column twiddle
        if inverse:
            # Fold the 1/(M*N) normalization INTO the twiddles, 1/N on the row pass and
            # 1/M on the column pass - NOT one scale at the end. A real spectrum is large
            # (O(M*N) at the dominant modes), so an unscaled first-axis transform would
            # push intermediates past fp16 max (65504) and shred precision.
            WrN, WiN = WrN * (1.0 / N), WiN * (1.0 / N)
            WrM, WiM = WrM * (1.0 / M), WiM * (1.0 / M)
        xr = af.input((M, N)); xi = af.input((M, N))
        re, im = _cmatmul_const(xr, xi, WrN, WiN)                 # all M rows, one matmul
        re = re.transpose([1, 0]); im = im.transpose([1, 0])      # columns -> rows
        re, im = _cmatmul_const(re, im, WrM, WiM)                 # all N columns, one matmul
        re = re.transpose([1, 0]); im = im.transpose([1, 0])
        out = af.concat([re, im], axis=0)                          # [2M, N], split on host
        # exact-by-construction DFT subtracts trip the generic cancel_sub heuristic;
        # numerically verified vs np.fft.fft2 (tests/test_fft2.py), so it vouches for itself.
        self.model = af.compile(out, _check_precision=False)
        self.n_ops = self.model.n_ops

    def __call__(self, x_re: np.ndarray, x_im: np.ndarray | None = None):
        x_re = np.asarray(x_re, np.float16).reshape(self.M, self.N)
        if x_im is None:
            x_im = np.zeros((self.M, self.N), np.float16)
        else:
            x_im = np.asarray(x_im, np.float16).reshape(self.M, self.N)
        out = self.model(x_re, x_im).reshape(2, self.M, self.N)
        return out[0].copy(), out[1].copy()

fft

fft(x_re, x_im, N: int)

Forward FFT of a complex signal (real/imag arrays), length N, on the ANE. Returns (X_re, X_im) numpy arrays of length N.

Source code in aneforge/fft.py
def fft(x_re, x_im, N: int):
    """Forward FFT of a complex signal (real/imag arrays), length N, on the ANE.
    Returns (X_re, X_im) numpy arrays of length N."""
    return fft_plan(N)(x_re, x_im)

ifft

ifft(X_re, X_im, N: int)

Inverse FFT (1/N normalized) of a complex spectrum on the ANE. Returns (x_re, x_im).

Source code in aneforge/fft.py
def ifft(X_re, X_im, N: int):
    """Inverse FFT (1/N normalized) of a complex spectrum on the ANE.
    Returns (x_re, x_im)."""
    return ifft_plan(N)(X_re, X_im)

rfft

rfft(x_real, N: int)

Forward FFT of a REAL signal (imag = 0) on the ANE. Returns the full-length complex spectrum (X_re, X_im); the upper half is the conjugate mirror.

Source code in aneforge/fft.py
def rfft(x_real, N: int):
    """Forward FFT of a REAL signal (imag = 0) on the ANE. Returns the full-length
    complex spectrum (X_re, X_im); the upper half is the conjugate mirror."""
    return rfft_plan(N)(x_real, None)

fft2

fft2(x_re, x_im=None)

2-D FFT of an [M,N] complex field on the ANE as ONE fused program (F_M @ X @ F_N^T as eight real GEMMs). x_im=None means a real field. Returns (X_re, X_im) [M,N] numpy arrays, np.fft.fft2 convention.

Source code in aneforge/fft.py
def fft2(x_re, x_im=None):
    """2-D FFT of an [M,N] complex field on the ANE as ONE fused program
    (F_M @ X @ F_N^T as eight real GEMMs). x_im=None means a real field.
    Returns (X_re, X_im) [M,N] numpy arrays, np.fft.fft2 convention."""
    x_re = np.asarray(x_re)
    M, N = x_re.shape
    return fft2_plan(M, N)(x_re, x_im)

ifft2

ifft2(X_re, X_im)

Inverse 2-D FFT (1/(M*N) normalized) on the ANE, one fused program. Returns (x_re, x_im), np.fft.ifft2 convention.

Source code in aneforge/fft.py
def ifft2(X_re, X_im):
    """Inverse 2-D FFT (1/(M*N) normalized) on the ANE, one fused program.
    Returns (x_re, x_im), np.fft.ifft2 convention."""
    X_re = np.asarray(X_re)
    M, N = X_re.shape
    return ifft2_plan(M, N)(X_re, X_im)

magnitude

magnitude(X_re, X_im)

|X| = sqrt(re^2 + im^2) (numpy on host outputs).

Source code in aneforge/fft.py
def magnitude(X_re, X_im):
    """|X| = sqrt(re^2 + im^2) (numpy on host outputs)."""
    return np.sqrt(np.asarray(X_re, np.float32) ** 2 + np.asarray(X_im, np.float32) ** 2)

power

power(X_re, X_im)

|X|^2 power spectrum (numpy on host outputs).

Source code in aneforge/fft.py
def power(X_re, X_im):
    """|X|^2 power spectrum (numpy on host outputs)."""
    return np.asarray(X_re, np.float32) ** 2 + np.asarray(X_im, np.float32) ** 2

Special functions

special

aneforge.special - special functions evaluated on the Apple Neural Engine as fused fp16 polynomial chains.

The ANE's strength is a long, dependent chain of fused fp16 mul/add (one e5rt program, no graph cut). Special functions are exactly that: Horner / Clenshaw evaluation of minimax or rational approximations, plus smooth range reduction. This module builds each function as an aneforge graph out of the public ops (+ - * /, exp/log/sqrt/sin/cos, clip) only, so the whole thing compiles into one fused ANE program.

from aneforge.special import erfc, gamma, lgamma, expm1, log1p, bessel_j0
import aneforge as af, numpy as np

x   = af.input((1, 64))
net = af.compile(erfc(x))        # one fused ANE program
y   = net(np.linspace(0, 4, 64).astype(np.float16).reshape(1, 64))

Every function takes and returns an aneforge.Tensor; you compile and run the result yourself (so several can share inputs / fuse together).

WHY THESE: they add value beyond the native unaries (exp/log/erf/...) which either (a) do not exist (gamma, lgamma, erfc, expm1, log1p, Bessel) or (b) degrade / cancel in fp16 over a wide range (erfc = 1-erf cancels to 0 past x~2; exp loses a digit past |x|~8). See the __main__ self-test for measured relerr vs scipy and the honest fp16 verdict per function.

THE TWO CONSTRAINTS that shape the implementations
  • fp16 compute. The ANE computes in fp16 (~3-4 significant digits). A poly exact in fp64 is capped at ~1e-3..1e-4 relerr once rounded. Where a function's output leaves the fp16 range (gamma > 65504 past x~8; I0 past x~12) that is a hard wall, not a coefficient problem.
  • no scalar add and no in-graph branch. The graph has * scalar (muls) but no + scalar and no data-dependent control flow. We add a constant c with _const(x, c) (a cos(0)=1 ones tensor times c), and use only fixed, smooth range reduction - never a per-element branch / variable step count. Functions needing a data-dependent reduction (gamma far outside [1,2], lgamma reflection for x<0) are scoped to the range a single static graph can cover, documented per function.

sin

sin(x: Tensor) -> Tensor

sin(x) for x in [-pi/2, pi/2] as a fused fp16 polynomial (x * P(x^2)).

Portable trig: only mul/sub/exp, so it runs on ANE families lacking the native sin op (A15+), M1/H13 included. Outside [-pi/2, pi/2] reduce the argument on the host first (the static graph has no data-dependent range reduction).

Source code in aneforge/special.py
def sin(x: Tensor) -> Tensor:
    """`sin(x)` for x in [-pi/2, pi/2] as a fused fp16 polynomial (`x * P(x^2)`).

    Portable trig: only mul/sub/exp, so it runs on ANE families lacking the native
    sin op (A15+), M1/H13 included. Outside [-pi/2, pi/2] reduce the argument on the
    host first (the static graph has no data-dependent range reduction)."""
    return x * _poly_in(x * x, _SIN_P)

cos

cos(x: Tensor) -> Tensor

cos(x) for x in [-pi/2, pi/2] as a fused fp16 polynomial (Q(x^2)).

Portable companion to sin above - native cos is A15+, this runs everywhere M1 included. Reduce arguments outside [-pi/2, pi/2] on the host.

Source code in aneforge/special.py
def cos(x: Tensor) -> Tensor:
    """`cos(x)` for x in [-pi/2, pi/2] as a fused fp16 polynomial (`Q(x^2)`).

    Portable companion to `sin` above - native cos is A15+, this runs everywhere
    M1 included. Reduce arguments outside [-pi/2, pi/2] on the host."""
    return _poly_in(x * x, _COS_Q)

erfc

erfc(x: Tensor) -> Tensor

Complementary error function erfc(x) = 1 - erf(x) for x >= 0.

Direct rational*exp form (A&S 7.1.26) - does NOT cancel for large x, unlike 1 - native_erf. Valid for x in [0, ~6] (erfc decays smoothly; the exp underflows to 0 past x~6, the correct limit). For x<0 use the identity erfc(-x)=2-erfc(x) on the host (a branch the static graph can't do).

Source code in aneforge/special.py
def erfc(x: Tensor) -> Tensor:
    """Complementary error function `erfc(x) = 1 - erf(x)` for `x >= 0`.

    Direct rational*exp form (A&S 7.1.26) - does NOT cancel for large x, unlike
    `1 - native_erf`. Valid for x in [0, ~6] (erfc decays smoothly; the exp
    underflows to 0 past x~6, the correct limit). For x<0 use the identity
    erfc(-x)=2-erfc(x) on the host (a branch the static graph can't do).
    """
    t = _const(x, 1.0) / (_const(x, 1.0) + x * _ERFC_P)
    # poly(t) = (((a4 t + a3) t + a2) t + a1) t + a0) * t   -> a0..a4 low-first, then *t
    poly = _poly_in(t, _ERFC_A) * t
    return poly * (x * x * -1.0).exp()

expm1

expm1(x: Tensor) -> Tensor

exp(x) - 1 accurate near 0 (where exp(x)-1 cancels). Taylor x(1 + x/2 + x^2/6 + ... + x^5/720) - valid for |x| <= ~0.7. Outside that, use x.exp() and subtract 1 with _const (no cancellation there).

Source code in aneforge/special.py
def expm1(x: Tensor) -> Tensor:
    """`exp(x) - 1` accurate near 0 (where `exp(x)-1` cancels). Taylor
    `x(1 + x/2 + x^2/6 + ... + x^5/720)` - valid for |x| <= ~0.7. Outside that,
    use `x.exp()` and subtract 1 with `_const` (no cancellation there)."""
    # x * (1 + x(1/2 + x(1/6 + x(1/24 + x(1/120 + x/720)))))
    inner = _horner(x, [1.0 / 720, 1.0 / 120, 1.0 / 24, 1.0 / 6, 0.5, 1.0])
    return x * inner

log1p

log1p(x: Tensor) -> Tensor

log(1 + x) accurate near 0 (where log(1+x) cancels). Evaluated as x * poly(x) with a deg-7 minimax of log1p(x)/x - valid for x in [-0.5, 1.0]. The x* factor keeps it exact at 0.

Source code in aneforge/special.py
def log1p(x: Tensor) -> Tensor:
    """`log(1 + x)` accurate near 0 (where `log(1+x)` cancels). Evaluated as
    `x * poly(x)` with a deg-7 minimax of `log1p(x)/x` - valid for x in
    [-0.5, 1.0]. The `x*` factor keeps it exact at 0."""
    return x * _horner(x, _LOG1P_RATIO)

lgamma

lgamma(x: Tensor) -> Tensor

Log-gamma log|Gamma(x)| for x in [1, 8] via a deg-8 minimax in the centered variable (x-4.5). Accurate in ABSOLUTE terms (~3e-3); relative error is large only at the zeros x=1, x=2 where lgamma -> 0 (relative error is ill-defined there).

Source code in aneforge/special.py
def lgamma(x: Tensor) -> Tensor:
    """Log-gamma `log|Gamma(x)|` for x in [1, 8] via a deg-8 minimax in the
    centered variable (x-4.5). Accurate in ABSOLUTE terms (~3e-3); relative error
    is large only at the zeros x=1, x=2 where lgamma -> 0 (relative error is
    ill-defined there)."""
    return _horner(x + _const(x, -_LGAMMA_C), _LGAMMA)

gamma

gamma(x: Tensor) -> Tensor

Gamma function on x in [1, 2] via a deg-6 minimax in the centered variable (x-1.5).

SCOPE / HONESTY: gamma grows super-exponentially and overflows fp16 (>65504) past x~8.3, so it is fundamentally fp16-narrow. Extending [1,2] to a wider window needs the recurrence Gamma(x+1)=x*Gamma(x) applied a data-dependent number of times - not expressible in one static graph. For a wider but still-fp16-bounded range use gamma_via_lgamma (x in [1, ~7.5]), which routes through exp(lgamma(x)) instead.

Source code in aneforge/special.py
def gamma(x: Tensor) -> Tensor:
    """Gamma function on x in [1, 2] via a deg-6 minimax in the centered variable
    (x-1.5).

    SCOPE / HONESTY: gamma grows super-exponentially and overflows fp16 (>65504)
    past x~8.3, so it is fundamentally fp16-narrow. Extending [1,2] to a wider
    window needs the recurrence Gamma(x+1)=x*Gamma(x) applied a data-dependent
    number of times - not expressible in one static graph. For a wider but
    still-fp16-bounded range use `gamma_via_lgamma` (x in [1, ~7.5]), which routes
    through `exp(lgamma(x))` instead.
    """
    return _horner(x + _const(x, -_GAMMA_C), _GAMMA_12)

gamma_via_lgamma

gamma_via_lgamma(x: Tensor) -> Tensor

Gamma(x) = exp(lgamma(x)) for x in [1, ~7.5] (output stays under the fp16 max). Wider range than gamma at a small accuracy cost (the exp re-rounds).

Source code in aneforge/special.py
def gamma_via_lgamma(x: Tensor) -> Tensor:
    """`Gamma(x) = exp(lgamma(x))` for x in [1, ~7.5] (output stays under the
    fp16 max). Wider range than `gamma` at a small accuracy cost (the exp
    re-rounds)."""
    return lgamma(x).exp()

bessel_j0

bessel_j0(x: Tensor) -> Tensor

Bessel J0(x) for |x| <= 3 (A&S 9.4.1 polynomial in (x/3)^2). Smooth, bounded; the dominant first lobe and first zero (x~2.405) are in range.

Source code in aneforge/special.py
def bessel_j0(x: Tensor) -> Tensor:
    """Bessel J0(x) for |x| <= 3 (A&S 9.4.1 polynomial in (x/3)^2). Smooth,
    bounded; the dominant first lobe and first zero (x~2.405) are in range."""
    t = (x * x) * (1.0 / 9.0)
    return _poly_in(t, _J0)

bessel_i0

bessel_i0(x: Tensor) -> Tensor

Modified Bessel I0(x) for |x| <= 3.75 (A&S 9.8.1 in (x/3.75)^2). I0 grows exponentially and overflows fp16 past x~12, so this small-argument branch is the fp16-useful range.

Source code in aneforge/special.py
def bessel_i0(x: Tensor) -> Tensor:
    """Modified Bessel I0(x) for |x| <= 3.75 (A&S 9.8.1 in (x/3.75)^2). I0 grows
    exponentially and overflows fp16 past x~12, so this small-argument branch is
    the fp16-useful range."""
    t = (x * x) * (1.0 / (3.75 * 3.75))
    return _poly_in(t, _I0)

bessel_k0

bessel_k0(x: Tensor) -> Tensor

Modified Bessel K0(x) for 0 < x <= 2 (A&S 9.8.5): K0(x) = -ln(x/2) I0(x) + series((x/2)^2). K0 has a -ln singularity at 0, so x must be strictly positive (and not tiny: log underflow). I0 here reuses the 9.8.1 polynomial (in (x/3.75)^2), valid through x=2; the K0 series part is in (x/2)^2 -- the two use DIFFERENT scaled arguments.

Source code in aneforge/special.py
def bessel_k0(x: Tensor) -> Tensor:
    """Modified Bessel K0(x) for 0 < x <= 2 (A&S 9.8.5):
    `K0(x) = -ln(x/2) I0(x) + series((x/2)^2)`. K0 has a -ln singularity at 0,
    so x must be strictly positive (and not tiny: log underflow). I0 here reuses
    the 9.8.1 polynomial (in (x/3.75)^2), valid through x=2; the K0 series part is
    in (x/2)^2 -- the two use DIFFERENT scaled arguments."""
    half = x * 0.5
    t_k0 = half * half                       # (x/2)^2  for the K0 series
    t_i0 = (x * x) * (1.0 / (3.75 * 3.75))    # (x/3.75)^2 for the I0 factor
    i0 = _poly_in(t_i0, _I0)
    return half.log() * (i0 * -1.0) + _poly_in(t_k0, _K0)

exp_wide

exp_wide(x: Tensor, splits: int = 1) -> Tensor

exp for wide x via argument splitting: exp(x) = (exp(x / 2^splits)) ^ (2^splits) by repeated squaring (the inner exp runs on a smaller argument).

HONEST FINDING (measured on this M5 ANE): this does NOT reliably beat the native x.exp(). The native fp16 exp is already accurate (median per-point relerr ~1.4e-3 over [-10,10]); the squaring re-rounding usually costs MORE than the small-argument benefit gains, so per-point accuracy is the same or slightly worse. Provided for completeness. fp16's output range (max 65504) caps exp at x~11 regardless of method - splitting can never extend the range, only (at best marginally) trade accuracy.

Source code in aneforge/special.py
def exp_wide(x: Tensor, splits: int = 1) -> Tensor:
    """exp for wide `x` via argument splitting:
    `exp(x) = (exp(x / 2^splits)) ^ (2^splits)` by repeated squaring (the inner
    `exp` runs on a smaller argument).

    HONEST FINDING (measured on this M5 ANE): this does NOT reliably beat the
    native `x.exp()`. The native fp16 exp is already accurate (median per-point
    relerr ~1.4e-3 over [-10,10]); the squaring re-rounding usually costs MORE
    than the small-argument benefit gains, so per-point accuracy is the same or
    slightly worse. Provided for completeness. fp16's output range (max 65504)
    caps exp at x~11 regardless of method - splitting can never extend the range,
    only (at best marginally) trade accuracy.
    """
    e = (x * (1.0 / (1 << splits))).exp()
    for _ in range(splits):
        e = e * e
    return e

log_wide

log_wide(x: Tensor, sqrts: int = 3) -> Tensor

log for wide positive x via log(x) = 2^sqrts * log(x^(1/2^sqrts)). Repeated square roots pull a large argument toward 1, where log is most accurate, then scale back.

HONEST FINDING: native fp16 log is already good (~1e-3 over [1e-2, 1e4]) and this matches rather than clearly beats it - the sqrt chain re-rounds. Useful mainly as a documented range-reduction recipe; the native x.log() is the better default on this hardware.

Source code in aneforge/special.py
def log_wide(x: Tensor, sqrts: int = 3) -> Tensor:
    """log for wide positive `x` via `log(x) = 2^sqrts * log(x^(1/2^sqrts))`.
    Repeated square roots pull a large argument toward 1, where log is most
    accurate, then scale back.

    HONEST FINDING: native fp16 log is already good (~1e-3 over [1e-2, 1e4]) and
    this matches rather than clearly beats it - the sqrt chain re-rounds. Useful
    mainly as a documented range-reduction recipe; the native `x.log()` is the
    better default on this hardware."""
    r = x
    for _ in range(sqrts):
        r = r.sqrt()
    return r.log() * float(1 << sqrts)

einsum

einsum

aneforge.einsum - a general einsum(equation, *operands) -> Tensor that decomposes a numpy-style subscript equation into a sequence of aneforge graph ops (transpose / reshape / matmul-bmm / broadcast-mul / reduce-sum) so the whole contraction runs as ONE fused e5rt program on the Apple Neural Engine.

import aneforge as af

a = af.input((32, 64)); b = af.input((64, 16))
c = af.einsum("ij,jk->ik", a, b)     # a plain matmul, on the ANE
net = af.compile(c); out = net(A, B)

af.einsum IS this function (the package rebinds the name to it), so it is callable directly; from aneforge.einsum import einsum works identically.

WHY THIS IS POSSIBLE (and where it stops)

A two-operand einsum A,B->C partitions its indices into four classes: * BATCH (B): appear in A, B and C -> kept as outer/batch dims * CONTRACT (K): appear in A and B, not C -> summed over (the dot dimension) * KEEP-A (M): appear in A and C, not B -> rows of the output * KEEP-B (N): appear in B and C, not A -> cols of the output Align each operand to [B..., M..., K...] / [B..., K..., N...] by transpose, collapse each group to a single axis by reshape, then a batched matmul [B,M,K] @ [B,K,N] -> [B,M,N] does the contract-and-sum in ONE op (the ANE matmul accumulator is wide, >=fp32, so the K-sum is clean). Reshape the result back to the named output dims, then transpose to the requested order.

Indices in ONE operand but NOT in the output are summed FIRST with a reduce (af.sum); output-only indices absent from the inputs are unsupported (einsum cannot invent a dimension). For >2 operands we left-fold: contract the running result with the next operand pairwise.

REDUCTION-ONLY and OUTER products are handled directly
  • ij->i / ij-> : pure reduce (sum over the dropped axes).
  • i,j->ij : broadcast-mul (no contraction), an outer product.

WHAT IS NOT SUPPORTED (rejected with a clear error, never wrong results)

Any equation with a REPEATED index inside a single operand - diagonal / trace extraction (ii->i, ii->, ...ii->...). Reading a tensor's diagonal is a GATHER, unsupported on the ANE (per the op-conformance finding), with no matmul/reduce decomposition. These raise EinsumUnsupported. Ellipsis (...) is also not implemented (v1).

Run the self-test (validates a battery vs numpy.einsum on the ANE): PYTHONPATH=. python3 aneforge/einsum.py

EinsumUnsupported

Bases: NotImplementedError

Raised for einsum equations that cannot be lowered to ANE matmul/reduce ops (diagonal/trace via repeated indices, ellipsis, invented output dims).

Source code in aneforge/einsum.py
class EinsumUnsupported(NotImplementedError):
    """Raised for einsum equations that cannot be lowered to ANE matmul/reduce ops
    (diagonal/trace via repeated indices, ellipsis, invented output dims)."""

einsum

einsum(equation: str, *operands: Tensor) -> Tensor

numpy-style einsum lowered to aneforge ops (matmul-reducible patterns).

Supports: matmul (ij,jk->ik), batched matmul (bij,bjk->bik), transpose (ij->ji), outer product (i,j->ij), elementwise+reduce (ij,ij->i, ij->i, ij->), bilinear (bi,ij,bj->b), attention scores (bhqd,bhkd->bhqk), and general >2-operand left-fold contractions.

Rejects (EinsumUnsupported): diagonal/trace via repeated indices in one operand (ii->i, ii->), repeated output indices, and ellipsis.

Source code in aneforge/einsum.py
def einsum(equation: str, *operands: Tensor) -> Tensor:
    """numpy-style `einsum` lowered to aneforge ops (matmul-reducible patterns).

    Supports: matmul (`ij,jk->ik`), batched matmul (`bij,bjk->bik`), transpose
    (`ij->ji`), outer product (`i,j->ij`), elementwise+reduce (`ij,ij->i`,
    `ij->i`, `ij->`), bilinear (`bi,ij,bj->b`), attention scores
    (`bhqd,bhkd->bhqk`), and general >2-operand left-fold contractions.

    Rejects (`EinsumUnsupported`): diagonal/trace via repeated indices in one
    operand (`ii->i`, `ii->`), repeated output indices, and ellipsis."""
    if not operands:
        raise ValueError("einsum: at least one operand is required")
    if not all(isinstance(o, Tensor) for o in operands):
        raise TypeError("einsum: all operands must be aneforge Tensors (build them with af.input)")

    ins, out = _parse(equation, len(operands))
    for s, t in zip(ins, operands):
        if len(s) != len(t.shape):
            raise ValueError(f"einsum: subscript '{s}' has {len(s)} indices but operand "
                             f"shape {t.shape} has {len(t.shape)} dims")
        _reduce_repeats_check(s)

    # ---- single operand: pure transpose / reduce ------------------------- #
    if len(operands) == 1:
        t, sub = operands[0], ins[0]
        t, sub = _sum_to(t, sub, out)            # drop summed indices
        if sub == out:
            return t
        return _align(t, sub, out)               # transpose to requested order

    # ---- multi-operand: left-fold pairwise ------------------------------- #
    cur, csub = operands[0], ins[0]
    for i in range(1, len(operands)):
        nxt, nsub = operands[i], ins[i]
        remaining = "".join(ins[i + 1:])
        # indices we must keep after this pair: anything in the final output OR
        # still needed by a later operand.
        survive = set(out) | set(remaining)
        step_out = "".join(dict.fromkeys(
            [ch for ch in csub + nsub if ch in survive]))
        cur, csub = _contract_pair(cur, csub, nxt, nsub, step_out)

    # final: sum away any leftover non-output indices, then order to `out`
    cur, csub = _sum_to(cur, csub, out)
    if csub != out:
        cur = _align(cur, csub, out)
    return cur

Signal processing

dsp

aneforge.dsp - a digital-signal-processing toolkit on the Apple Neural Engine.

Built on the two things the ANE is genuinely good at: REAL CONVOLUTION (the conv layer) and the MATMUL-FFT from aneforge.fft (the DFT factored into matmul stages, complex carried as real/imag PAIRS). This submodule composes them into the everyday DSP kit:

from aneforge.dsp import (fir_filter, fft_convolve, freq_filter,
                         stft, spectrogram, hann, hamming, blackman,
                         correlate, autocorrelate)
WHAT FITS THE ANE
  • FIR filtering - a finite impulse response IS a convolution. Short kernels run on the native conv layer (1xK, arch-capped at K<=15 on this M5, verified); longer ones fall back to FFT convolution. Feed-forward, no recurrence: a clean fit.
  • FFT-domain DSP - fft_convolve / freq_filter / stft / spectrogram are all (windowed) FFTs + elementwise spectral ops + iFFT, every stage a matmul or an elementwise map (real/imag pairs via aneforge.fft). A clean fit.
  • Correlation - cross/auto-correlation is convolution without the kernel flip, exactly what the ANE conv and the native CrossCorrelation bridge already do.

WHAT DOES NOT FIT (arch limit): * IIR / recursive filters (Butterworth, biquads, any y[n] = ... + a*y[n-1]). A direct-form IIR is a DATA-DEPENDENT RECURRENCE over samples - the scan/cumsum wall the ANE lacks (no in-graph loop, no scalar feedback). No streaming IIR here. iir_filter is ONLY a FIXED-LENGTH UNROLL: the recurrence becomes its truncated FIR impulse response of a chosen length, run as an FIR. Exact only up to the truncation, and the unroll is fixed at build time - it does NOT scale to arbitrary-length streaming IIR. Exposed tagged, with the relerr-vs-untruncated cost reported, rather than faking a recurrent filter.

CONVENTIONS (inherited): * The ANE conv is a CROSS-CORRELATION (no kernel flip), matching np.correlate(x, h, 'valid'). True convolution (np.convolve / scipy lfilter) flips the kernel - fir_filter handles the flip + causal zero-pad internally. * Complex is (re, im) real-tensor pairs; spectra come back as separate real arrays. * Everything is fp16 on-device; the matmul accumulator is wide (>=fp32), so the error floor is fp16 INPUT/twiddle rounding (~few e-4), flat in length.

PYTHONPATH=. python3 aneforge/dsp.py

hann

hann(M: int, sym: bool = False) -> np.ndarray

Hann window (raised cosine). sym=False gives the periodic/DFT window (the STFT default, matching scipy.signal.get_window('hann', M)).

Source code in aneforge/dsp.py
def hann(M: int, sym: bool = False) -> np.ndarray:
    """Hann window (raised cosine). `sym=False` gives the periodic/DFT window
    (the STFT default, matching scipy.signal.get_window('hann', M))."""
    if M == 1:
        return np.ones(1, np.float32)
    n = np.arange(M)
    denom = (M - 1) if sym else M
    return (0.5 - 0.5 * np.cos(2.0 * np.pi * n / denom)).astype(np.float32)

hamming

hamming(M: int, sym: bool = False) -> np.ndarray

Hamming window. sym=False = periodic (STFT) form.

Source code in aneforge/dsp.py
def hamming(M: int, sym: bool = False) -> np.ndarray:
    """Hamming window. `sym=False` = periodic (STFT) form."""
    if M == 1:
        return np.ones(1, np.float32)
    n = np.arange(M)
    denom = (M - 1) if sym else M
    return (0.54 - 0.46 * np.cos(2.0 * np.pi * n / denom)).astype(np.float32)

blackman

blackman(M: int, sym: bool = False) -> np.ndarray

Blackman window. sym=False = periodic (STFT) form.

Source code in aneforge/dsp.py
def blackman(M: int, sym: bool = False) -> np.ndarray:
    """Blackman window. `sym=False` = periodic (STFT) form."""
    if M == 1:
        return np.ones(1, np.float32)
    n = np.arange(M)
    denom = (M - 1) if sym else M
    a0, a1, a2 = 0.42, 0.5, 0.08
    return (a0 - a1 * np.cos(2.0 * np.pi * n / denom)
            + a2 * np.cos(4.0 * np.pi * n / denom)).astype(np.float32)

get_window

get_window(window, M: int) -> np.ndarray

Resolve window (name str, 'boxcar'/None for rectangular, or an array) to a length-M fp32 coefficient vector (periodic form for the named cosine windows).

Source code in aneforge/dsp.py
def get_window(window, M: int) -> np.ndarray:
    """Resolve `window` (name str, 'boxcar'/None for rectangular, or an array) to a
    length-M fp32 coefficient vector (periodic form for the named cosine windows)."""
    if window is None or window == "boxcar" or window == "rect":
        return np.ones(M, np.float32)
    if isinstance(window, str):
        if window not in _WINDOWS:
            raise ValueError(f"unknown window {window!r}; choose from {list(_WINDOWS)} or 'boxcar'")
        return _WINDOWS[window](M, sym=False)
    w = np.asarray(window, np.float32)
    if w.shape != (M,):
        raise ValueError(f"window array must have length {M}; got {w.shape}")
    return w

fir_filter

fir_filter(x, taps, mode: str = 'same')

FIR filter y = x * taps (true convolution), on the ANE.

A finite impulse response is a convolution. The ANE conv is a CROSS-correlation (no kernel flip), so we flip the taps and zero-pad to realize a genuine convolution / causal FIR:

  • mode='full' -> length L+K-1 (== np.convolve(x, taps))
  • mode='same' -> length L, centered (== np.convolve(..., 'same'))
  • mode='valid' -> length L-K+1 (== np.convolve(..., 'valid'))
  • mode='lfilter' -> length L, causal (== scipy.signal.lfilter(taps, [1.0], x))

Short kernels (K<=15) run on the native conv layer in ONE fused program; longer kernels fall back to FFT convolution (aneforge.fft) automatically - same result.

cost: COMPUTE (a real conv) for short taps; FFT-domain for long taps. fit: GOOD - feed-forward, no recurrence.

Source code in aneforge/dsp.py
def fir_filter(x, taps, mode: str = "same"):
    """FIR filter `y = x * taps` (true convolution), on the ANE.

    A finite impulse response is a convolution. The ANE conv is a CROSS-correlation
    (no kernel flip), so we flip the taps and zero-pad to realize a genuine
    convolution / causal FIR:

      * mode='full'  -> length L+K-1   (== np.convolve(x, taps))
      * mode='same'  -> length L, centered (== np.convolve(..., 'same'))
      * mode='valid' -> length L-K+1   (== np.convolve(..., 'valid'))
      * mode='lfilter' -> length L, causal (== scipy.signal.lfilter(taps, [1.0], x))

    Short kernels (K<=15) run on the native conv layer in ONE fused program; longer
    kernels fall back to FFT convolution (aneforge.fft) automatically - same result.

    cost: COMPUTE (a real conv) for short taps; FFT-domain for long taps.
    fit:  GOOD - feed-forward, no recurrence.
    """
    x = np.asarray(x, np.float32).ravel()
    taps = np.asarray(taps, np.float32).ravel()
    L, K = x.shape[0], taps.shape[0]
    if K > L:
        raise ValueError(f"fir_filter: {K} taps longer than signal length {L}")

    # long kernels: the native 1xK conv backend caps at K<=15 -> use FFT convolution.
    if K > _MAX_CONV_TAPS:
        full = fft_convolve(x, taps)                     # length L+K-1
        return _trim_conv(full, L, K, mode)

    # short kernels: native conv. ANE conv == correlation, so flip the taps and
    # left/right zero-pad to get the requested convolution slice.
    hflip = np.ascontiguousarray(taps[::-1]).astype(np.float16).reshape(1, 1, 1, K)
    if mode in ("full", "lfilter"):
        left = K - 1
    elif mode == "same":
        left = (K - 1) // 2
    elif mode == "valid":
        left = 0
    else:
        raise ValueError(f"fir_filter: mode must be full/same/valid/lfilter; got {mode!r}")
    if mode == "full":
        right = K - 1
    elif mode == "same":
        right = K - 1 - left
    elif mode == "lfilter":
        right = 0
    else:                                                # valid
        right = 0
    xp = np.concatenate([np.zeros(left, np.float32), x, np.zeros(right, np.float32)])
    Lp = xp.shape[0]
    inp = af.input((1, 1, 1, Lp))
    model = af.compile(af.conv(inp, hflip, pad=0))
    y = model(xp.astype(np.float16).reshape(1, 1, 1, Lp))
    return y.ravel().astype(np.float32)

fft_convolve

fft_convolve(x, h, block: int | None = None)

Linear convolution y = x * h via the FFT (aneforge.fft), length L+K-1.

Equivalent to np.convolve(x, h) but computed as FFT(x)*FFT(h) -> iFFT, so every stage is a matmul or an elementwise spectral product (ANE-native). For a short kernel against a long signal we use OVERLAP-ADD: split the signal into blocks, multiply each block-FFT by the (cached) kernel spectrum, inverse- transform, and sum the overlapping tails. A single block is used when the whole thing fits one transform.

cost: COMPUTE/FUSION - staged matmul-FFTs + spectral multiply. fit: GOOD.

Source code in aneforge/dsp.py
def fft_convolve(x, h, block: int | None = None):
    """Linear convolution `y = x * h` via the FFT (aneforge.fft), length L+K-1.

    Equivalent to `np.convolve(x, h)` but computed as FFT(x)*FFT(h) -> iFFT, so
    every stage is a matmul or an elementwise spectral product (ANE-native). For a
    short kernel against a long signal we use OVERLAP-ADD: split the signal into
    blocks, multiply each block-FFT by the (cached) kernel spectrum, inverse-
    transform, and sum the overlapping tails. A single block is used when the whole
    thing fits one transform.

    cost: COMPUTE/FUSION - staged matmul-FFTs + spectral multiply.
    fit:  GOOD.
    """
    x = np.asarray(x, np.float32).ravel()
    h = np.asarray(h, np.float32).ravel()
    L, K = x.shape[0], h.shape[0]
    out_len = L + K - 1

    # single-block path: one FFT covers the whole linear convolution.
    if block is None:
        single = _next_fft_size(out_len)
        if L <= 4 * K or single <= 1024:
            return _fft_conv_block(x, h, single)[:out_len].astype(np.float32)
        block = single  # (not reached for the common case; kept for explicitness)

    # overlap-add: choose an FFT size, step = N-K+1 samples per block.
    N = _next_fft_size(max(block, 2 * K))
    step = N - K + 1
    Hr, Hi = fft(np.pad(h, (0, N - K)), np.zeros(N, np.float32), N)
    fplan = fft_plan(N)
    iplan = ifft_plan(N)
    y = np.zeros(out_len, np.float32)
    for start in range(0, L, step):
        seg = x[start:start + step]
        seg = np.pad(seg, (0, N - seg.shape[0]))
        Xr, Xi = fplan(seg, np.zeros(N, np.float32))
        Yr = Xr * Hr - Xi * Hi
        Yi = Xr * Hi + Xi * Hr
        yr = _ifft_real_scaled(Yr, Yi, N, iplan)
        end = min(start + N, out_len)
        y[start:end] += yr[:end - start].astype(np.float32)
    return y

freq_filter

freq_filter(x, kind: str, cutoff, fs: float = 2.0)

Frequency-domain filter: FFT the (real) signal, zero out the rejected bins, inverse-FFT. A brick-wall ideal filter in the DFT domain.

kind in {'lowpass','highpass','bandpass','bandstop'}. cutoff is a scalar (low/high) or a (lo, hi) pair (band). fs is the sample rate (default 2.0 so a bare cutoff reads as a normalized frequency in [0,1] == fraction of Nyquist). Returns the real filtered signal (length L), on the ANE.

The mask is applied to BOTH the bin and its conjugate mirror so the output stays real. cost: COMPUTE/FUSION (FFT + elementwise mask + iFFT). fit: GOOD.

Source code in aneforge/dsp.py
def freq_filter(x, kind: str, cutoff, fs: float = 2.0):
    """Frequency-domain filter: FFT the (real) signal, zero out the rejected bins,
    inverse-FFT. A brick-wall ideal filter in the DFT domain.

    `kind` in {'lowpass','highpass','bandpass','bandstop'}. `cutoff` is a scalar
    (low/high) or a (lo, hi) pair (band). `fs` is the sample rate (default 2.0 so a
    bare `cutoff` reads as a normalized frequency in [0,1] == fraction of
    Nyquist). Returns the real filtered signal (length L), on the ANE.

    The mask is applied to BOTH the bin and its conjugate mirror so the output stays
    real. cost: COMPUTE/FUSION (FFT + elementwise mask + iFFT). fit: GOOD.
    """
    x = np.asarray(x, np.float32).ravel()
    L = x.shape[0]
    N = _next_fft_size(L)
    freqs = np.fft.fftfreq(N, d=1.0 / fs)                # bin center frequencies
    absf = np.abs(freqs)

    if kind in ("lowpass", "highpass"):
        fc = float(np.asarray(cutoff).ravel()[0])
        passband = (absf <= fc) if kind == "lowpass" else (absf >= fc)
    elif kind in ("bandpass", "bandstop"):
        lo, hi = (float(c) for c in np.asarray(cutoff).ravel()[:2])
        inband = (absf >= lo) & (absf <= hi)
        passband = inband if kind == "bandpass" else ~inband
    else:
        raise ValueError(f"freq_filter: kind must be lowpass/highpass/bandpass/bandstop; got {kind!r}")
    mask = passband.astype(np.float32)

    Xr, Xi = fft(np.pad(x, (0, N - L)), np.zeros(N, np.float32), N)
    Xr = Xr * mask
    Xi = Xi * mask
    yr = _ifft_real_scaled(Xr, Xi, N)                   # fp16-range-guarded iFFT
    return yr[:L].astype(np.float32)

stft

stft(x, win=256, hop=None, window: str = 'hann')

Short-time Fourier transform: window each frame, FFT it (aneforge.fft), stack.

win is the frame length (int) or an explicit window-coefficient array; hop the step between frames (default win//4 like scipy); window the named window when win is an int. Returns (Zr, Zi), each [n_freq, n_frames] with n_freq = win//2 + 1 (the non-redundant rfft half), matching scipy.signal.stft's bin layout (NOT its 1/sum(win) scaling - see spectrogram for magnitudes).

Each frame is one matmul-FFT. The frames are independent (no recurrence), a clean ANE fit; we batch them through the cached FFT plan. fit: GOOD.

Source code in aneforge/dsp.py
def stft(x, win=256, hop=None, window: str = "hann"):
    """Short-time Fourier transform: window each frame, FFT it (aneforge.fft), stack.

    `win` is the frame length (int) or an explicit window-coefficient array; `hop`
    the step between frames (default win//4 like scipy); `window` the named window
    when `win` is an int. Returns (Zr, Zi), each [n_freq, n_frames] with
    n_freq = win//2 + 1 (the non-redundant rfft half), matching scipy.signal.stft's
    bin layout (NOT its 1/sum(win) scaling - see spectrogram for magnitudes).

    Each frame is one matmul-FFT. The frames are independent (no recurrence), a
    clean ANE fit; we batch them through the cached FFT plan. fit: GOOD.
    """
    x = np.asarray(x, np.float32).ravel()
    if isinstance(win, (int, np.integer)):
        win_len = int(win)
        w = get_window(window, win_len)
    else:
        w = np.asarray(win, np.float32).ravel()
        win_len = w.shape[0]
    if hop is None:
        hop = win_len // 4
    N = _next_fft_size(win_len)
    n_freq = win_len // 2 + 1

    frames = _frame(x, win_len, hop) * w                 # [n_frames, win_len], windowed
    n_frames = frames.shape[0]
    plan = fft_plan(N)
    Zr = np.empty((n_freq, n_frames), np.float32)
    Zi = np.empty((n_freq, n_frames), np.float32)
    zero = np.zeros(N, np.float32)
    for t in range(n_frames):
        fr = np.pad(frames[t], (0, N - win_len))
        Fr, Fi = plan(fr, zero)
        Zr[:, t] = Fr[:n_freq]
        Zi[:, t] = Fi[:n_freq]
    return Zr, Zi

spectrogram

spectrogram(x, win=256, hop=None, window: str = 'hann', mode: str = 'magnitude')

Magnitude (or power) spectrogram = |STFT|. mode in {'magnitude','power'}. Returns a [n_freq, n_frames] real array. fit: GOOD.

Source code in aneforge/dsp.py
def spectrogram(x, win=256, hop=None, window: str = "hann", mode: str = "magnitude"):
    """Magnitude (or power) spectrogram = |STFT|. `mode` in {'magnitude','power'}.
    Returns a [n_freq, n_frames] real array. fit: GOOD."""
    Zr, Zi = stft(x, win=win, hop=hop, window=window)
    p = Zr.astype(np.float32) ** 2 + Zi.astype(np.float32) ** 2
    return p if mode == "power" else np.sqrt(p)

correlate

correlate(a, b, mode: str = 'valid')

Cross-correlation r[k] = sum_n a[n+k] * b[n] on the ANE.

For a short template (Lb<=15) uses the native CrossCorrelation bridge (a path Apple's MIL frontend rejects): a length-La row and a smaller length-Lb template -> 'valid' correlation of length La-Lb+1, matching np.correlate(a, b, 'valid'). A wider template (Lb>=16, the same 1xK conv-width arch wall as fir_filter) falls back to FFT correlation (correlation = convolution with b reversed). Other modes zero-pad the map before the valid correlation.

cost: MIXED (cut) for the bridge; COMPUTE/FFT for the fallback. fit: GOOD.

Source code in aneforge/dsp.py
def correlate(a, b, mode: str = "valid"):
    """Cross-correlation `r[k] = sum_n a[n+k] * b[n]` on the ANE.

    For a short template (Lb<=15) uses the native CrossCorrelation bridge (a path
    Apple's MIL frontend rejects): a length-La row and a smaller length-Lb template ->
    'valid' correlation of length La-Lb+1, matching `np.correlate(a, b, 'valid')`.
    A wider template (Lb>=16, the same 1xK conv-width arch wall as fir_filter) falls
    back to FFT correlation (correlation = convolution with `b` reversed). Other
    modes zero-pad the map before the valid correlation.

    cost: MIXED (cut) for the bridge; COMPUTE/FFT for the fallback. fit: GOOD.
    """
    a = np.asarray(a, np.float32).ravel()
    b = np.asarray(b, np.float32).ravel()
    La, Lb = a.shape[0], b.shape[0]
    if Lb >= La:
        raise ValueError(f"correlate: template length {Lb} must be < signal length {La} "
                         f"(CrossCorrelation bridge requires a strictly smaller template)")
    if mode == "valid":
        ap = a
    elif mode == "full":
        ap = np.concatenate([np.zeros(Lb - 1, np.float32), a, np.zeros(Lb - 1, np.float32)])
    elif mode == "same":
        pad = (Lb - 1) // 2
        ap = np.concatenate([np.zeros(pad, np.float32), a, np.zeros(Lb - 1 - pad, np.float32)])
    else:
        raise ValueError(f"correlate: mode must be valid/full/same; got {mode!r}")

    # wide template: the CrossCorrelation bridge lowers to a 1xLb conv, capped at
    # Lb<=15 on this ANE (Lb>=16 fails ANECCompile). Fall back to FFT correlation:
    # correlate(ap, b) 'valid' == full-convolve(ap, reverse(b)) sliced to valid lags.
    if Lb > _MAX_CONV_TAPS:
        full = fft_convolve(ap, b[::-1])                 # length len(ap)+Lb-1
        return full[Lb - 1:ap.shape[0]].astype(np.float32)

    Wp = ap.shape[0]
    model = af.compile(af.cross_correlation(af.input((1, Wp)), af.input((1, Lb))))
    y = model(ap.astype(np.float16).reshape(1, Wp), b.astype(np.float16).reshape(1, Lb))
    return y.ravel().astype(np.float32)

autocorrelate

autocorrelate(x, max_lag: int | None = None)

Autocorrelation r[k] = sum_n x[n] x[n+k] for lags k=0..max_lag, on the ANE.

Built as a correlation of the signal against its own leading window via the CrossCorrelation bridge. max_lag defaults to len(x)//4 (the template length is len(x)-max_lag, kept < len(x)). Returns r[0..max_lag] (the non-negative lags), matching the tail of np.correlate(x, x, 'full'). fit: GOOD.

Source code in aneforge/dsp.py
def autocorrelate(x, max_lag: int | None = None):
    """Autocorrelation `r[k] = sum_n x[n] x[n+k]` for lags k=0..max_lag, on the ANE.

    Built as a correlation of the signal against its own leading `window` via the
    CrossCorrelation bridge. `max_lag` defaults to len(x)//4 (the template length is
    len(x)-max_lag, kept < len(x)). Returns r[0..max_lag] (the non-negative lags),
    matching the tail of `np.correlate(x, x, 'full')`. fit: GOOD.
    """
    x = np.asarray(x, np.float32).ravel()
    L = x.shape[0]
    if max_lag is None:
        max_lag = L // 4
    tmpl_len = L - max_lag
    if tmpl_len < 1 or tmpl_len >= L:
        raise ValueError(f"autocorrelate: max_lag={max_lag} out of range for length {L}")
    tmpl = x[:tmpl_len]
    # valid correlation of x with its prefix gives r[k] = sum_n x[n+k] tmpl[n], k=0..max_lag
    return correlate(x, tmpl, mode="valid")

iir_filter

iir_filter(x, b, a, n_taps: int = 256)

ARCH-LIMITED IIR via a FIXED-LENGTH FIR unroll.

A direct-form IIR a[0] y[n] = sum_i b[i] x[n-i] - sum_{j>=1} a[j] y[n-j] is a DATA-DEPENDENT RECURRENCE over samples - each output depends on previous OUTPUTS. The ANE is feed-forward: no in-graph loop, no scalar feedback, no scan/cumsum. There is NO streaming IIR on this hardware.

The ONLY honest realization is to TRUNCATE the IIR's infinite impulse response to n_taps and run it as an FIR (via fir_filter / fft_convolve). Exact only up to the truncation tail, with the tap count fixed at build time - it does NOT scale to arbitrary streaming IIR. Use it for stable filters whose impulse response has decayed within n_taps samples; the residual is the truncated tail energy.

Returns the FIR-approximated, causal (lfilter-style) output (length L), plus prints nothing - the caller-facing tag is in the module docstring. fit: ARCH-LIMITED (fixed unroll only).

Source code in aneforge/dsp.py
def iir_filter(x, b, a, n_taps: int = 256):
    """ARCH-LIMITED IIR via a FIXED-LENGTH FIR unroll.

    A direct-form IIR `a[0] y[n] = sum_i b[i] x[n-i] - sum_{j>=1} a[j] y[n-j]` is a
    DATA-DEPENDENT RECURRENCE over samples - each output depends on previous OUTPUTS.
    The ANE is feed-forward: no in-graph loop, no scalar feedback, no scan/cumsum.
    There is NO streaming IIR on this hardware.

    The ONLY honest realization is to TRUNCATE the IIR's infinite impulse response to
    `n_taps` and run it as an FIR (via fir_filter / fft_convolve). Exact only up to
    the truncation tail, with the tap count fixed at build time - it does NOT scale
    to arbitrary streaming IIR. Use it for stable filters whose impulse response has
    decayed within `n_taps` samples; the residual is the truncated tail energy.

    Returns the FIR-approximated, causal (lfilter-style) output (length L), plus prints
    nothing - the caller-facing tag is in the module docstring. fit: ARCH-LIMITED
    (fixed unroll only).
    """
    import scipy.signal as ss
    b = np.asarray(b, np.float64).ravel()
    a = np.asarray(a, np.float64).ravel()
    # truncated impulse response: response of the IIR to a unit impulse, n_taps long.
    imp = np.zeros(n_taps, np.float64)
    imp[0] = 1.0
    ir = ss.lfilter(b, a, imp).astype(np.float32)        # FIR approximation of the IIR
    return fir_filter(x, ir, mode="lfilter")