Compile, optimize, estimate

Lowering a graph to one ANE program, the accuracy-preserving autotuner, and the measurement-free cost model. All are reached from the top level (af.compile, af.tune, af.estimate, ...).

aneforge - a clean graph->compile->run frontend for the Apple Neural Engine.

Build a small tensor graph, compile it into ONE fused e5rt program, and run it on the ANE. Fusing is the point: the ANE penalises many tiny dispatches, so a whole subgraph becomes a single program. Weights pack automatically into one BLOBFILE - fp16, or per-channel int8 streamed (dequantised during the tile DMA) when int8=True.

import aneforge as af

x = af.input((1, 3, 32, 32))
h = af.conv(x, W1, pad=1).relu()
h = af.conv(h, W2, pad=1).relu()
y = h.mean((2, 3)).reshape(1, C) @ Wfc
net = af.compile(y, int8=True)         # one fused ANE program, int8-streamed weights
# alternative: net = af.compile(y, compress="int4")   # 4-bit LUT weights, accuracy-gated
out = net(image)                       # run on the ANE
Op surface
  • linear algebra: conv, conv_transpose; matmul/linear via @; bmm
  • dynamic_conv: conv with a RUNTIME-tensor weight (hypernetworks / per-sample kernels; native ANE dynamic kernel, batch-1 only)
  • activations: relu/silu/gelu/sigmoid/tanh/exp/log/sqrt/rsqrt/abs/square/sin/cos/erf/softplus/relu6/elu/leaky_relu/clip
  • arithmetic: add/sub/mul/div(/)/maximum/minimum/pow
  • reductions/norms: mean/sum/amax/amin, softmax, l2_norm, rms_norm/layer_norm/group_norm/batch_norm
  • spatial/shape: max_pool/avg_pool, upsample, concat, reshape/transpose, pixel_shuffle/pixel_unshuffle
  • nn helpers: mha, cross_attention, geglu

Two op routes. Most ops are FUSED e5rt-MIL: they lower to MIL and fuse into ONE program (no graph cut). A second family are NETPLIST-BRIDGE ops - native Path-A hardware layers Apple's MIL frontend never emits (sdpa, argmax/topk/sort, cross_product/cross_correlation/cost_volume, fps/radius_search, minmax_norm/lrn, the space/channel/batch rearranges, flatten/input_view/dynamic_slice/scaled_elementwise). Each bridge op CUTS the graph: surrounding regions run as e5rt programs, the bridge node runs as a separate native sub-program (sub-ms via the A2 persistent worker), and compile returns a SegmentedModel.
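The effect of a graph cut can be pictured with a small stand-alone sketch. It is not aneforge's segmentation code: it assumes a purely linear op sequence (real graphs are DAGs), the helper name split_at_bridges is hypothetical, and BRIDGE_OPS is only a hand-picked subset of the bridge ops listed above.

```python
# Hypothetical subset of the netplist-bridge ops named in the text above.
BRIDGE_OPS = {"sdpa", "argmax", "topk", "sort"}

def split_at_bridges(ops):
    """Split a linear op sequence into fused regions and single-op bridge stages."""
    stages, region = [], []
    for op in ops:
        if op in BRIDGE_OPS:
            if region:                       # close the fused region before the cut
                stages.append(("region", region))
                region = []
            stages.append(("bridge", [op]))  # the bridge op runs as its own stage
        else:
            region.append(op)                # fusable op: extend the current region
    if region:
        stages.append(("region", region))
    return stages

stages = split_at_bridges(["conv", "relu", "sdpa", "matmul", "softmax", "argmax"])
for kind, ops in stages:
    print(kind, ops)
```

A sequence with no bridge op comes back as a single region, which corresponds to the single fused program case.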

Image input: af.image_input(shape, scale=1/255, bias=0.0) declares a uint8 input port and dequantises it on the engine (cast -> scale -> bias), so raw camera / decoded-video bytes feed the model directly (host skips the float-convert/repack); scale/bias are scalar or per-channel (length-C, broadcast over NCHW).
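As a host-side illustration of the cast -> scale -> bias order, the sketch below reproduces the arithmetic in plain Python. The function name dequantise and the nested-list [C][H][W] layout are assumptions made for the example, and rounding to fp16 once at the end is also an assumption; the engine's exact rounding points are not specified here.

```python
import struct

def to_fp16(x):
    """Round a Python float to the nearest IEEE half-precision value."""
    return struct.unpack("<e", struct.pack("<e", x))[0]

def dequantise(pixels, scale=1 / 255, bias=0.0):
    """pixels: nested list [C][H][W] of ints in 0..255.
    scale / bias: a scalar, or a length-C list applied per channel."""
    n_ch = len(pixels)
    scales = list(scale) if isinstance(scale, (list, tuple)) else [scale] * n_ch
    biases = list(bias) if isinstance(bias, (list, tuple)) else [bias] * n_ch
    if len(scales) != n_ch or len(biases) != n_ch:
        raise ValueError("per-channel scale/bias must have length C")
    # cast -> scale -> bias, then a single rounding to fp16 (assumption)
    return [[[to_fp16(float(p) * s + b) for p in row] for row in plane]
            for plane, s, b in zip(pixels, scales, biases)]

image = [[[0, 255]], [[0, 255]], [[0, 255]]]          # C=3, H=1, W=2
out_scalar = dequantise(image)                         # default scale=1/255, bias=0
out_per_ch = dequantise(image, scale=[1 / 255, 2 / 255, 1 / 255],
                        bias=[0.0, -1.0, 0.5])
print(out_scalar)
print(out_per_ch)
```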

Pretrained loaders: af.load(".../all-MiniLM-L6-v2") (sentence encoder), af.load_resnet18() (ImageNet classifier).

Design rules: compute is fp16 only (fp32/int32/bf16 rejected). Reductions/matmuls use a WIDE (fp32-class) accumulator fed by radix-4 fp16-rounded input tiles, so representable sums are near-exact (a sum/dot of 16384 ones is bit-exact, where naive fp16 would stall at ~2048), and a +1 survives next to a 16000 partial that an fp16 running sum would swallow. The fp16 limit is at the products and the I/O cast, not the running sum, so cancellation-heavy reductions still lose precision. int8=True streams weights at half the bytes.

compress= chooses weight encoding:
  • None: fp16 (default)
  • 'int8': per-channel
  • 'int4': LUT palettization, per-tensor, with an accuracy-gated fallback to int8/fp16 set by compress_atol
  • 'sparse': unstructured bitmask, emitted when the weight is >=50% zeros, else fp16
  • 'auto': per-weight: sparse if sparse, else int4 if accurate, else int8, else fp16

int8=True is the alias for compress='int8'. Wraps the unentitled Espresso e5rt runtime only - no CoreML, no entitlement.
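The accumulator claim in the design rules (a sum of 16384 ones is exact, where an fp16 running sum stalls near 2048) can be reproduced on the host. The sketch below is an emulation only: it uses the standard library's half-precision struct format, and a Python float stands in for the wide accumulator; it does not model the radix-4 tiling.

```python
import struct

def to_fp16(x):
    """Round a Python float to the nearest IEEE half-precision value."""
    return struct.unpack("<e", struct.pack("<e", x))[0]

def naive_fp16_sum(values):
    """Running sum that is rounded to fp16 after every addition."""
    acc = 0.0
    for v in values:
        acc = to_fp16(acc + to_fp16(v))
    return acc

def wide_sum(values):
    """fp16-rounded inputs, wide running sum, one fp16 cast at the output."""
    acc = 0.0                      # Python float stands in for the wide accumulator
    for v in values:
        acc += to_fp16(v)
    return to_fp16(acc)            # the I/O cast

ones = [1.0] * 16384
naive = naive_fp16_sum(ones)       # 2048 + 1 is not representable in fp16
wide = wide_sum(ones)              # 16384 = 2**14 is representable
print(naive, wide)
```

The naive sum stops growing once the spacing between adjacent fp16 values exceeds the increment; the wide sum only pays for fp16 at the final cast.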

aneforge also has a tiny reverse-mode autograd (autograd.py): af.parameter / af.backward / af.mse / af.SGD / af.Trainer train a small model with the forward and backward passes compiled and run on the ANE. It also does classification: af.softmax_cross_entropy (analytic fp16-stable on-ANE gradient) + af.Adam train a 784->128->10 MLP on MNIST to ~97% test accuracy. Trainer(..., device_optimizer=True) additionally runs the OPTIMIZER STEP on the ANE (SGD/Adam update as graph ops), so all training tensor-math is on the engine; the host only computes the scalar lr_t and shuttles state/grads (the host<->device state round-trip remains). See examples/train_mnist_mlp.py.

Layout: graph.py (Tensor + ops), _compile.py (per-op emit registry + compile), _blob.py (weight packing), autograd.py (on-ANE autograd), models.py (pretrained loaders).

Model

A compiled, fused ANE program. Call it with the input array(s), in the order they were created with af.input.

Source code in aneforge/_compile.py
class Model:
    """A compiled, fused ANE program. Call it with the input array(s), in the order
    they were created with `af.input`."""

    def __init__(self, prog, inputs: list[tuple[str, tuple]], out_name: str, out_shape, n_ops: int,
                 input_tensors: list | None = None):
        self._prog = prog
        self._inputs = inputs
        # The ordered input Tensor objects (same order as `inputs`); lets callers
        # (e.g. autograd.Trainer) map each compiled input back to its source Tensor -
        # trainable parameter vs provided data. Additive; no effect on inference.
        self._input_tensors = input_tensors if input_tensors is not None else []
        self._out_name, self._out_shape = out_name, out_shape
        self.n_ops = n_ops  # graph ops fused into this single program

    def __call__(self, *arrays: np.ndarray) -> np.ndarray:
        if len(arrays) != len(self._inputs):
            raise ValueError(f"expected {len(self._inputs)} input(s), got {len(arrays)}")
        dts = getattr(self._prog, "_input_dtypes", {})
        feed = {}
        for (name, shape), a in zip(self._inputs, arrays):
            a = np.asarray(a)
            if tuple(a.shape) != tuple(shape):
                raise ValueError(f"input '{name}' shape {a.shape} != compiled {shape}")
            if dts.get(name, "fp16") == "uint8":
                if not np.issubdtype(a.dtype, np.integer):
                    raise TypeError(f"input '{name}' is a uint8 image port; pass an integer/uint8 "
                                    f"array (got dtype {a.dtype})")
                feed[name] = a.astype(np.uint8)              # Program._feed sends the raw bytes
            else:
                feed[name] = a.astype(np.float16)
        return self._prog.eval(feed)[self._out_name].astype(np.float32)

    # ---- zero-copy hot-loop API (skip the per-call host<->device memcpy) ----
    # Pattern:  v = model.input_view(); ... write fp16 into v ...; model.execute();
    #           out = model.output_view()   # fp16 view onto the result, no copy
    # For tight inference/training loops; for one-off calls just use __call__.
    def input_view(self, name: str | None = None) -> np.ndarray:
        """Writable fp16 view onto an input buffer (defaults to the sole input port).
        See `Program.input_view`."""
        return self._prog.input_view(name or self._inputs[0][0])

    def output_view(self) -> np.ndarray:
        """fp16 view onto the output buffer, valid after `execute()`. See
        `Program.output_view`."""
        return self._prog.output_view(self._out_name)

    def execute(self) -> None:
        """Run once without binding inputs / reading outputs - pair with
        `input_view`/`output_view` for a zero-copy loop."""
        self._prog.execute()

    def release(self) -> None:
        self._prog.release()

input_view

input_view(name: str | None = None) -> np.ndarray

Writable fp16 view onto an input buffer (defaults to the sole input port). See Program.input_view.

Source code in aneforge/_compile.py
def input_view(self, name: str | None = None) -> np.ndarray:
    """Writable fp16 view onto an input buffer (defaults to the sole input port).
    See `Program.input_view`."""
    return self._prog.input_view(name or self._inputs[0][0])

output_view

output_view() -> np.ndarray

fp16 view onto the output buffer, valid after execute(). See Program.output_view.

Source code in aneforge/_compile.py
def output_view(self) -> np.ndarray:
    """fp16 view onto the output buffer, valid after `execute()`. See
    `Program.output_view`."""
    return self._prog.output_view(self._out_name)

execute

execute() -> None

Run once without binding inputs / reading outputs - pair with input_view/output_view for a zero-copy loop.

Source code in aneforge/_compile.py
def execute(self) -> None:
    """Run once without binding inputs / reading outputs - pair with
    `input_view`/`output_view` for a zero-copy loop."""
    self._prog.execute()

SegmentedModel

A compiled plan: e5rt program segments interleaved with native-ANE sub-program calls (sdpa / argmax / topk - see NETPLIST_OPS). Tensors thread between segments as host fp16 arrays (correctness-first; a persistent IOSurface worker is the throughput follow-up).

Source code in aneforge/_compile.py
class SegmentedModel:
    """A compiled plan: e5rt program segments interleaved with native-ANE
    sub-program calls (sdpa / argmax / topk - see NETPLIST_OPS). Tensors thread
    between segments as host fp16 arrays (correctness-first; a persistent IOSurface
    worker is the throughput follow-up)."""

    def __init__(self, stages, inputs, out_id, out_shape, n_ops, n_netplist):
        self._stages = stages
        self._inputs = inputs  # list of (id, name, shape) in creation order
        self._out_id, self._out_shape = out_id, out_shape
        self.n_ops = n_ops
        # count of native sub-programs; kept as `n_sdpa` for backward compat
        self.n_netplist = self.n_sdpa = n_netplist
        # A2: persistent Path-A workers, one per netplist stage with a worker
        # route. Built lazily on first call (keyed by the stage), reused for the
        # rest of this model's lifetime, released in release(). Set
        # ANEFORGE_NETPLIST_WORKER=0 to force the A1 (subprocess-per-call) path.
        self._workers: dict[int, object] = {}
        self._worker_runs: dict[int, object] = {}
        self._worker_warned: set[str] = set()

    def __call__(self, *arrays: np.ndarray) -> np.ndarray:
        if len(arrays) != len(self._inputs):
            raise ValueError(f"expected {len(self._inputs)} input(s), got {len(arrays)}")
        env = {}
        for (iid, name, shape), a in zip(self._inputs, arrays):
            a = np.asarray(a)
            if tuple(a.shape) != tuple(shape):
                raise ValueError(f"input '{name}' shape {a.shape} != compiled {shape}")
            env[iid] = a.astype(np.float16)
        for st in self._stages:
            if st["kind"] == "region":
                feed = {name: env[sid] for sid, name in st["srcs"]}
                env[st["tid"]] = st["prog"].eval(feed)[st["out_var"]].astype(np.float16)
            else:  # native netplist-bridge sub-program (sdpa / argmax / topk / ...)
                runner = self._netplist_runner(st)
                src_arrays = [env[i] for i in st["src_ids"]]
                env[st["tid"]] = np.asarray(runner(src_arrays, st["attrs"]), dtype=np.float16)
        return env[self._out_id].astype(np.float32)

    def _netplist_runner(self, st):
        """Resolve the runner for a netplist stage. Prefer a persistent Path-A worker
        (load-once-eval-many), built lazily on first use and cached for this model's
        lifetime. Fall back to the subprocess-per-call bridge when no worker route
        exists for the op or when the worker is disabled / fails to start."""
        import os
        if os.environ.get("ANEFORGE_NETPLIST_WORKER", "1") == "0":
            return NETPLIST_OPS[st["op"]][0]
        # Causal SDPA needs the per-call additive-mask injection, which lives on the
        # subprocess bridge path (_run_sdpa); the persistent Path-A worker pre-builds a
        # mask-less netplist, so route masked SDPA to the bridge (correctness over the worker).
        if st["op"] == "sdpa" and (st["attrs"].get("causal") or st["attrs"].get("masked")):
            return NETPLIST_OPS[st["op"]][0]
        sid = st["tid"]
        run = self._worker_runs.get(sid)
        if run is not None:
            return run
        try:
            from . import _netplist_worker as nw
            if not nw.has_worker(st["op"]):
                # No worker route exists for this op - the subprocess bridge IS the
                # normal path, not a degradation; stay silent.
                run = NETPLIST_OPS[st["op"]][0]
                self._worker_runs[sid] = run
                return run
            worker, run = nw.build_worker(st["op"], st["src_shapes"][0], st["attrs"])
            self._workers[sid] = worker
            self._worker_runs[sid] = run
            return run
        except Exception as e:
            # Genuine worker failure -> fall back to the verified subprocess-bridge path
            # (correctness-preserving, slower per call) and remember it so we don't
            # retry the worker every call. Signal once per op.
            if st["op"] not in self._worker_warned:
                self._worker_warned.add(st["op"])
                import warnings
                warnings.warn(
                    f"aneforge: persistent worker for {st['op']!r} unavailable ({e!r}); "
                    f"falling back to the slower per-call subprocess bridge. Set "
                    f"ANEFORGE_NETPLIST_WORKER=0 to force (and silence) this path.",
                    stacklevel=2)
            run = NETPLIST_OPS[st["op"]][0]
            self._worker_runs[sid] = run
            return run

    def release(self) -> None:
        for st in self._stages:
            if st["kind"] == "region":
                st["prog"].release()
        for w in self._workers.values():
            try:
                w.release()
            except Exception:
                pass
        self._workers.clear()
        self._worker_runs.clear()

PrecisionWarning

Bases: UserWarning

Emitted by compile when a graph has fp16-cancellation-risk nodes whose results may be inaccurate. Silence with warnings.filterwarnings('ignore', category=aneforge.PrecisionWarning).
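A scoped way to apply that filter is sketched below. So that it runs without aneforge or an ANE, a local PrecisionWarning class and a noisy_compile function stand in for aneforge.PrecisionWarning and af.compile; both are placeholders for this example, not part of the library.

```python
import warnings

class PrecisionWarning(UserWarning):
    """Local stand-in for aneforge.PrecisionWarning (assumption for this sketch)."""

def noisy_compile():
    """Placeholder for a compile call that emits the warning."""
    warnings.warn("fp16-cancellation-risk node", PrecisionWarning)
    return "model"

# catch_warnings restores the previous filters on exit, so the suppression
# stays local to this block instead of applying process-wide.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warnings.filterwarnings("ignore", category=PrecisionWarning)
    result = noisy_compile()                 # PrecisionWarning is dropped
    warnings.warn("unrelated", UserWarning)  # other warnings still surface

print(result, [w.category.__name__ for w in caught])
```

Filtering by category leaves unrelated UserWarning instances visible, since the filter only matches PrecisionWarning and its subclasses.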

Source code in aneforge/_compile.py
class PrecisionWarning(UserWarning):
    """Emitted by `compile` when a graph has fp16-cancellation-risk nodes whose
    results may be inaccurate. Silence with
    `warnings.filterwarnings('ignore', category=aneforge.PrecisionWarning)`."""

CrossChipFP16Warning

Bases: UserWarning

Emitted by cross_compile_check when a graph compiles for a different-family target but carries an op whose fp16 VALUE can diverge from the host's on that chip (per the Direction B HAL-field predictor). The compile is still valid - this is a numeric heads-up, not a rejection. Silence with warnings.filterwarnings('ignore', category=aneforge.CrossChipFP16Warning).

Source code in aneforge/_compile.py
class CrossChipFP16Warning(UserWarning):
    """Emitted by `cross_compile_check` when a graph compiles for a different-family
    target but carries an op whose fp16 VALUE can diverge from the host's on that chip
    (per the Direction B HAL-field predictor). The compile is still valid - this is a
    numeric heads-up, not a rejection. Silence with
    `warnings.filterwarnings('ignore', category=aneforge.CrossChipFP16Warning)`."""

DispatchFloorWarning

Bases: UserWarning

Emitted by compile when a program is dispatch-floor-bound: its predicted ANE time is dominated by the fixed per-call dispatch + firmware round-trip, so each call costs about the same however small the work is. Dispatch is single-in-flight, so threads/concurrency do not amortize it - only larger batches or more ops per program do. Silence with warnings.filterwarnings('ignore', category=aneforge.DispatchFloorWarning).
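The arithmetic behind a dispatch-floor-bound program can be sketched with a two-term model. The constants below are made-up illustrative numbers, not measured ANE figures, and the linear per-item cost is an assumption.

```python
DISPATCH_FLOOR_US = 100.0   # hypothetical fixed per-call dispatch + round-trip cost
WORK_US_PER_ITEM = 2.0      # hypothetical compute time per batch item

def call_time_us(batch):
    """Total time of one call: fixed floor plus work proportional to the batch."""
    return DISPATCH_FLOOR_US + WORK_US_PER_ITEM * batch

def per_item_us(batch):
    """Amortized cost per item when the floor is shared across the batch."""
    return call_time_us(batch) / batch

def serial_calls_us(n_calls, batch=1):
    """Single-in-flight dispatch: calls from any number of threads run back to back."""
    return n_calls * call_time_us(batch)

for batch in (1, 8, 64):
    print(batch, call_time_us(batch), per_item_us(batch))
print(serial_calls_us(8), call_time_us(8))   # 8 calls of 1 item vs 1 call of 8 items
```

Under these numbers, eight separate one-item calls cost 816 us while one eight-item call costs 116 us, which is why only larger batches or more ops per program help.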

Source code in aneforge/_compile.py
class DispatchFloorWarning(UserWarning):
    """Emitted by `compile` when a program is dispatch-floor-bound: its predicted ANE
    time is dominated by the fixed per-call dispatch + firmware round-trip, so each call
    costs about the same however small the work is. Dispatch is single-in-flight, so
    threads/concurrency do not amortize it - only larger batches or more ops per program do.
    Silence with `warnings.filterwarnings('ignore', category=aneforge.DispatchFloorWarning)`."""

CompileBackoffError

Bases: RuntimeError

Raised (in strict mode) when a compile is attempted within the backoff window after a recent compile failure.

Source code in aneforge/_circuit.py
class CompileBackoffError(RuntimeError):
    """Raised (in strict mode) when a compile is attempted within the backoff window
    after a recent compile failure."""

compile

compile(out: Tensor, int8: bool = False, build_dir=None, opt='routes', compress: str | None = None, compress_atol: float = 0.05, block_size: int = 32, validate: bool = False, target=None, _check_precision: bool = True)

Lower the graph rooted at out into ONE fused ANE program (or, if it contains netplist-bridge ops such as af.sdpa, a segmented plan: e5rt programs split around each native sub-program).

opt selects the graph optimizer (default 'routes'):
  • opt='routes' (default): the lossless route pass - cost-model-driven, per shape, choosing for each route-bearing bridge node (sdpa, minmax_norm, flatten, lrn) between the native bridge (a graph cut) and the proven-equivalent fused decomposition (cut removed), without on-device measurement. The route registry is lossless (cos 1.0), so this never changes numerics; a cut-free graph compiles to exactly the opt=0 program (no regression). For sdpa the cost model keeps native where it wins (long sequences), so the default never blindly removes a cut that helps.
  • opt=0: no optimization - the historical, byte-identical path (int8 honored as given). Use this for byte-identity tests.
  • opt=1: cost-model pick over the full variant set (route swaps and the lossy whole-graph int8 variant), without measuring on-device.
  • opt=2 / opt='max': autotune - measure the legal proven-safe variants on the ANE, validate each vs the opt=0 baseline, return the fastest correct one (cached; instant on a cache hit).

compress selects weight encoding: None (fp16, default, byte-identical at opt=0), 'int8' (per-channel), 'int4' (LUT, accuracy-gated), 'sparse' (bitmask, when the weight is >=50% zeros), 'blockwise' (per-inner-block int8 via constexpr_blockwise_shift_scale, block_size columns per scale, accuracy-gated -> int8 -> fp16), or 'auto' (per-weight: sparse if sparse, else int4 if accurate, else int8, else fp16 - the most aggressive encoding that stays correct). 'auto' is family-aware: only encodings that stream natively on the target family (host-detected when target=None) are candidates - on h13/M1 that is int4-LUT + sparse, so auto streams those (sparse for >=50%-zero weights) but skips int8/blockwise (they fold to dense fp16: accuracy cost, no bandwidth win) and a rejected int4 falls to fp16. Explicit single-mode knobs are never filtered. compress_atol is the int4/blockwise fallback budget (relative L2); block_size is the inner-dim block width for 'blockwise'. Compressed weights stay on the byte-identical (opt=0) lowering: passing compress with an explicit opt>=1 is rejected, and the default route pass is skipped for compressed compiles (they take the no-op path).
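The 'auto' ladder described above can be written out as a small decision function. This is a sketch of the described policy, not the library's code: pick_encoding and its parameters are hypothetical, the per-encoding errors are supplied by the caller rather than measured, and gating int8 with the same budget as int4 is an assumption.

```python
def pick_encoding(zero_fraction, int4_err, int8_err, atol=0.05,
                  native=("sparse", "int4", "int8")):
    """Return the most aggressive encoding that is native on the target family
    and stays within the error budget `atol`."""
    if "sparse" in native and zero_fraction >= 0.5:
        return "sparse"                      # >=50% zeros: bitmask encoding
    if "int4" in native and int4_err <= atol:
        return "int4"                        # LUT passes the accuracy gate
    if "int8" in native and int8_err <= atol:
        return "int8"                        # fallback when int4 is rejected
    return "fp16"                            # last resort: uncompressed

# A family where int8 does not stream natively (as described for h13/M1):
M1_NATIVE = ("sparse", "int4")

print(pick_encoding(0.7, 0.20, 0.20))                    # sparse weight
print(pick_encoding(0.1, 0.01, 0.001))                   # int4 accurate
print(pick_encoding(0.1, 0.20, 0.001))                   # int4 rejected, int8 ok
print(pick_encoding(0.1, 0.20, 0.001, native=M1_NATIVE)) # int8 filtered out
```

With the restricted candidate set, a rejected int4 goes straight to fp16, matching the family-aware behavior described in the text.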

Source code in aneforge/_compile.py
def compile(out: Tensor, int8: bool = False, build_dir=None, opt="routes",
            compress: str | None = None, compress_atol: float = 0.05,
            block_size: int = 32, validate: bool = False, target=None,
            _check_precision: bool = True):
    """Lower the graph rooted at `out` into ONE fused ANE program (or, if it
    contains netplist-bridge ops such as `af.sdpa`, a segmented plan: e5rt programs
    split around each native sub-program).

    `opt` selects the graph optimizer (default `'routes'`):
      - `opt='routes'` (default) : the lossless route pass - cost-model-driven, per
                      shape, choosing for each route-bearing bridge node (sdpa,
                      minmax_norm, flatten, lrn) between the native bridge (a graph
                      cut) and the proven-equivalent fused decomposition (cut removed),
                      without on-device measurement. The route registry is lossless
                      (cos 1.0), so this never changes numerics; a cut-free graph
                      compiles to exactly the `opt=0` program (no regression). For
                      `sdpa` the cost model keeps native where it wins (long
                      sequences), so the default never blindly removes a cut that
                      helps.
      - `opt=0`   : no optimization - the historical, byte-identical path
                      (`int8` honored as given). Use this for byte-identity tests.
      - `opt=1`   : cost-model pick over the full variant set (route swaps and the
                      lossy whole-graph int8 variant), without measuring on-device.
      - `opt=2` / `opt='max'` : autotune - measure the legal proven-safe variants
                      on the ANE, validate each vs the opt=0 baseline, return the
                      fastest correct one (cached; instant on a cache hit).

    `compress` selects weight encoding: None (fp16, default, byte-identical at
    `opt=0`), 'int8' (per-channel), 'int4' (LUT, accuracy-gated), 'sparse' (bitmask,
    when the weight is >=50% zeros), 'blockwise' (per-inner-block int8 via
    `constexpr_blockwise_shift_scale`, `block_size` columns per scale,
    accuracy-gated -> int8 -> fp16), or 'auto' (per-weight: sparse if sparse, else int4
    if accurate, else int8, else fp16 - the most aggressive encoding that stays
    correct). 'auto' is family-aware: only encodings that stream natively on the
    `target` family (host-detected when `target=None`) are candidates - on h13/M1
    that is int4-LUT + sparse, so auto streams those (sparse for >=50%-zero weights) but
    skips int8/blockwise (they fold to dense fp16: accuracy cost, no bandwidth win) and a
    rejected int4 falls to fp16. Explicit single-mode knobs are never filtered.
    `compress_atol` is the int4/blockwise fallback budget (relative L2);
    `block_size` is the inner-dim block width for 'blockwise'. Compressed weights
    stay on the byte-identical (opt=0) lowering: passing `compress` with an explicit
    `opt>=1` is rejected, and the default route pass is skipped for compressed
    compiles (they take the no-op path).
    """
    if _check_precision:                 # once per user compile (internal re-entries pass False)
        _precision_signal(out, strict=validate)
        _dispatch_floor_signal(out)
        out = _retarget_for(out, target)  # gate ops/shapes for the target ANE family
    _OPT0 = (0, None, False)
    family = None
    if compress is not None:
        if opt not in _OPT0 and opt != "routes":
            raise NotImplementedError("compress= is not yet supported with opt>=1; "
                                      "use opt=0 for compressed weights")
        opt = 0          # compressed weights always take the byte-identical lowering
        if compress == "auto":
            family = _resolve_family(target)   # auto is family-aware: stream-only candidates
    if opt == "routes":
        from . import _optimize
        return _optimize._compile_routes(out, int8=int8, build_dir=build_dir)
    if opt not in _OPT0:
        return _compile_opt(out, int8=int8, opt=opt)
    order = _topo(out)
    if any(t.op in NETPLIST_OPS for t in order):
        return _compile_segmented(out, int8, build_dir, compress, compress_atol, block_size,
                                  family=family)
    bad = sorted({t.op for t in order if t.op != "input" and t.op not in _EMIT})
    if bad:
        raise NotImplementedError(f"aneforge: ops not reachable on the ANE: {bad}")
    inputs = sorted((t for t in order if t.op == "input"), key=lambda t: t.attrs.get("idx", 0))
    if not inputs:
        raise ValueError("aneforge.compile: graph has no inputs")
    for i, t in enumerate(order):
        t._name = f"t{i}"

    em = _Emitter(int8, compress=compress, compress_atol=compress_atol, block_size=block_size,
                  family=family)
    for t in order:
        if t.op != "input":
            _EMIT[t.op](em, t, t._name, [src._name for src in t.srcs])

    prog = _assemble_and_compile(em, inputs, out._name, out.shape, build_dir)
    n_ops = sum(1 for t in order if t.op != "input")
    return Model(prog, [(t._name, t.shape) for t in inputs], out._name, out.shape, n_ops,
                 input_tensors=list(inputs))

tune

tune(out, budget: int = 8, inputs=None, prune_factor: float = 1.5, reps: int = 20, atol: float = _ACCURACY_TOL, min_lossy_speedup: float = _MIN_LOSSY_SPEEDUP, verbose: bool = False, target_error: float | None = None)

Return the fastest CORRECT compiled Model for the graph rooted at out.

Enumerates the legal (proven-safe) variant space, prunes with the cost model, measures the survivors on the ANE, validates each against the opt=0 baseline, picks the fastest correct one, and caches the decision (instant on a cache hit).

PRECISION axis: pass target_error=E to switch to the precision-aware path (tune_precision) - the optimizer then selects the numerics-aware rewrite set (reduce_sum->matmul, +/- int8) that meets the error budget E (measured vs an fp32 reference, or vs the fp16 baseline's output when no fp32 emulation exists for the graph) at minimum cost, and can IMPROVE accuracy over the fp16 baseline. With no target_error the behavior is unchanged (speed tune; accuracy-preserving).

Accuracy contract: by default (atol = fp16 noise) tune is accuracy-PRESERVING - a lossy rewrite (int8) is rejected, so the result matches opt=0 within fp16 noise. To trade accuracy for speed, pass an explicit accuracy tolerance, e.g. tune(out, atol=0.1); even then a lossy variant must beat the baseline by min_lossy_speedup (default 1.10) to be chosen, so a measurement-noise "win" never costs accuracy for no real gain.

budget caps the number of on-device measurements. prune_factor: skip any lossy variant whose estimate() is > prune_factor x the best estimate over all variants; lossless variants are never pruned.
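The selection rules above (lossless variants first, pruning of lossy variants, and the min_lossy_speedup margin) can be traced on made-up numbers. The sketch below is a simplified stand-in for tune: pick_winner is a hypothetical name, estimates and measurements are supplied as plain data instead of coming from the cost model and the ANE, and an infinite measurement stands for a variant that failed or was rejected as inaccurate.

```python
INF = float("inf")

def pick_winner(variants, budget=8, prune_factor=1.5, min_lossy_speedup=1.10):
    """variants: dicts with name, lossy, est_us, meas_us (INF = failed/incorrect)."""
    ranked = sorted(variants, key=lambda v: v["est_us"])
    # lossless variants are measured first; the first success is the reference
    ranked = [v for v in ranked if not v["lossy"]] + [v for v in ranked if v["lossy"]]
    best_est = min(v["est_us"] for v in variants)
    best, best_us, baseline_us, measured = None, INF, None, 0
    for v in ranked:
        if measured >= budget:
            break
        if v["lossy"] and baseline_us is None:
            continue                          # nothing to validate a lossy variant against
        if v["lossy"] and best is not None and v["est_us"] > prune_factor * best_est:
            continue                          # pruned by the cost-model estimate
        us = v["meas_us"]                     # stands in for an on-device measurement
        measured += 1
        if baseline_us is None and us != INF:
            baseline_us = us
        if us < best_us:
            if v["lossy"] and us * min_lossy_speedup > baseline_us:
                continue                      # lossy win is within the noise margin
            best, best_us = v, us
    return best["name"] if best else None

variants = [
    {"name": "fp16", "lossy": False, "est_us": 100.0, "meas_us": 100.0},
    {"name": "route-swap", "lossy": False, "est_us": 90.0, "meas_us": 95.0},
    {"name": "int8", "lossy": True, "est_us": 80.0, "meas_us": 90.0},
]
winner = pick_winner(variants)
print(winner)
```

Here int8 measures faster than the reference (90 us vs 95 us) but not by the required 1.10x margin, so the lossless route swap is kept.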

Source code in aneforge/_optimize.py
def tune(out, budget: int = 8, inputs=None, prune_factor: float = 1.5,
         reps: int = 20, atol: float = _ACCURACY_TOL,
         min_lossy_speedup: float = _MIN_LOSSY_SPEEDUP, verbose: bool = False,
         target_error: float | None = None):
    """Return the fastest CORRECT compiled Model for the graph rooted at `out`.

    Enumerates the legal (proven-safe) variant space, prunes with the cost model, measures
    the survivors on the ANE, validates each against the opt=0 baseline, picks the fastest
    correct one, and caches the decision (instant on a cache hit).

    PRECISION axis: pass `target_error=E` to switch to the precision-aware path
    (`tune_precision`) - the optimizer then selects the numerics-aware rewrite set
    (reduce_sum->matmul, +/- int8) that meets the error budget E (measured vs an fp32
    reference, or vs the fp16 baseline's output when no fp32 emulation exists for the
    graph) at minimum cost, and can IMPROVE accuracy over the fp16 baseline. With no
    `target_error` the behavior is unchanged (speed tune; accuracy-preserving).

    Accuracy contract: by default (`atol` = fp16 noise) tune is accuracy-PRESERVING - a
    lossy rewrite (int8) is rejected, so the result matches opt=0 within fp16 noise. To
    trade accuracy for speed, pass an explicit accuracy tolerance, e.g. `tune(out, atol=0.1)`; even
    then a lossy variant must beat the baseline by `min_lossy_speedup` (default 1.10) to
    be chosen, so a measurement-noise "win" never costs accuracy for no real gain.

    `budget` caps the number of on-device measurements. `prune_factor`: skip any lossy
    variant whose estimate() is > prune_factor x the best estimate over all variants;
    lossless variants are never pruned.
    """
    if target_error is not None:
        model, _report = tune_precision(out, target_error=target_error, inputs=inputs,
                                        reps=reps, verbose=verbose)
        return model

    input_shapes = _input_shapes(out)
    key = _graph_key(out, input_shapes)
    cache = _load_cache()

    configs = _variants(out)

    # cache hit: rebuild the cached winner directly (no measurement). The winner is
    # either an enumerated variant (route/global-int8) or a greedy per-weight int8 config
    # (carrying `int8_nodes`, not in the static variant set but still buildable from the
    # same graph since the cache key pins the structure).
    if key in cache and cache[key].get("config") is not None:
        cfg = cache[key]["config"]
        if cfg in configs or cfg.get("int8_nodes"):
            if verbose:
                print(f"[tune] cache hit {key}: {_config_label(cfg)} "
                      f"({cache[key].get('us', '?')} us)")
            return build_variant(out, cfg)

    if inputs is None:
        inputs = _gen_inputs(input_shapes)

    # rank by cost-model estimate; the lossless baseline first so it is the reference.
    ranked = sorted(configs, key=lambda c: _estimate_variant(out, c))
    # ensure a lossless variant is measured first (it is the correctness reference).
    ranked = ([c for c in ranked if not c.get("lossy")] +
              [c for c in ranked if c.get("lossy")])

    best_cfg, best_us, baseline_out = None, float("inf"), None
    baseline_us = float("inf")     # the lossless fp16 baseline latency (the reference)
    best_est = min(_estimate_variant(out, c) for c in configs)
    n_measured = 0
    results = []
    skipped_lossy_no_baseline = False

    for cfg in ranked:
        if n_measured >= budget:
            break
        est = _estimate_variant(out, cfg)
        # `ranked` measures every lossless variant before any lossy one, so reaching a
        # lossy variant with no baseline means the fp16 baseline failed to compile. A lossy
        # variant must never become its own accuracy reference - skip it.
        if cfg.get("lossy") and baseline_out is None:
            results.append((cfg, est, None, "skipped"))
            skipped_lossy_no_baseline = True
            if verbose:
                print(f"[tune] skip {_config_label(cfg)}: no lossless baseline to "
                      f"validate against")
            continue
        # prune: skip lossy variants the model predicts far worse than the best estimate.
        # Never prune a lossless variant (route swaps are free accuracy-wise and are the
        # correctness reference / safe fallback).
        if cfg.get("lossy") and est > prune_factor * best_est and best_cfg is not None:
            results.append((cfg, est, None, "pruned"))
            if verbose:
                print(f"[tune] prune {_config_label(cfg)}: est {est:.0f}us > "
                      f"{prune_factor}x best est {best_est:.0f}us")
            continue
        us, out_arr = measure(out, inputs, cfg, baseline_out=baseline_out, reps=reps, tol=atol)
        n_measured += 1
        if baseline_out is None and out_arr is not None:
            baseline_out = out_arr     # first successful = reference (the fp16 baseline)
            baseline_us = us
        results.append((cfg, est, us, "measured"))
        if verbose:
            print(f"[tune] {_config_label(cfg):28s} est {est:7.0f}us  meas "
                  f"{us if us != float('inf') else 'INCORRECT/FAIL'} us")
        if us < best_us:
            # a lossy variant (int8) must beat the lossless baseline by a real margin -
            # never swap accuracy for a measurement-noise "win". A lossless route swap is
            # chosen on raw speed (no margin needed; it's bit-identical).
            if cfg.get("lossy") and not (baseline_us == float("inf")
                                         or us * min_lossy_speedup <= baseline_us):
                continue
            best_us, best_cfg = us, cfg

    if skipped_lossy_no_baseline:
        warnings.warn(
            "tune(): the fp16 baseline failed to compile (no lossless variant measured "
            "successfully), so lossy variants were skipped - without a lossless "
            "reference their accuracy cannot be validated.")

    # greedy per-weight int8 (coordinate descent). int8 is lossy, so only enumerate it
    # when the user has opted into a loose accuracy budget (atol > the fp16-noise default).
    # At the tight default it fails the accuracy gate anyway - skip it so the budget is not
    # wasted and default tune stays the lossless baseline (byte-identical decision). The
    # result competes with GLOBAL int8 already in the variant set: greedy wins when only
    # SOME weights tolerate int8, global when all do (greedy then selects every candidate
    # and ties global).
    if (atol > _ACCURACY_TOL and _has_weights(out) and baseline_out is not None
            and n_measured < budget):
        i8_nodes, i8_us, i8_n = _greedy_int8(
            out, inputs, baseline_out, baseline_us, reps=reps, atol=atol,
            min_lossy_speedup=min_lossy_speedup, budget=budget - n_measured,
            verbose=verbose)
        n_measured += i8_n
        if i8_nodes and i8_us < best_us:
            best_us = i8_us
            best_cfg = {"int8": False, "decomp": [], "int8_nodes": list(i8_nodes),
                        "lossy": True}
            if verbose:
                print(f"[tune] per-weight int8 wins: nodes {list(i8_nodes)} "
                      f"({i8_us:.0f}us vs baseline {baseline_us:.0f}us)")

    if best_cfg is None:
        best_cfg = {"int8": False, "decomp": (), "lossy": False}  # safe fallback

    cache[key] = {"config": best_cfg, "us": (None if best_us == float("inf") else round(best_us, 1)),
                  "shapes": [list(s) for s in input_shapes]}
    _save_cache(cache)

    if verbose:
        print(f"[tune] winner: {_config_label(best_cfg)} "
              f"({best_us if best_us != float('inf') else '?'} us); cached {key}")
    return build_variant(out, best_cfg)
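
The acceptance rule in the measurement loop above can be isolated as a small pure function. This is a minimal sketch rather than the library's code: `accepts` is a hypothetical helper, and the 1.1 margin is an assumed value, since the real default of `min_lossy_speedup` is not shown in this section.

```python
import math

def accepts(us, best_us, baseline_us, lossy, min_lossy_speedup=1.1):
    """Return True if a measured variant should replace the current best.

    Mirrors the rule in the loop above: every variant must beat the current
    best, and a lossy one must also beat the lossless baseline by the margin
    (unless no baseline timing exists, i.e. baseline_us is inf).
    The 1.1 default is an ASSUMED value for illustration only.
    """
    if not us < best_us:
        return False
    if lossy and not (math.isinf(baseline_us)
                      or us * min_lossy_speedup <= baseline_us):
        return False
    return True

# lossless route swap: raw speed is enough
print(accepts(95.0, 100.0, 100.0, lossy=False))   # True
# lossy and only 5% faster than the baseline: treated as noise, rejected
print(accepts(95.0, 100.0, 100.0, lossy=True))    # False
# lossy and 20% faster: accepted, because 80 * 1.1 = 88 <= 100
print(accepts(80.0, 100.0, 100.0, lossy=True))    # True
```

Keeping the margin multiplicative means the same setting scales from microsecond-sized graphs to millisecond-sized ones.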

tune_precision

tune_precision(out, target_error: float | None = None, cost_budget_us: float | None = None, inputs=None, reps: int = 20, verbose: bool = False)

PRECISION-AWARE tune: select the numerics-aware rewrite set under an explicit ERROR BUDGET (or a cost budget), measuring accuracy vs an fp32 reference.

Two modes (give one):
  • target_error=E: among variants whose measured relerr-vs-reference <= E, pick the one with MINIMUM predicted cost. (A tight E forces the accurate rewrites; a loose E lets a cheaper/lossy variant in.)
  • cost_budget_us=C: among variants whose predicted cost <= C, pick the one with MINIMUM measured error. (Best accuracy you can buy.)

Default (neither given) == minimize error at any cost (the most-accurate variant).
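
The three selection modes can be sketched on a toy (cost, error) table. The variant labels and numbers below are invented for illustration and `choose` is a hypothetical helper; only the selection rules follow the description above.

```python
def choose(rows, target_error=None, cost_budget_us=None):
    """Pick one row from a list of {"label", "est_us", "relerr"} dicts."""
    if target_error is not None:
        meeting = [r for r in rows if r["relerr"] <= target_error]
        if meeting:
            return min(meeting, key=lambda r: r["est_us"])   # cheapest that meets E
        return min(rows, key=lambda r: r["relerr"])          # none meet: most accurate
    if cost_budget_us is not None:
        affordable = [r for r in rows if r["est_us"] <= cost_budget_us]
        return min(affordable or rows, key=lambda r: r["relerr"])
    return min(rows, key=lambda r: r["relerr"])              # default: min error

# Hypothetical variants; the values are made up.
rows = [
    {"label": "fp16-baseline", "est_us": 120.0, "relerr": 2e-3},
    {"label": "rs_matmul",     "est_us": 150.0, "relerr": 3e-4},
    {"label": "int8",          "est_us":  90.0, "relerr": 1e-2},
]

print(choose(rows, target_error=5e-3)["label"])     # fp16-baseline
print(choose(rows, target_error=1e-3)["label"])     # rs_matmul
print(choose(rows, cost_budget_us=100.0)["label"])  # int8
print(choose(rows)["label"])                        # rs_matmul
```

Note the asymmetry: the error budget filters on measured error and ranks by predicted cost, while the cost budget filters on predicted cost and ranks by measured error.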

Returns (model, report), where report carries the per-variant (cost, error) table, the precision-risk flags, and the chosen config, so a caller can SEE the accuracy/cost frontier, not just the winner. Unlike the speed-oriented tune(), a variant here can be chosen for IMPROVING accuracy over the fp16 baseline.

HONEST SCOPE: automatic hotspot detection covers the reduce_sum->matmul case (structurally detectable). The CFG-style paired-fp16 fix is opt-in (use precision_rewrite / pass a marked region) because near-equal cancellation is data-dependent and cannot be confirmed at graph-build time.

REFERENCE: _fp32_reference emulates only the simple elementwise/matmul ops; a graph it cannot evaluate (conv/softmax/norms/...) falls back to the fp16 baseline's own measured output as the reference - target_error then bounds divergence FROM the fp16 baseline (which has relerr 0.0 by definition). The report's ref_kind ("fp32" | "fp16-baseline" | None) records which reference was used; None means no reference of any kind existed (the baseline failed too), in which case the budget is NOT enforced and a warning is emitted.
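
Because the meaning of relerr depends on which reference was used, a caller may want to branch on `ref_kind` before trusting the numbers. A minimal sketch, assuming only the `ref_kind` key described above; `describe_reference` and the sample report dicts are hypothetical.

```python
def describe_reference(report):
    """Explain what 'relerr' means for a tune_precision-style report dict."""
    kind = report.get("ref_kind")
    if kind == "fp32":
        return "relerr is measured against the fp32 emulation"
    if kind == "fp16-baseline":
        return "relerr is divergence from the fp16 baseline (baseline relerr is 0.0)"
    return "no reference existed: the error budget was NOT enforced"

# Hand-written stand-ins for the three documented cases.
for sample in ({"ref_kind": "fp32"}, {"ref_kind": "fp16-baseline"}, {"ref_kind": None}):
    print(describe_reference(sample))
```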

Source code in aneforge/_optimize.py
def tune_precision(out, target_error: float | None = None, cost_budget_us: float | None = None,
                   inputs=None, reps: int = 20, verbose: bool = False):
    """PRECISION-AWARE tune: select the numerics-aware rewrite set under an explicit
    ERROR BUDGET (or a cost budget), measuring accuracy vs an fp32 reference.

    Two modes (give one):
      * `target_error=E`    : among variants whose measured relerr-vs-reference <= E,
                                pick the one with MINIMUM predicted cost. (A tight E
                                forces the accurate rewrites; a loose E lets a cheaper/
                                lossy variant in.)
      * `cost_budget_us=C`  : among variants whose predicted cost <= C, pick the one
                                with MINIMUM measured error. (Best accuracy you can buy.)

    Default (neither given) == minimize error at any cost (the most-accurate variant).

    Returns `(model, report)` where report carries the per-variant (cost, error) table,
    the precision-risk flags, and the chosen config - so a caller can SEE the accuracy/cost
    frontier, not just the winner. Unlike speed-tune(), a variant here can be chosen for
    IMPROVING accuracy over the fp16 baseline.

    HONEST SCOPE: automatic hotspot detection covers the reduce_sum->matmul case
    (structurally detectable). The CFG-style paired-fp16 fix is opt-in (use
    `precision_rewrite` / pass a marked region) because near-equal cancellation is
    data-dependent and cannot be confirmed at graph-build time.

    REFERENCE: `_fp32_reference` emulates only the simple elementwise/matmul ops; a graph
    it cannot evaluate (conv/softmax/norms/...) falls back to the fp16 baseline's own
    measured output as the reference - `target_error` then bounds divergence FROM the fp16
    baseline (which has relerr 0.0 by definition). The report's `ref_kind` ("fp32" |
    "fp16-baseline" | None) records which reference was used; None means no reference of
    any kind existed (the baseline failed too), in which case the budget is NOT enforced
    and a warning is emitted."""
    input_shapes = _input_shapes(out)
    if inputs is None:
        inputs = _gen_inputs(input_shapes)
    ref = _fp32_reference(out, inputs)
    ref_kind = "fp32" if ref is not None else None

    configs, risk = _precision_variants(out)
    rows = []
    for cfg in configs:
        est = _estimate_variant(out, cfg)
        us, relerr, out_arr = _measure_with_ref(out, inputs, cfg, ref, reps=reps)
        if ref_kind is None and cfg["label_kind"] == "fp16-baseline" and out_arr is not None:
            # no fp32-faithful emulation for this graph: the fp16 baseline's own measured
            # output becomes the reference (the docstring contract), so remaining variants
            # are gated on divergence from it. The baseline is the reference: relerr 0.0 by
            # definition, not NaN.
            ref = np.asarray(out_arr, np.float64)
            ref_kind = "fp16-baseline"
            relerr = 0.0
        rows.append({"config": cfg, "label": cfg["label_kind"], "est_us": est,
                     "meas_us": us, "relerr": relerr, "ok": us != float("inf")})
        if verbose:
            print(f"[tune_precision] {cfg['label_kind']:22s} est {est:7.0f}us  "
                  f"meas {us if us != float('inf') else 'FAIL':>9} "
                  f"relerr {relerr:.3e}")

    usable = [r for r in rows if r["ok"]]
    if not usable:
        # nothing compiled - fall back to the plain fp16 baseline.
        model = build_variant(out, {"rs_matmul": [], "int8": False})
        return model, {"rows": rows, "risk": risk, "chosen": None,
                       "ref_available": ref is not None, "ref_kind": ref_kind}

    if ref_kind is None:
        # NO reference of any kind: fp32 emulation is unsupported for this graph AND the
        # fp16 baseline failed, so no variant's error was measured. Error-based selection
        # would be meaningless - prefer the cheapest LOSSLESS variant and say so rather than
        # silently pretending the budget was enforced.
        pool = [r for r in usable if not r["config"].get("lossy")] or usable
        chosen = min(pool, key=lambda r: r["est_us"])
        reason = ("NO accuracy reference (fp32 emulation unsupported for this graph; "
                  "the fp16 baseline failed to run) - error budget NOT enforced; "
                  "chose min-cost" + ("" if pool is usable else " lossless"))
        warnings.warn(f"tune_precision: {reason}")
    elif target_error is not None:
        meeting = [r for r in usable if r["relerr"] <= target_error]
        if meeting:
            chosen = min(meeting, key=lambda r: r["est_us"])   # cheapest that meets E
            reason = (f"min-cost meeting target_error={target_error:.1e} "
                      f"(error vs {ref_kind} reference)")
        else:
            chosen = min(usable, key=lambda r: r["relerr"])    # none meet -> most accurate
            reason = (f"NO variant met target_error={target_error:.1e} vs the "
                      f"{ref_kind} reference; chose most-accurate")
    elif cost_budget_us is not None:
        affordable = [r for r in usable if r["est_us"] <= cost_budget_us]
        pool = affordable or usable
        chosen = min(pool, key=lambda r: r["relerr"])
        reason = (f"min-error vs {ref_kind} reference within cost_budget={cost_budget_us:.0f}us"
                  if affordable else
                  f"NO variant under cost_budget={cost_budget_us:.0f}us; chose most-accurate")
    else:
        chosen = min(usable, key=lambda r: r["relerr"])
        reason = f"min-error vs {ref_kind} reference (no budget given)"

    if verbose:
        print(f"[tune_precision] CHOSE {chosen['label']} "
              f"(relerr {chosen['relerr']:.3e}, est {chosen['est_us']:.0f}us) - {reason}")
    model = build_variant(out, chosen["config"])
    return model, {"rows": rows, "risk": risk, "chosen": chosen, "reason": reason,
                   "ref_available": ref is not None, "ref_kind": ref_kind}

estimate

estimate(out, int8: bool = False, target: str | None = None) -> float

Estimate the compiled latency (microseconds) of the graph rooted at out.

int8 scales streamed weight bytes by ~0.5 (per-channel int8 streams half the bytes; activations stay fp16). This is the only dtype lever the model needs to rank int8 vs fp16; everything else is structural.

target (an ANE arch string, e.g. 'h13' or 'h17s') switches to the measurement-free ANALYTIC per-chip model (Direction A): a roofline taken from the nearest silicon-measured anchor (M1/h13 or M5/h17s) and scaled to that chip's {cores, clock, efficiency} curve. It is valid for all 28 chips with no on-device measurement (+/-17% on the measured M1 convs; the M5 anchor lands the loop-closure convs within ~15% on the quoted set). target=None (the default) uses the precise M5-measured heuristic shown in the source below, unchanged.
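
To see why halving the weight bytes is enough to rank int8 against fp16, consider a single-node roofline. This is a sketch under stated assumptions: the floor, bandwidth and throughput constants are invented round numbers, not the library's fitted values, and `roofline_us` is a hypothetical helper.

```python
def roofline_us(bytes_moved, flops, floor_us, bw_bytes_per_us, flops_per_us):
    """Latency is the largest of: dispatch floor, memory time, compute time."""
    return max(floor_us, bytes_moved / bw_bytes_per_us, flops / flops_per_us)

# ASSUMED constants, for illustration only.
FLOOR_US = 20.0
BW = 50_000.0        # bytes per microsecond
FLOPS = 1_000_000.0  # flops per microsecond

# A (1, 1024) @ (1024, 1024) projection: activations stay fp16 (2 bytes/elem).
act_bytes = (1024 + 1024) * 2
weight_elems = 1024 * 1024
flops = 2 * 1024 * 1024

fp16_us = roofline_us(act_bytes + weight_elems * 2, flops, FLOOR_US, BW, FLOPS)
int8_us = roofline_us(act_bytes + weight_elems * 1, flops, FLOOR_US, BW, FLOPS)
print(f"fp16 {fp16_us:.1f} us, int8 {int8_us:.1f} us")  # fp16 42.0 us, int8 21.1 us
```

With these numbers the node is bandwidth-bound, so the int8 estimate is roughly half the fp16 one. A node already pinned at the floor would tie, which is the case the tie-breaking discount in the source addresses.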

Source code in aneforge/_cost.py
def estimate(out, int8: bool = False, target: str | None = None) -> float:
    """Estimate the compiled latency (microseconds) of the graph rooted at `out`.

    int8 scales streamed weight bytes by ~0.5 (per-channel int8 streams half the
    bytes; activations stay fp16). This is the only dtype lever the model needs to
    rank int8 vs fp16; everything else is structural.

    `target` (an ANE arch string, e.g. 'h13'/'h17s') switches to the measurement-free
    ANALYTIC per-chip model (Direction A) - a roofline taken from the nearest
    silicon-measured anchor (M1/h13 or M5/h17s) and scaled to that chip's {cores, clock,
    efficiency} curve, valid for all 28 chips with no on-device measurement (+/-17% on
    the measured M1 convs; the M5 anchor lands the loop-closure convs within ~15% on the
    quoted set). `target=None` (the default) uses the precise M5-measured heuristic below,
    unchanged.
    """
    if target is not None:
        return _estimate_analytic(out, target, int8)
    c = _constants()
    order = _topo(out)
    nodes = [t for t in order if t.op != "input"]
    cut_nodes = [t for t in nodes if t.op in NETPLIST_OPS]
    region_nodes = [t for t in nodes if t.op not in NETPLIST_OPS]

    floor = c["floor_us"]

    def _ncost(t) -> float:
        if int8 and t.op in ("matmul",):
            # int8 halves the weight bytes for the (dominant) projection weights
            in_elems = sum(_elems(s.shape) for s in t.srcs)
            wbytes = _weight_elems(t) * 1.0   # int8 = 1 byte/elem instead of 2
            bytes_moved = (in_elems + _elems(t.shape)) * 2.0 + wbytes
            flops = _node_flops(t)
            return max(floor, bytes_moved / c["bw_bytes_per_us"], flops / c["flops_per_us"])
        return node_cost(t)

    # int8 tie-breaker: even when a graph is floor-bound (so the roofline ties int8
    # and fp16 at the dispatch floor), int8 always streams <= fp16 bytes, never more.
    # Encode that as a tiny weight-byte-proportional discount so the model is DECISIVE
    # and directionally correct (int8 predicted <= fp16) instead of an arbitrary tie.
    # Scaled well below a floor's worth so it never reorders variants with a real cost
    # difference.
    int8_discount = 0.0
    if int8:
        saved_bytes = sum(_weight_elems(t) for t in region_nodes if t.op == "matmul")
        int8_discount = min(0.49 * floor, saved_bytes / c["bw_bytes_per_us"] * 0.5)

    if not cut_nodes:
        # one fused program: one floor + each node's above-floor work
        region = sum(max(0.0, _ncost(t) - floor) for t in region_nodes)
        return floor + region - int8_discount

    # segmented: fused regions (each pays one floor) interleaved with cuts.
    # Mirror _compile_segmented: a region is built per cut-source and for the final
    # output. Approximate region count as the number of distinct fused-program segments
    # - at most (n_cuts + 1) - and charge each a floor; the cheap nodes spread across
    # them, so keep the global above-floor sum and add (n_regions) floors plus the cuts.
    n_cuts = len(cut_nodes)
    n_regions = (n_cuts + 1) if region_nodes else 0
    region_work = sum(max(0.0, _ncost(t) - floor) for t in region_nodes)
    # each bridge node's own compute cost: prefer the measured per-family lookup
    # (bridge_cost); fall back to the generic roofline for families with no data.
    def _bridge_or_roofline(t) -> float:
        bc = bridge_cost(t)
        return bc if bc is not None else _ncost(t)
    cut_work = sum(_bridge_or_roofline(t) for t in cut_nodes)
    return n_regions * floor + region_work + cut_work + n_cuts * c["cut_us"] - int8_discount
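
The fused and segmented totals above differ only in how many floors and cuts are charged. A minimal sketch of that bookkeeping with invented numbers; `total_us` is a hypothetical helper that takes precomputed per-node costs rather than graph nodes.

```python
def total_us(above_floor, bridge_costs, floor_us, cut_us, int8_discount=0.0):
    """above_floor: max(0, cost - floor) per fused node.
    bridge_costs: cost of each bridge (graph-cutting) node."""
    region_work = sum(above_floor)
    if not bridge_costs:
        # one fused program: a single floor plus the above-floor work
        return floor_us + region_work - int8_discount
    n_cuts = len(bridge_costs)
    n_regions = (n_cuts + 1) if above_floor else 0
    return (n_regions * floor_us + region_work + sum(bridge_costs)
            + n_cuts * cut_us - int8_discount)

# ASSUMED constants and costs, for illustration only.
fused = total_us([22.0, 0.0, 3.5], [], floor_us=20.0, cut_us=5.0)
segmented = total_us([22.0, 0.0, 3.5], [30.0], floor_us=20.0, cut_us=5.0)
print(fused, segmented)  # 45.5 100.5
```

Under these assumed numbers a single bridge op more than doubles the estimate: it adds its own cost, one cut overhead, and an extra region floor.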

estimate_provenance

estimate_provenance(target: str) -> dict

Is estimate(out, target=...) silicon-anchored or extrapolated for target?

Three chips were measured and fitted as roofline anchors (_ANCHORS): A13/h13 (M1), A14/h14 (M2 Pro), A16/h17s (M5). A target whose capability family OWNS one of those anchors is silicon-measured (the A16 tier folds H16/H17* into the h17s point); every other target is extrapolated from the nearest measured anchor by its {cores, clock, efficiency} curve. The function surfaces that distinction so a caller knows whether a per-chip estimate rests on measured silicon or a generational projection.

Returns {'target', 'anchor', 'measured': bool, 'basis': str} where anchor is the silicon point the estimate is built on, measured is True iff target's family has its own anchor, and basis is 'silicon' or 'extrapolated-from-<anchor>'. Raises ValueError on an unknown arch (mirrors cross_compile_check).
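
One way to use the returned dict is to attach a caveat to every per-chip estimate before reporting it. A sketch, assuming only the four documented keys; `annotate` is a hypothetical helper, the dicts are hand-written stand-ins for real return values, and 'hXX' is a placeholder arch name, not a real target.

```python
def annotate(est_us, prov):
    """Format an estimate with a caveat derived from a provenance dict."""
    if prov["measured"]:
        tag = "measured silicon"
    else:
        tag = f"projected from {prov['anchor']}"
    return f"{est_us:.0f} us on {prov['target']} ({tag})"

# Stand-ins shaped like the documented return value.
measured = {"target": "h13", "anchor": "h13", "measured": True, "basis": "silicon"}
projected = {"target": "hXX", "anchor": "h13", "measured": False,
             "basis": "extrapolated-from-h13"}

print(annotate(42.0, measured))   # 42 us on h13 (measured silicon)
print(annotate(57.0, projected))  # 57 us on hXX (projected from h13)
```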

Source code in aneforge/_cost.py
def estimate_provenance(target: str) -> dict:
    """Is `estimate(out, target=...)` silicon-anchored or extrapolated for `target`?

    Three chips were measured and fit a roofline anchor (`_ANCHORS`): A13/h13 (M1),
    A14/h14 (M2 Pro), A16/h17s (M5). A target whose capability family OWNS one of those
    anchors is silicon-measured (the A16 tier folds H16/H17* into the h17s point); every
    other target is extrapolated from the nearest measured anchor by its {cores, clock,
    efficiency} curve. Surfaces that distinction so a caller knows whether a per-chip
    estimate rests on measured silicon or a generational projection.

    Returns `{'target', 'anchor', 'measured': bool, 'basis': str}` where `anchor` is
    the silicon point the estimate is built on, `measured` is True iff `target`'s family
    has its own anchor, and `basis` is `'silicon'` or `'extrapolated-from-<anchor>'`.
    Raises `ValueError` on an unknown arch (mirrors `cross_compile_check`)."""
    from . import _targets as _TG
    key = target.strip().lower()
    if key not in _TG._ARCH_FAMILY:
        raise ValueError(f"unknown ANE target arch {target!r}; known: "
                         f"{sorted(_TG._ARCH_FAMILY)}")
    anchor = _anchor_for_arch(key)
    measured_families = {int(_TG.family_of_arch(a)) for a in _ANCHORS}
    measured = int(_TG.family_of_arch(key)) in measured_families
    return {
        "target": key,
        "anchor": anchor,
        "measured": measured,
        "basis": "silicon" if measured else f"extrapolated-from-{anchor}",
    }

project_peak

project_peak(arch: str) -> dict

Measurement-free fp16 peak-throughput projection for any ANE target, anchored to the measured M1 point (1.8 TFLOP/s). Returns {tflops, rel_m1, cores, ghz} - the generational-scaling table (M5 ~5.5x, H17d ~22x, M11 ~0.1x) needs no silicon beyond M1.
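
Since tflops is the measured M1 peak multiplied by rel_m1, the quoted ratios translate directly into absolute figures. The arithmetic below uses only the approximate numbers quoted above; `projected_tflops` is a hypothetical helper, not part of the library.

```python
M1_PEAK_TFLOPS = 1.8  # measured M1 anchor quoted above

# Approximate generational ratios quoted above (rel_m1).
REL_M1 = {"M5": 5.5, "H17d": 22.0, "M11": 0.1}

def projected_tflops(rel_m1):
    """Peak fp16 throughput implied by a ratio relative to M1."""
    return M1_PEAK_TFLOPS * rel_m1

for name, rel in REL_M1.items():
    print(f"{name}: ~{projected_tflops(rel):.2f} TFLOP/s")
```

These figures inherit the approximation in the quoted ratios, so they should be read as order-of-magnitude guidance.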

Source code in aneforge/_cost.py
def project_peak(arch: str) -> dict:
    """Measurement-free fp16 peak-throughput projection for any ANE target, anchored to
    the measured M1 point (1.8 TFLOP/s). Returns {tflops, rel_m1, cores, ghz} - the
    generational-scaling table (M5 ~5.5x, H17d ~22x, M11 ~0.1x) needs no silicon beyond M1."""
    scale = _compute_scale(arch)
    c = _curve_for_arch(arch)
    return {
        "tflops": _M1_MEASURED_PEAK_TFLOPS * scale,
        "rel_m1": scale,
        "cores": int(c["cores_0x238"]),
        "ghz": _CLOCK_FRACTION * max(c["freq_0x760"]) / 1e9,
    }

precision_risk

precision_risk(out, verbose: bool = False) -> dict

Heuristic fp16-cancellation risk for the graph rooted at out.

Returns a dict:

{"graph_error": float,        # est. worst-case relerr proxy in [0,1]
 "nodes": [ {idx, op, kind, est_error, fixable, reason}, ... ],
 "hotspots": [idx, ...]}      # node indices flagged above the clean floor

kind is one of narrow_sum, cancel_sub, or groupnorm_cliff. fixable names the numerics-aware rewrite that addresses it ("reduce_sum->matmul", "paired-fp16", or "" for an avoid/flag-only cliff). This is a HEURISTIC, not a bound - see the module note above for what it catches and (importantly) misses.
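
The narrow-sum proxy and the default hotspot filter are simple enough to reproduce by hand. A sketch under one loud assumption: `FP16_CLEAN` is set here to the fp16 machine epsilon (2**-10), because the real value of `_FP16_CLEAN` is not shown in this section. The node dicts are invented.

```python
FP16_CLEAN = 2.0 ** -10   # ASSUMED clean floor; the library's _FP16_CLEAN may differ

def narrow_sum_error(k):
    """Order-of-magnitude proxy: error grows ~ sqrt(K) * eps, capped at 1.0."""
    return min(1.0, FP16_CLEAN * k ** 0.5)

def default_hotspots(nodes):
    """Keep only the structurally reliable signals, as in the source below."""
    return [n["idx"] for n in nodes
            if n["est_error"] > FP16_CLEAN or n["kind"] == "groupnorm_cliff"]

nodes = [
    {"idx": 3, "kind": "narrow_sum",      "est_error": narrow_sum_error(4096)},
    {"idx": 7, "kind": "cancel_sub",      "est_error": FP16_CLEAN},
    {"idx": 9, "kind": "groupnorm_cliff", "est_error": 0.0},
]

print(narrow_sum_error(4096))   # 0.0625
print(default_hotspots(nodes))  # [3, 9]
```

The cancel_sub entry sits exactly at the floor, so it stays in nodes but never becomes a hotspot, which matches its informational-only role.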

Source code in aneforge/_cost.py
def precision_risk(out, verbose: bool = False) -> dict:
    """Heuristic fp16-cancellation risk for the graph rooted at `out`.

    Returns a dict::

        {"graph_error": float,        # est. worst-case relerr proxy in [0,1]
         "nodes": [ {idx, op, kind, est_error, fixable, reason}, ... ],
         "hotspots": [idx, ...]}      # node indices flagged above the clean floor

    `kind` in {narrow_sum, cancel_sub, groupnorm_cliff}. `fixable` names the
    numerics-aware rewrite that addresses it ("reduce_sum->matmul", "paired-fp16",
    or "" for an avoid/flag-only cliff). This is a HEURISTIC, not a bound - see the
    module note above for what it catches and (importantly) misses.
    """
    order = _topo(out)
    nodes = []
    for i, t in enumerate(order):
        # (a) narrow-accumulator signed reduce_sum
        if t.op == "reduce_sum":
            K = _reduce_len(t)
            signed = (not t.srcs) or _is_signed_producer(t.srcs[0])
            if signed and K >= _NARROW_SUM_FLOOR:
                # error grows ~ sqrt(K) * fp16_eps under the narrow accumulator;
                # cap at 1.0. This is an order-of-magnitude proxy, not a bound.
                est = min(1.0, _FP16_CLEAN * (K ** 0.5))
                nodes.append({"idx": i, "op": t.op, "kind": "narrow_sum",
                              "est_error": est, "fixable": "reduce_sum->matmul",
                              "reason": f"signed reduce_sum over K={K} (narrow fp16 accumulator)"})
            continue
        # (b) CFG-style subtract - candidate cancellation (data-dependent, can't
        #     confirm structurally). Flag sub of two non-trivial activations.
        if t.op == "sub" and len(t.srcs) == 2:
            big = _elems(t.shape) >= 64    # a vector/tensor sub (not a scalar bias)
            both_live = all(s.op not in ("muls",) for s in t.srcs)
            if big and both_live:
                nodes.append({"idx": i, "op": t.op, "kind": "cancel_sub",
                              "est_error": _FP16_CLEAN,  # only RISKS blowing up; unknown w/o data
                              "fixable": "paired-fp16",
                              "reason": "subtract of two live tensors (CANDIDATE catastrophic "
                                        "cancellation - confirm with data; fix is upstream paired-fp16)"})
            continue
        # (c) group_norm at the per-axis wall. The rank-4 tiled lowering reduces over
        #     [1,G,C/groups,H*W], so the cliff is max(C/groups, H*W) > 65536 (aligned to
        #     af.group_norm's construction guard) - NOT the flattened (C/groups)*H*W,
        #     which the tiling now keeps under the cap (640@64, 512@128 run fine in fp16).
        if t.op == "group_norm" and len(t.shape) == 4:
            _, C, H, W = t.shape
            groups = int(t.attrs.get("groups", 1)) or 1
            if max(C // groups, H * W) > 65536:
                nodes.append({"idx": i, "op": t.op, "kind": "groupnorm_cliff",
                              "est_error": 0.0, "fixable": "",
                              "reason": f"group_norm tiled axis max(C/groups,H*W)>65536 at {H}x{W}: "
                                        "AVOID - exceeds the ANE per-axis bound"})
            continue

    # Default hotspots = only the RELIABLE, structurally-determinable signals: a narrow
    # reduce_sum whose estimated error exceeds the fp16-clean floor, and the group_norm
    # per-axis wall. cancel_sub is SPECULATIVE - a subtract of two live tensors is a
    # *candidate* for cancellation, but most (residuals, losses, differences) are benign
    # and unconfirmable without data. Flagging every such subtract trained users to ignore
    # the warning, so cancel_sub is now informational-only: it stays in `nodes` (surfaced
    # by precision_risk(verbose=True)) but does not raise the default warning.
    hotspots = [n["idx"] for n in nodes if n["est_error"] > _FP16_CLEAN
                or n["kind"] == "groupnorm_cliff"]
    graph_error = max([_FP16_CLEAN] + [n["est_error"] for n in nodes])
    if verbose:
        print(f"[precision] graph_error~{graph_error:.2e}, {len(nodes)} flagged node(s):")
        for n in nodes:
            print(f"  node {n['idx']:3d} {n['op']:12s} kind={n['kind']:16s} "
                  f"est~{n['est_error']:.2e} fix={n['fixable'] or '(avoid)'}: {n['reason']}")
    return {"graph_error": graph_error, "nodes": nodes, "hotspots": hotspots}

reset_compile_breaker

reset_compile_breaker() -> None

Clear the backoff state (forget the last failure).

Source code in aneforge/_circuit.py
def reset() -> None:
    """Clear the backoff state (forget the last failure)."""
    global _last_failure_ts
    with _lock:
        _last_failure_ts = None
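
For context, here is one way a timestamp-based backoff could use the state that reset clears. Only `reset`, `_lock` and `_last_failure_ts` appear in the source above; `record_failure`, `is_open` and the 30-second cool-down are hypothetical and do not describe the actual breaker.

```python
import threading
import time

_lock = threading.Lock()
_last_failure_ts = None
BACKOFF_S = 30.0   # ASSUMED cool-down, for illustration only

def record_failure(now=None):
    """Remember when the last compile failure happened."""
    global _last_failure_ts
    with _lock:
        _last_failure_ts = time.monotonic() if now is None else now

def is_open(now=None):
    """True while still inside the cool-down window after a failure."""
    with _lock:
        if _last_failure_ts is None:
            return False
        t = time.monotonic() if now is None else now
        return (t - _last_failure_ts) < BACKOFF_S

def reset():
    """Clear the backoff state (forget the last failure)."""
    global _last_failure_ts
    with _lock:
        _last_failure_ts = None

record_failure(now=100.0)
print(is_open(now=110.0))   # True: 10 s after the failure
reset()
print(is_open(now=110.0))   # False: state was cleared
```

The optional `now` argument exists only to make the sketch deterministic to test.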