Skip to content

Processing Phases

The per-phase processing functions that make up the pipeline.

spineps.phase_pre

spineps.phase_pre

Pre-processing phase: crop, N4 bias correction, and intensity normalization of the input MRI before segmentation.

compute_crop

compute_crop(
    nii: NII,
    out_file: str | Path,
    dataset_id=100,
    ddevice: Literal["cpu", "cuda", "mps"] = "cuda",
    gpu=0,
    max_folds=None,
    logger=None,
) -> tuple[slice, slice, slice]

Run the Vibe whole-body segmentation and compute a crop region around the spine.

Segments the input with run_vibeseg, keeps only the spine-relevant labels (IVD, vertebra body, vertebra posterior elements, and sacrum), and returns a bounding-box crop expanded by VIBE_CROP_MARGIN_MM.

Parameters:

Name Type Description Default
nii NII

Input MRI image to segment and crop.

required
out_file str | Path

Path where the Vibe segmentation output is written.

required
dataset_id int

Vibe model/dataset identifier passed to run_vibeseg. Defaults to 100.

100
ddevice Literal['cpu', 'cuda', 'mps']

Compute device for inference. Defaults to "cuda".

'cuda'
gpu int

GPU index used when running on CUDA. Defaults to 0.

0
max_folds int | None

Maximum number of model folds to ensemble. Defaults to None (all folds).

None
logger optional

Logger forwarded to run_vibeseg when that version supports it. Defaults to None.

None

Returns:

Type Description
tuple[slice, slice, slice]

tuple[slice, slice, slice]: The crop slices around the segmented spine, with a VIBE_CROP_MARGIN_MM margin.

Source code in spineps/phase_pre.py
def compute_crop(
    nii: NII, out_file: str | Path, dataset_id=100, ddevice: Literal["cpu", "cuda", "mps"] = "cuda", gpu=0, max_folds=None, logger=None
) -> tuple[slice, slice, slice]:
    """Run the Vibe whole-body segmentation and compute a crop region around the spine.

    Segments the input with ``run_vibeseg``, keeps only the spine-relevant labels (IVD, vertebra body,
    vertebra posterior elements, and sacrum), and returns a bounding-box crop expanded by ``VIBE_CROP_MARGIN_MM``.

    Args:
        nii (NII): Input MRI image to segment and crop.
        out_file: Path where the Vibe segmentation output is written.
        dataset_id (int, optional): Vibe model/dataset identifier passed to ``run_vibeseg``. Defaults to 100.
        ddevice (Literal["cpu", "cuda", "mps"], optional): Compute device for inference. Defaults to "cuda".
        gpu (int, optional): GPU index used when running on CUDA. Defaults to 0.
        max_folds (int | None, optional): Maximum number of model folds to ensemble. Defaults to None (all folds).
        logger (optional): Logger forwarded to ``run_vibeseg`` when that version supports it. Defaults to None.

    Returns:
        tuple[slice, slice, slice]: The crop slices around the segmented spine, with a ``VIBE_CROP_MARGIN_MM`` margin.
    """
    from TPTBox.core.vert_constants import Full_Body_Instance_Vibe
    from TPTBox.segmentation import run_vibeseg

    if _has_logger_arg(run_vibeseg):
        out = run_vibeseg(nii, out_file, dataset_id=dataset_id, ddevice=ddevice, gpu=gpu, max_folds=max_folds, logger=logger)
    else:  # backwards compatibility, can be removed if we force to a new version of TPTBox than 30.Apr.26
        out = run_vibeseg(nii, out_file, dataset_id=dataset_id, ddevice=ddevice, gpu=gpu, max_folds=max_folds)
    seg = to_nii(out, True)
    seg.extract_label_(
        [
            Full_Body_Instance_Vibe.IVD,
            Full_Body_Instance_Vibe.vertebra_body,
            Full_Body_Instance_Vibe.vertebra_posterior_elements,
            Full_Body_Instance_Vibe.sacrum,
        ]
    )
    return seg.compute_crop(0, dist=VIBE_CROP_MARGIN_MM / min(seg.zoom))

preprocess_input

preprocess_input(
    mri_nii: NII,
    debug_data: dict,
    pad_size: int = 4,
    proc_normalize_input: bool = True,
    proc_do_n4_bias_correction: bool = True,
    proc_crop_input: bool = True,
    verbose: bool = False,
) -> tuple[NII | None, ErrCode]

Pre-process an input MRI for segmentation: normalize, crop, N4-correct, and pad.

Optionally rescales intensities to [NORMALIZE_MIN_VALUE, NORMALIZE_MAX_VALUE], crops away empty background to speed up computation, applies N4 bias field correction on the crop, re-normalizes, writes the processed crop back into the full image, and finally pads the volume by pad_size on every side.

Parameters:

Name Type Description Default
mri_nii NII

Input grayscale MRI image.

required
debug_data dict

Dictionary for collecting intermediate results (unused here, reserved for parity).

required
pad_size int

Number of voxels of edge padding added on each side per axis. Defaults to 4.

4
proc_normalize_input bool

Whether to rescale intensities into the normalization range. Defaults to True.

True
proc_do_n4_bias_correction bool

Whether to apply N4 bias field correction. Defaults to True.

True
proc_crop_input bool

Whether to crop away background before processing. Defaults to True.

True
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
NII | None

tuple[NII | None, ErrCode]: The padded, pre-processed image and ErrCode.OK; or (None, ErrCode.EMPTY)

ErrCode

if the input image is empty.

Source code in spineps/phase_pre.py
def preprocess_input(
    mri_nii: NII,
    debug_data: dict,  # noqa: ARG001
    pad_size: int = 4,
    proc_normalize_input: bool = True,
    proc_do_n4_bias_correction: bool = True,
    proc_crop_input: bool = True,
    verbose: bool = False,
) -> tuple[NII | None, ErrCode]:
    """Pre-process an input MRI for segmentation: normalize, crop, N4-correct, and pad.

    Optionally rescales intensities to ``[NORMALIZE_MIN_VALUE, NORMALIZE_MAX_VALUE]``, crops away empty
    background to speed up computation, applies N4 bias field correction on the crop, re-normalizes,
    writes the processed crop back into the full image, and finally pads the volume by ``pad_size`` on every side.

    Args:
        mri_nii (NII): Input grayscale MRI image.
        debug_data (dict): Dictionary for collecting intermediate results (unused here, reserved for parity).
        pad_size (int, optional): Number of voxels of edge padding added on each side per axis. Defaults to 4.
        proc_normalize_input (bool, optional): Whether to rescale intensities into the normalization range. Defaults to True.
        proc_do_n4_bias_correction (bool, optional): Whether to apply N4 bias field correction. Defaults to True.
        proc_crop_input (bool, optional): Whether to crop away background before processing. Defaults to True.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        tuple[NII | None, ErrCode]: The padded, pre-processed image and ``ErrCode.OK``; or ``(None, ErrCode.EMPTY)``
        if the input image is empty.
    """
    logger.print("Prepare input image", Log_Type.STAGE)
    mri_nii = mri_nii.copy()
    with logger:
        # Crop Down
        try:
            # Enforce to range [0, 1500]
            if proc_normalize_input:
                mri_nii.normalize_to_range_(min_value=NORMALIZE_MIN_VALUE, max_value=NORMALIZE_MAX_VALUE, verbose=False)
                crop = mri_nii.compute_crop(dist=0) if proc_crop_input else (slice(None, None), slice(None, None), slice(None, None))
            else:
                crop = (
                    mri_nii.compute_crop(minimum=mri_nii.min(), dist=0)
                    if proc_crop_input
                    else (slice(None, None), slice(None, None), slice(None, None))
                )
        except ValueError:
            logger.print("Image Nifty is empty, skip this", Log_Type.FAIL)
            return None, ErrCode.EMPTY

        cropped_nii = mri_nii.apply_crop(crop)
        logger.print(f"Crop down from {mri_nii.shape} to {cropped_nii.shape}", verbose=verbose)

        # N4 Bias field correction
        if proc_do_n4_bias_correction:
            n4_start = perf_counter()
            cropped_nii, _ = n4_bias(cropped_nii)  # PIR
            logger.print(f"N4 Bias field correction done in {perf_counter() - n4_start} sec", verbose=True)

        # Enforce to range [NORMALIZE_MIN_VALUE, NORMALIZE_MAX_VALUE]
        if proc_normalize_input:
            cropped_nii.normalize_to_range_(min_value=NORMALIZE_MIN_VALUE, max_value=NORMALIZE_MAX_VALUE, verbose=logger)

        # Uncrop again
        # uncropped_input[crop] = cropped_nii.get_array()
        mri_nii[crop] = cropped_nii
        logger.print(f"Uncrop back from {cropped_nii.shape} to {mri_nii.shape}", verbose=verbose)

        # Apply padding
        padded_nii = mri_nii.pad_to(tuple(mri_nii.shape[i] + (2 * pad_size) for i in range(3)))
        logger.print(f"Padded from {mri_nii.shape} to {padded_nii.shape}", verbose=verbose)

    # Return pre-processed
    return padded_nii, ErrCode.OK

spineps.phase_semantic

spineps.phase_semantic

Semantic phase: predict and post-process the subregion (semantic) segmentation mask of the spine.

predict_semantic_mask

predict_semantic_mask(
    mri_nii: NII,
    model: Segmentation_Model,
    debug_data: dict,
    proc_fill_3d_holes: bool = True,
    proc_clean_beyond_largest_bounding_box: bool = True,
    proc_remove_inferior_beyond_canal: bool = False,
    proc_clean_small_cc_artifacts: bool = True,
    verbose: bool = False,
) -> tuple[NII | None, NII | None, ErrCode]

Predict the semantic (subregion) segmentation mask and run post-processing on it.

Runs the model on the input MRI (resampling to the model's recommended zoom), then optionally removes structures beyond the spinal-canal height, cleans small connected-component artifacts, restricts the mask to the largest bounding box of connected components, and fills 3D holes.

Parameters:

Name Type Description Default
mri_nii NII

Input grayscale MRI image (intensities must start at 0).

required
model Segmentation_Model

Model used to produce the semantic segmentation.

required
debug_data dict

Dictionary for collecting intermediate results (e.g. the raw segmentation).

required
proc_fill_3d_holes bool

Whether to fill 3D holes in the output mask. Defaults to True.

True
proc_clean_beyond_largest_bounding_box bool

Whether to keep only connected components within the largest bounding box. Defaults to True.

True
proc_remove_inferior_beyond_canal bool

Whether to remove non-sacrum structures below the spinal-canal height. Defaults to False.

False
proc_clean_small_cc_artifacts bool

Whether to delete small connected-component artifacts. Defaults to True.

True
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
NII | None

tuple[NII | None, NII | None, ErrCode]: The post-processed semantic mask, the softmax logits, and an

NII | None

error code (ErrCode.OK on success, ErrCode.EMPTY if the predicted mask is empty).

Source code in spineps/phase_semantic.py
def predict_semantic_mask(
    mri_nii: NII,
    model: Segmentation_Model,
    debug_data: dict,
    proc_fill_3d_holes: bool = True,
    proc_clean_beyond_largest_bounding_box: bool = True,
    proc_remove_inferior_beyond_canal: bool = False,
    proc_clean_small_cc_artifacts: bool = True,
    verbose: bool = False,
) -> tuple[NII | None, NII | None, ErrCode]:
    """Predict the semantic (subregion) segmentation mask and run post-processing on it.

    Runs the model on the input MRI (resampling to the model's recommended zoom), then optionally removes
    structures beyond the spinal-canal height, cleans small connected-component artifacts, restricts the mask
    to the largest bounding box of connected components, and fills 3D holes.

    Args:
        mri_nii (NII): Input grayscale MRI image (intensities must start at 0).
        model (Segmentation_Model): Model used to produce the semantic segmentation.
        debug_data (dict): Dictionary for collecting intermediate results (e.g. the raw segmentation).
        proc_fill_3d_holes (bool, optional): Whether to fill 3D holes in the output mask. Defaults to True.
        proc_clean_beyond_largest_bounding_box (bool, optional): Whether to keep only connected components within
            the largest bounding box. Defaults to True.
        proc_remove_inferior_beyond_canal (bool, optional): Whether to remove non-sacrum structures below the
            spinal-canal height. Defaults to False.
        proc_clean_small_cc_artifacts (bool, optional): Whether to delete small connected-component artifacts. Defaults to True.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        tuple[NII | None, NII | None, ErrCode]: The post-processed semantic mask, the softmax logits, and an
        error code (``ErrCode.OK`` on success, ``ErrCode.EMPTY`` if the predicted mask is empty).
    """
    logger.print("Predict Semantic Mask", Log_Type.STAGE)
    with logger:
        results = model.segment_scan(
            mri_nii,
            pad_size=0,
            resample_to_recommended=True,
            resample_output_to_input_space=False,
            verbose=verbose,
        )  # type:ignore
        seg_nii = results[OutputType.seg]
        # unc_nii = results.get(OutputType.unc, None)
        softmax_logits = results[OutputType.softmax_logits]

        logger.print("Post-process semantic mask...")

        debug_data["sem_raw"] = seg_nii.copy()

        if seg_nii.is_empty:
            logger.print("Subregion mask is empty, skip this", Log_Type.FAIL)
            return seg_nii, softmax_logits, ErrCode.EMPTY

        if proc_remove_inferior_beyond_canal:
            seg_nii = remove_nonsacrum_beyond_canal_height(seg_nii=seg_nii.copy())

        if proc_clean_small_cc_artifacts:
            seg_nii.set_array_(
                clean_cc_artifacts(
                    seg_nii,
                    logger=logger,
                    verbose=False,
                    labels=[
                        Location.Arcus_Vertebrae.value,
                        Location.Spinosus_Process.value,
                        Location.Costal_Process_Left.value,
                        Location.Costal_Process_Right.value,
                        Location.Superior_Articular_Left.value,
                        Location.Superior_Articular_Right.value,
                        Location.Inferior_Articular_Left.value,
                        Location.Inferior_Articular_Right.value,
                        Location.Vertebra_Corpus_border.value,
                        Location.Spinal_Canal.value,
                        Location.Vertebra_Disc.value,
                    ],
                    only_delete=True,
                    ignore_missing_labels=True,
                    cc_size_threshold=mm3_to_voxels(SMALL_CC_SIZE_THRESHOLD_MM3, seg_nii.zoom),
                ),
                verbose=verbose,
            )

        # Do two iterations of both processing if enabled to make sure
        if proc_remove_inferior_beyond_canal:
            seg_nii = remove_nonsacrum_beyond_canal_height(seg_nii=seg_nii.copy())

        if proc_clean_beyond_largest_bounding_box:
            seg_nii = semantic_bounding_box_clean(seg_nii=seg_nii.copy())

        if proc_remove_inferior_beyond_canal and proc_clean_beyond_largest_bounding_box:
            seg_nii = remove_nonsacrum_beyond_canal_height(seg_nii=seg_nii.copy())
            seg_nii = semantic_bounding_box_clean(seg_nii=seg_nii.copy())

        if proc_fill_3d_holes:
            seg_nii = seg_nii.fill_holes_(fill_holes_labels, verbose=logger)
            # seg_fh = seg_nii.extract_label(fill_holes_labels, keep_label=True)
            # seg_fh = seg_fh.fill_holes_global_with_majority_voting(verbose=logger)
            # seg_nii[seg_nii == 0] = seg_fh[seg_nii == 0]

    return seg_nii, softmax_logits, ErrCode.OK

remove_nonsacrum_beyond_canal_height

remove_nonsacrum_beyond_canal_height(seg_nii: NII) -> NII

Remove non-sacrum labels that lie above or below the spinal-canal extent.

Computes the inferior-axis (I) extent of the spinal canal/cord, expanded by CANAL_HEIGHT_MARGIN_MM, and zeroes out everything outside that range. The sacrum (SACRUM_LABEL) is kept regardless of position. If no canal/cord is present, the mask is returned unchanged.

Parameters:

Name Type Description Default
seg_nii NII

Semantic segmentation mask in ("P", "I", "R") orientation.

required

Returns:

Name Type Description
NII NII

The segmentation mask with structures beyond the canal height removed (sacrum preserved).

Raises:

Type Description
AssertionError

If seg_nii is not in ("P", "I", "R") orientation.

Source code in spineps/phase_semantic.py
def remove_nonsacrum_beyond_canal_height(seg_nii: NII) -> NII:
    """Remove non-sacrum labels that lie above or below the spinal-canal extent.

    Computes the inferior-axis (I) extent of the spinal canal/cord, expanded by ``CANAL_HEIGHT_MARGIN_MM``,
    and zeroes out everything outside that range. The sacrum (``SACRUM_LABEL``) is kept regardless of position.
    If no canal/cord is present, the mask is returned unchanged.

    Args:
        seg_nii (NII): Semantic segmentation mask in ("P", "I", "R") orientation.

    Returns:
        NII: The segmentation mask with structures beyond the canal height removed (sacrum preserved).

    Raises:
        AssertionError: If ``seg_nii`` is not in ("P", "I", "R") orientation.
    """
    seg_nii.assert_affine(orientation=("P", "I", "R"))
    canal_nii = seg_nii.extract_label([Location.Spinal_Canal.value, Location.Spinal_Cord.value])
    if canal_nii.sum() == 0:
        return seg_nii
    crop_i = canal_nii.compute_crop(dist=CANAL_HEIGHT_MARGIN_MM / seg_nii.zoom[1])[1]
    seg_arr = seg_nii.get_seg_array()
    sacrum_arr = seg_nii.extract_label(SACRUM_LABEL).get_seg_array()
    seg_arr[:, 0 : crop_i.start, :] = 0
    seg_arr[:, crop_i.stop :, :] = 0
    seg_arr[sacrum_arr == 1] = SACRUM_LABEL
    return seg_nii.set_array_(seg_arr)

semantic_bounding_box_clean

semantic_bounding_box_clean(seg_nii: NII) -> NII

Keep only connected components that fall within the spine's growing bounding box.

Binarizes the mask and labels its connected components. Starting from the largest component's bounding box (expanded by CC_BBOX_MARGIN_MM), it iteratively merges in any other component whose bounding box overlaps the current region in all three axes (with extra inferior margin to tolerate gaps in the spine). Voxels outside the resulting region, and any non-incorporated components, are removed. Components are dropped if the binary mask has more than MAX_EXPECTED_SEMANTIC_CC parts (logged as strange).

Parameters:

Name Type Description Default
seg_nii NII

Semantic segmentation mask to clean.

required

Returns:

Name Type Description
NII NII

The cleaned segmentation mask, restored to its original orientation.

Source code in spineps/phase_semantic.py
def semantic_bounding_box_clean(seg_nii: NII) -> NII:
    """Keep only connected components that fall within the spine's growing bounding box.

    Binarizes the mask and labels its connected components. Starting from the largest component's bounding box
    (expanded by ``CC_BBOX_MARGIN_MM``), it iteratively merges in any other component whose bounding box overlaps
    the current region in all three axes (with extra inferior margin to tolerate gaps in the spine). Voxels
    outside the resulting region, and any non-incorporated components, are removed. Components are dropped if the
    binary mask has more than ``MAX_EXPECTED_SEMANTIC_CC`` parts (logged as strange).

    Args:
        seg_nii (NII): Semantic segmentation mask to clean.

    Returns:
        NII: The cleaned segmentation mask, restored to its original orientation.
    """
    ori = seg_nii.orientation
    seg_binary = seg_nii.reorient_().extract_label(list(seg_nii.unique()))  # whole thing binary
    # Resolution-aware bounding-box margin (mm -> voxels at the current zoom).
    bbox_margin_dist = CC_BBOX_MARGIN_MM / min(seg_nii.zoom)
    bbox_margin_vox = mm_to_voxels(CC_BBOX_MARGIN_MM, seg_nii.zoom)
    seg_bin_largest_k_cc_nii: NII = seg_binary.filter_connected_components(
        max_count_component=None, labels=1, connectivity=3, keep_label=False
    )
    max_k = int(seg_bin_largest_k_cc_nii.max())
    if max_k > MAX_EXPECTED_SEMANTIC_CC:
        logger.print(f"Found {max_k} unique connected components in semantic mask", Log_Type.STRANGE)
    # PIR
    largest_nii = seg_bin_largest_k_cc_nii.extract_label(1)
    # width fixed, and heigh include all connected components within bounding box, then repeat
    p_slice, i_slice, r_slice = largest_nii.compute_crop(dist=bbox_margin_dist)
    bboxes = [(p_slice, i_slice, r_slice)]

    # PIR -> fixed, extendable, extendable
    incorporated = [1]
    changed = True
    while changed:
        changed = False
        for k in [l for l in range(2, max_k + 1) if l not in incorporated]:
            k_nii = seg_bin_largest_k_cc_nii.extract_label(k)
            p, i, r = k_nii.compute_crop(dist=bbox_margin_dist)

            for bbox in bboxes:
                i_slice_compare = slice(
                    max(bbox[1].start - bbox_margin_vox, 0), bbox[1].stop + bbox_margin_vox
                )  # more margin in inferior direction (allows for gaps in spine)
                if overlap_slice(bbox[0], p) and overlap_slice(i_slice_compare, i) and overlap_slice(bbox[2], r):
                    # extend bbox
                    bboxes.append((p, i, r))
                    incorporated.append(k)
                    changed = True
                    break

    seg_bin_arr = seg_binary.get_seg_array()
    crop = (p_slice, i_slice, r_slice)
    seg_bin_clean_arr = np.zeros(seg_bin_arr.shape)
    seg_bin_clean_arr[crop] = 1

    # everything below biggest k get always removed
    largest_k_arr = seg_bin_largest_k_cc_nii.get_seg_array()
    seg_bin_clean_arr[largest_k_arr == 0] = 0

    seg_arr = seg_nii.get_seg_array()
    # logger.print(seg_nii.volumes())
    seg_arr[seg_bin_clean_arr != 1] = 0
    seg_nii.set_array_(seg_arr)
    seg_nii.reorient_(ori)
    cleaned_ks = [l for l in range(2, max_k + 1) if l not in incorporated]
    if len(cleaned_ks) > 0:
        logger.print("semantic_bounding_box_clean", f"got rid of connected components k={cleaned_ks}")
    else:
        logger.print("semantic_bounding_box_clean", "did not remove anything")
    return seg_nii

overlap_slice

overlap_slice(slice1: slice, slice2: slice) -> bool

Check whether two ranges defined by slices overlap (borders inclusive).

Parameters:

Name Type Description Default
slice1 slice

First range, using its start and stop bounds.

required
slice2 slice

Second range, using its start and stop bounds.

required

Returns:

Name Type Description
bool bool

True if the two ranges overlap or touch at a border, else False.

Source code in spineps/phase_semantic.py
def overlap_slice(slice1: slice, slice2: slice) -> bool:
    """Check whether two ranges defined by slices overlap (borders inclusive).

    Args:
        slice1 (slice): First range, using its ``start`` and ``stop`` bounds.
        slice2 (slice): Second range, using its ``start`` and ``stop`` bounds.

    Returns:
        bool: True if the two ranges overlap or touch at a border, else False.
    """
    slice1s = slice1.start
    slice1e = slice1.stop

    slice2s = slice2.start
    slice2e = slice2.stop

    if slice1s in (slice2s, slice2e) or slice1e in (slice2s, slice2e):
        return True

    if slice2s > slice1s and slice2s <= slice1e:
        return True

    return bool(slice2s < slice1s and slice2e >= slice1s)

spineps.phase_instance

spineps.phase_instance

Instance phase: turn the subregion semantic mask into per-vertebra instance labels via cutout prediction and merging.

predict_instance_mask

predict_instance_mask(
    seg_nii: NII,
    model: Segmentation_Model,
    debug_data: dict,
    pad_size: int = 0,
    proc_inst_fill_3d_holes: bool = True,
    proc_detect_and_solve_merged_corpi: bool = True,
    proc_corpus_clean: bool = True,
    proc_inst_clean_small_cc_artifacts: bool = True,
    proc_inst_largest_k_cc: int = 0,
    verbose: bool = False,
) -> tuple[NII | None, ErrCode]

Build a per-vertebra instance mask from a subregion semantic segmentation.

Reorients and rescales the input to the model's recommended zoom, crops to the segmentation, derives one cutout per corpus center of mass, runs the instance model on each cutout, merges the overlapping per-vertebra predictions into a single label map, optionally cleans and fills it, then uncrops back to the original space.

Parameters:

Name Type Description Default
seg_nii NII

Subregion (semantic) segmentation mask used as input.

required
model Segmentation_Model

Instance model producing the per-vertebra-body cutout predictions.

required
debug_data dict

Dictionary for collecting intermediate results across the pipeline.

required
pad_size int

Edge padding added before processing and removed afterwards. Defaults to 0.

0
proc_inst_fill_3d_holes bool

Whether to fill 3D holes in the final vertebra mask. Defaults to True.

True
proc_detect_and_solve_merged_corpi bool

Whether to detect and split merged vertebral bodies. Defaults to True.

True
proc_corpus_clean bool

Whether to clean small corpus connected-component artifacts. Defaults to True.

True
proc_inst_clean_small_cc_artifacts bool

Whether to delete small instance artifacts. Defaults to True.

True
proc_inst_largest_k_cc int

Keep only the largest k connected components per cutout label; 0 disables. Defaults to 0.

0
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
NII | None

tuple[NII | None, ErrCode]: The vertebra instance mask in the input space and an error code. Returns

ErrCode

(None, ErrCode.EMPTY) if no corpus labels are present, (None, ErrCode.UNKNOWN) if no predictions

tuple[NII | None, ErrCode]

are produced, or (None, errcode) if merging fails.

Source code in spineps/phase_instance.py
def predict_instance_mask(
    seg_nii: NII,
    model: Segmentation_Model,
    debug_data: dict,
    pad_size: int = 0,
    proc_inst_fill_3d_holes: bool = True,
    proc_detect_and_solve_merged_corpi: bool = True,
    proc_corpus_clean: bool = True,
    proc_inst_clean_small_cc_artifacts: bool = True,
    proc_inst_largest_k_cc: int = 0,
    verbose: bool = False,
) -> tuple[NII | None, ErrCode]:
    """Build a per-vertebra instance mask from a subregion semantic segmentation.

    Reorients and rescales the input to the model's recommended zoom, crops to the segmentation, derives one
    cutout per corpus center of mass, runs the instance model on each cutout, merges the overlapping per-vertebra
    predictions into a single label map, optionally cleans and fills it, then uncrops back to the original space.

    Args:
        seg_nii (NII): Subregion (semantic) segmentation mask used as input.
        model (Segmentation_Model): Instance model producing the per-vertebra-body cutout predictions.
        debug_data (dict): Dictionary for collecting intermediate results across the pipeline.
        pad_size (int, optional): Edge padding added before processing and removed afterwards. Defaults to 0.
        proc_inst_fill_3d_holes (bool, optional): Whether to fill 3D holes in the final vertebra mask. Defaults to True.
        proc_detect_and_solve_merged_corpi (bool, optional): Whether to detect and split merged vertebral bodies. Defaults to True.
        proc_corpus_clean (bool, optional): Whether to clean small corpus connected-component artifacts. Defaults to True.
        proc_inst_clean_small_cc_artifacts (bool, optional): Whether to delete small instance artifacts. Defaults to True.
        proc_inst_largest_k_cc (int, optional): Keep only the largest k connected components per cutout label; 0 disables. Defaults to 0.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        tuple[NII | None, ErrCode]: The vertebra instance mask in the input space and an error code. Returns
        ``(None, ErrCode.EMPTY)`` if no corpus labels are present, ``(None, ErrCode.UNKNOWN)`` if no predictions
        are produced, or ``(None, errcode)`` if merging fails.
    """
    logger.print("Predict instance mask", Log_Type.STAGE)
    with logger:
        # Fixed constants for this approach
        cutout_size = model.inference_config.cutout_size
        corpus_size_cleaning = model.inference_config.corpus_size_cleaning  # voxel threshold * nako resolution
        corpus_border_threshold = model.inference_config.corpus_border_threshold
        vert_size_threshold = model.inference_config.vert_size_threshold

        logger.print("Vertebra input", seg_nii.zoom, seg_nii.orientation, seg_nii.shape, verbose=verbose)
        # Save values for resample back later
        # orientation = seg_nii.orientation
        # shp = seg_nii.shape

        seg_nii_rdy = seg_nii.reorient(verbose=logger)
        debug_data["inst_uncropped_Subreg_nii_a_PIR"] = seg_nii_rdy.copy()

        # Padding?
        if pad_size > 0:
            # logger.print(seg_nii_rdy.shape)
            arr = seg_nii_rdy.get_array()
            arr = np.pad(arr, pad_size, mode="edge")
            seg_nii_rdy.set_array_(arr)
            # logger.print(seg_nii_rdy.shape)

        zms = seg_nii_rdy.zoom
        logger.print("zms", zms, verbose=verbose)
        expected_zms = model.calc_recommended_resampling_zoom(seg_nii_rdy.zoom)
        if not seg_nii_rdy.assert_affine(zoom=expected_zms, raise_error=False):
            seg_nii_rdy.rescale_(expected_zms, verbose=logger)  # in PIR
        seg_nii_uncropped = seg_nii_rdy.copy()
        logger.print(
            "Vertebra seg_nii_uncropped", seg_nii_uncropped.zoom, seg_nii_uncropped.orientation, seg_nii_uncropped.shape, verbose=verbose
        )
        debug_data["inst_uncropped_Subreg_nii_b_zms"] = seg_nii_uncropped.copy()
        uncropped_vert_mask = np.zeros(seg_nii_uncropped.shape, dtype=seg_nii_uncropped.dtype)
        logger.print("Vertebra uncropped_vert_mask empty", uncropped_vert_mask.shape, verbose=verbose)
        crop = seg_nii_rdy.compute_crop(dist=INSTANCE_CROP_MARGIN_MM / min(seg_nii_rdy.zoom))
        # logger.print("Crop", crop, verbose=verbose)
        seg_nii_rdy.apply_crop_(crop)
        logger.print(f"Crop down from {uncropped_vert_mask.shape} to {seg_nii_rdy.shape}", verbose=verbose)
        # arr[crop] = X, then set nifty to arr
        logger.print("Vertebra seg_nii_rdy", seg_nii_rdy.zoom, seg_nii_rdy.orientation, seg_nii_rdy.shape, verbose=verbose)
        debug_data["inst_cropped_Subreg_nii_b"] = seg_nii_rdy.copy()
        #
        # make threshold in actual mm
        corpus_border_threshold = int(corpus_border_threshold / expected_zms[1])
        min_cleaning_voxels = mm3_to_voxels(MIN_CLEANING_VOLUME_MM3, expected_zms)
        corpus_size_cleaning = max(int(corpus_size_cleaning / (expected_zms[0] * expected_zms[1] * expected_zms[2])), min_cleaning_voxels)
        vert_size_threshold = max(int(vert_size_threshold / (expected_zms[0] * expected_zms[1] * expected_zms[2])), min_cleaning_voxels)

        seg_labels = seg_nii_rdy.unique()
        if Location.Vertebra_Corpus_border.value not in seg_labels:
            logger.print(f"no corpus ({Location.Vertebra_Corpus_border.value}) labels in this segmentation, cannot proceed", Log_Type.FAIL)
            return None, ErrCode.EMPTY
        # get all the 3vert predictions
        vert_predictions, hierarchical_existing_predictions, n_corpus_coms = collect_vertebra_predictions(
            seg_nii=seg_nii_rdy,
            model=model,
            corpus_size_cleaning=corpus_size_cleaning if proc_corpus_clean else 0,
            cutout_size=cutout_size,
            debug_data=debug_data,
            process_detect_and_solve_merged_corpi=proc_detect_and_solve_merged_corpi,
            proc_inst_largest_k_cc=proc_inst_largest_k_cc,
            proc_inst_fill_holes=False,
            verbose=verbose,
        )
        if vert_predictions is None:
            return None, ErrCode.UNKNOWN  # type:ignore
        logger.print(f"Vertebra Predictions done! Found {n_corpus_coms} initial candidates.", verbose=verbose)

        # debug_data["vert_predictions"] = vert_predictions
        whole_vert_nii, debug_data, errcode = from_vert3_predictions_make_vert_mask(
            seg_nii_rdy,
            vert_predictions,
            hierarchical_existing_predictions,
            vert_size_threshold,
            debug_data=debug_data,
            proc_inst_clean_small_cc_artifacts=proc_inst_clean_small_cc_artifacts,
        )
        del vert_predictions, hierarchical_existing_predictions
        if errcode != ErrCode.OK:
            return None, errcode
        logger.print("Merged predictions into vert mask")

        logger.print(
            "Vertebra whole_vert_nii_cropped", whole_vert_nii.zoom, whole_vert_nii.orientation, whole_vert_nii.shape, verbose=verbose
        )

        uniq_labels = whole_vert_nii.unique()

        if proc_inst_fill_3d_holes:
            whole_vert_nii.fill_holes_(verbose=logger)
        debug_data["inst_cropped_vert_arr_c_proc"] = whole_vert_nii.copy()
        n_vert_bodies = len(uniq_labels)
        logger.print(f"Predicted {n_vert_bodies} vertebrae")
        if n_vert_bodies < n_corpus_coms:
            logger.print(f"Number of vertebra {n_vert_bodies} smaller than number of corpus regions {n_corpus_coms}", Log_Type.WARNING)

        # label continuously
        labelmap = {l: i + 1 for i, l in enumerate(uniq_labels)}
        whole_vert_nii.map_labels_(labelmap, verbose=False)

        # clean vertebra mask with subreg mask
        logger.print(
            "Vertebra whole_vert_nii_cropped2", whole_vert_nii.zoom, whole_vert_nii.orientation, whole_vert_nii.shape, verbose=verbose
        )
        vert_nii_cleaned = whole_vert_nii
        uncropped_vert_mask[crop] = vert_nii_cleaned.get_seg_array()
        logger.print(f"Uncrop back from {vert_nii_cleaned.shape} to {uncropped_vert_mask.shape}", verbose=verbose)
        whole_vert_nii_uncropped = seg_nii_uncropped.set_array(uncropped_vert_mask)
        debug_data["inst_uncropped_vert_arr_a"] = whole_vert_nii_uncropped.copy()

        # Uncrop again
        if pad_size > 0:
            # logger.print(whole_vert_nii_uncropped.shape)
            arr = whole_vert_nii_uncropped.get_array()
            arr = arr[pad_size:-pad_size, pad_size:-pad_size, pad_size:-pad_size]
            whole_vert_nii_uncropped.set_array_(arr)
            # logger.print(whole_vert_nii_uncropped.shape)

    return whole_vert_nii_uncropped, ErrCode.OK

get_corpus_coms

get_corpus_coms(
    seg_nii: NII,
    corpus_size_cleaning: int,
    process_detect_and_solve_merged_corpi: bool = True,
    verbose: bool = False,
) -> list | None

Compute the center of mass of every vertebral corpus, optionally splitting merged bodies.

Extracts the corpus region (using the dense corpus label when available, otherwise the eroded and cleaned corpus-border label) and returns one center of mass per corpus, sorted from bottom to top (plus the dens when present). When process_detect_and_solve_merged_corpi is set, it cross-checks the vertebra/IVD height alternation: neighbors at the same height are merged, and a corpus whose volume exceeds the neighbor average by MERGED_CORPUS_VOLUME_RATIO is split via a separating plane.

Parameters:

Name Type Description Default
seg_nii NII

Subregion semantic mask in ("P", "I", "R") orientation.

required
corpus_size_cleaning int

Voxel threshold for removing small corpus-border artifacts; 0 disables cleaning.

required
process_detect_and_solve_merged_corpi bool

Whether to detect and split merged corpora. Defaults to True.

True
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
list | None

list | None: Corpus center-of-mass coordinates ordered from bottom to top, or None if no corpus is found.

Raises:

Type Description
AssertionError

If seg_nii is not in ("P", "I", "R") orientation.

Source code in spineps/phase_instance.py
def get_corpus_coms(
    seg_nii: NII,
    corpus_size_cleaning: int,
    process_detect_and_solve_merged_corpi: bool = True,
    verbose: bool = False,
) -> list | None:
    """Compute the center of mass of every vertebral corpus, optionally splitting merged bodies.

    Extracts the corpus region (using the dense corpus label when available, otherwise the eroded and cleaned
    corpus-border label) and returns one center of mass per corpus, sorted from bottom to top (plus the dens
    when present). When ``process_detect_and_solve_merged_corpi`` is set, it cross-checks the vertebra/IVD
    height alternation: neighbors at the same height are merged, and a corpus whose volume exceeds the neighbor
    average by ``MERGED_CORPUS_VOLUME_RATIO`` is split via a separating plane.

    Args:
        seg_nii (NII): Subregion semantic mask in ("P", "I", "R") orientation.
        corpus_size_cleaning (int): Voxel threshold for removing small corpus-border artifacts; 0 disables cleaning.
        process_detect_and_solve_merged_corpi (bool, optional): Whether to detect and split merged corpora. Defaults to True.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        list | None: Corpus center-of-mass coordinates ordered from bottom to top, or None if no corpus is found.

    Raises:
        AssertionError: If ``seg_nii`` is not in ("P", "I", "R") orientation.
    """
    seg_nii.assert_affine(orientation=("P", "I", "R"))
    dense = False
    # Extract Corpus region and try to find all coms naively (some skips should not matter)
    if Location.Vertebra_Corpus.value in seg_nii.unique():
        corpus_nii_cms = seg_nii.extract_label([Location.Vertebra_Corpus])
        corpus_nii = seg_nii.extract_label([Location.Vertebra_Corpus_border, Location.Vertebra_Corpus])
        dense = True
        # process_detect_and_solve_merged_corpi = False
    else:
        corpus_nii = seg_nii.extract_label([Location.Vertebra_Corpus_border])

        corpus_nii.erode_msk_(2, connectivity=2, verbose=False)
        if corpus_nii.max() != 0 and corpus_size_cleaning > 0:
            corpus_nii.set_array_(
                clean_cc_artifacts(
                    corpus_nii,
                    labels=[1],
                    cc_size_threshold=corpus_size_cleaning,
                    only_delete=True,
                    ignore_missing_labels=True,
                    logger=logger,
                    verbose=verbose,
                ),
                verbose=False,
            )
        corpus_nii_cms = corpus_nii
    if corpus_nii_cms.max() == 0:
        logger.print(f"No corpus found after get_corpus_coms post process, cannot make vertebra mask. {corpus_nii.unique()}", Log_Type.FAIL)
        return None

    if not process_detect_and_solve_merged_corpi:
        corpus_coms = corpus_nii_cms.get_segmentation_connected_components_center_of_mass(label=1, sort_by_axis=1)
        if dense:
            dens_cc = seg_nii.get_segmentation_connected_components_center_of_mass(label=Location.Dens_axis.value)
            corpus_coms.extend(dens_cc)
        corpus_coms.reverse()  # from bottom to top
        logger.print(f"Found {len(corpus_coms)} Corpus ccs", verbose=verbose)
        return corpus_coms

    ############
    # Detect merged vertebra and use plane split algo on it
    ############

    # Get corpus CCs
    corpus_cc: NII = corpus_nii_cms.get_connected_components(labels=1)
    corpus_cc_n = len(corpus_cc.unique())
    logger.print(f"Found {corpus_cc_n} Corpus ccs (naively)", verbose=verbose)

    # Check against ivd order
    seg_sem = seg_nii.map_labels({Location.Endplate.value: Location.Vertebra_Disc.value}, verbose=False)
    has_ivd: bool = Location.Vertebra_Disc.value in seg_sem.unique()
    subreg_cc: NII = seg_sem.get_connected_components(labels=Location.Vertebra_Disc.value)
    subreg_cc[subreg_cc > 0] += IVD_LABEL_OFFSET  # offset IVD CC labels to keep them distinct from corpus CC labels
    subreg_cc_n = len(subreg_cc.unique())
    logger.print(f"Found {subreg_cc_n} IVD ccs (naively)", verbose=verbose)
    coms = subreg_cc.center_of_masses()
    volumes = subreg_cc.volumes()
    stats = {i: (g[1], True, volumes[i]) for i, g in coms.items()}

    vert_coms = corpus_cc.center_of_masses()
    vert_volumes = corpus_cc.volumes()

    for i, g in vert_coms.items():
        stats[i] = (g[1], False, vert_volumes[i])

    stats_by_height = dict(sorted(stats.items(), key=lambda x: x[1][0]))
    stats_by_height_keys = list(stats_by_height.keys())

    for vl in stats_by_height_keys:
        idx = stats_by_height_keys.index(vl)
        statsvl = stats_by_height[vl]

        is_ivd = statsvl[1]
        selfheight = statsvl[0]

        nidx = idx + 1
        if nidx < 0 or nidx >= len(stats_by_height_keys):
            continue

        nkey = stats_by_height_keys[nidx]
        if nkey in stats_by_height and stats_by_height[nkey][1] == is_ivd and has_ivd:
            neighbor = stats_by_height[nkey]
            neighborheight = neighbor[0]
            logger.print(
                f"Wrong ivd-vert alternation found in label {vl}, is_ivd = {is_ivd}, neighbor {nkey}",
                Log_Type.STRANGE,
                verbose=verbose,
            )
            # check if same heigh, then just merge ivd label
            if abs(neighborheight - selfheight) < mm_to_voxels_axis(SAME_HEIGHT_MERGE_THRESHOLD_MM, seg_nii.zoom, INFERIOR_AXIS_PIR):
                logger.print("Same height, just merge")
                stats_by_height.pop(vl)
                stats_by_height = dict(sorted(stats_by_height.items(), key=lambda x: x[1][0]))
                stats_by_height_keys = list(stats_by_height.keys())
                continue

            logger.print("Merged corpi, try to fix it", verbose=verbose)
            neighbor_verts = {
                stats_by_height_keys[idx + i]: stats_by_height[stats_by_height_keys[idx + i]]
                for i in NEIGHBOR_OFFSETS
                if (idx + i) < len(stats_by_height_keys) and (idx + i) >= 0 and stats_by_height_keys[idx + i] < 99
            }

            logger.print("neighbor_vert_labels", neighbor_verts, verbose=verbose)
            if len(neighbor_verts) == 0:
                logger.print("Got no neighbor vert labels to fix", Log_Type.FAIL)
                continue
            neighbor_volumes = [k[2] for k in neighbor_verts.values()]
            logger.print("neighbor_volumes", neighbor_volumes, verbose=verbose)
            n_neighbors_without_target = len(neighbor_volumes) - 1
            if n_neighbors_without_target > MIN_NEIGHBORS_FOR_VOLUME_CHECK:
                argmax = np.argmax(neighbor_volumes)
                n_avg_volume = sum(neighbor_volumes[i] for i in range(len(neighbor_volumes)) if i != argmax) / n_neighbors_without_target

                diff_volume = neighbor_volumes[argmax] / n_avg_volume
                if diff_volume > MERGED_CORPUS_VOLUME_RATIO:
                    logger.print(
                        f"Volume difference detected in label {vl}, diff = {diff_volume}, volume = {neighbor_volumes[argmax]}, neighbor_avg = {n_avg_volume}",
                        Log_Type.STRANGE,
                    )

                    target_vert_id = list(neighbor_verts.keys())[argmax]
                    segvert = corpus_cc.extract_label(target_vert_id, inplace=False).get_array()
                    try:
                        logger.print("get_separating_components to split vertebra", verbose=verbose)
                        (spart, tpart, spart_dil, tpart_dil, _) = get_separating_components(segvert, connectivity=3)

                        logger.print("Splitting by plane")
                        plane_split_nii = get_plane_split(segvert, corpus_nii, spart, tpart, spart_dil, tpart_dil)
                        split_vert = split_by_plane(segvert, plane_split_nii)
                        corpus_cc[split_vert == 2] = corpus_cc.max() + 1
                    except Exception as e:
                        logger.print(f"Separating Corpi failed with exception {e}", Log_Type.FAIL)

    corpus_coms = list(corpus_cc.center_of_masses().values())
    corpus_coms.sort(key=lambda a: a[1])
    if dense:
        dens_cc = seg_nii.get_segmentation_connected_components_center_of_mass(label=Location.Dens_axis.value)
        corpus_coms.extend(dens_cc)
    corpus_coms.reverse()  # from bottom to top
    logger.print(f"Found {len(corpus_coms)} final Corpus ccs", verbose=verbose)
    return corpus_coms

get_separating_components

get_separating_components(
    segvert: ndarray,
    max_iter: int = 10,
    connectivity: int = 3,
) -> tuple[
    np.ndarray,
    np.ndarray,
    np.ndarray,
    np.ndarray,
    np.ndarray,
]

Split a binary volume into two spatially separate components (S and T) via erosion and dilation.

Designed for a segmentation that is a single connected component but should be separated into two meaningful subregions. Morphological erosion is applied until the volume breaks into multiple connected components (the splitting point), then the two regions are recovered through dilation. Useful for anatomical structures that are initially connected (e.g., two merged vertebral bodies).

Parameters:

Name Type Description Default
segvert ndarray

3D binary (or labeled) array representing the volume to split.

required
max_iter int

Maximum number of erosion iterations allowed to find separable components. Defaults to 10.

10
connectivity int

Connectivity for morphological operations (1=6-, 2=18-, 3=26-connectivity). Defaults to 3.

3

Returns:

Type Description
ndarray

tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: A 5-tuple ``(spart, tpart, spart_dil,

ndarray

tpart_dil, stpart)wherespart/tpartare the two separated binary components,spart_dil/tpart_dil``

ndarray

are their dilations grown until contact, and stpart is their combined map (0=background, 1=spart_dil only,

ndarray

2=tpart_dil only, 3=overlap of the two dilations).

Raises:

Type Description
Exception

If the volume cannot be split into two parts within the iterations, or if a resulting part is empty.

IndentationError

If the maximum number of erosion iterations is reached without successful separation.

Source code in spineps/phase_instance.py
def get_separating_components(
    segvert: np.ndarray, max_iter: int = 10, connectivity: int = 3
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Split a binary volume into two spatially separate components (S and T) via erosion and dilation.

    Designed for a segmentation that is a single connected component but should be separated into two
    meaningful subregions. Morphological erosion is applied until the volume breaks into multiple connected
    components (the splitting point), then the two regions are recovered through dilation. Useful for
    anatomical structures that are initially connected (e.g., two merged vertebral bodies).

    Args:
        segvert (np.ndarray): 3D binary (or labeled) array representing the volume to split.
        max_iter (int, optional): Maximum number of erosion iterations allowed to find separable components. Defaults to 10.
        connectivity (int, optional): Connectivity for morphological operations (1=6-, 2=18-, 3=26-connectivity). Defaults to 3.

    Returns:
        tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: A 5-tuple ``(spart, tpart, spart_dil,
        tpart_dil, stpart)`` where ``spart``/``tpart`` are the two separated binary components, ``spart_dil``/``tpart_dil``
        are their dilations grown until contact, and ``stpart`` is their combined map (0=background, 1=spart_dil only,
        2=tpart_dil only, 3=overlap of the two dilations).

    Raises:
        Exception: If the volume cannot be split into two parts within the iterations, or if a resulting part is empty.
        IndentationError: If the maximum number of erosion iterations is reached without successful separation.
    """
    check_connectivity = 3
    vol = segvert.copy()
    vol_old = vol.copy()
    iterations = 0
    while True:
        vol_erode = np_erode_msk(vol, n_pixel=1, connectivity=connectivity)
        subreg_cc, subreg_cc_n = np_connected_components(vol_erode, connectivity=check_connectivity)
        if subreg_cc_n > 1:
            vol = subreg_cc
            break
        elif subreg_cc_n == 0:  # np.max(subreg_cc)# 1 not in np_unique(subreg_cc)
            vol_dilated = np_dilate_msk(vol, n_pixel=1, connectivity=connectivity, mask=vol.copy())
            # use iteration before to get other CC
            vol[vol_old != 0] = 2  # all possible voxels are 2
            vol[vol_dilated == 1] = 1

            if 2 not in np_volume(vol):
                raise Exception(  # noqa: TRY002
                    f"cannot split volume into two parts after {iterations} iterations, all values are 0 after erosion."
                )
            volume = np_volume(vol)
            dil_iter = 0
            while volume[1] / (volume[1] + volume[2]) < 0.5:
                vol_dilated = np_dilate_msk(vol_dilated, n_pixel=1, connectivity=connectivity, mask=vol.copy())
                # inst_nii.set_array(vol_dilated).save(files_out + f"subreg_cc_vol_dilated{dil_iter}.nii.gz")
                vol[vol_dilated == 1] = 1
                # inst_nii.set_array(vol).save(files_out + f"subreg_cc_dilation{dil_iter}.nii.gz")
                volume = np_volume(vol)
                if 1 not in volume or 2 not in volume:
                    raise Exception("Could not divide into two instance")  # noqa: TRY002
                dil_iter += 1

            vol_1 = np_filter_connected_components(vol == 1, largest_k_components=1, connectivity=check_connectivity).astype(np.uint8)
            vol_2 = np_filter_connected_components(vol == 2, largest_k_components=1, connectivity=check_connectivity).astype(np.uint8)
            vol_2 *= 2
            vol[vol == 1] = vol_1[vol == 1]
            vol[vol == 2] = vol_2[vol == 2]
            break
        vol_old = vol
        vol = vol_erode
        iterations += 1
        if iterations > max_iter:
            raise IndentationError(f"Could not divide into two instance after max iterations {max_iter}")
    if len(np_volume(vol)) != 2:
        logger.print("Get largest two components")
        subreg_cc_2k = np_filter_connected_components(
            vol, largest_k_components=2, connectivity=check_connectivity, return_original_labels=False
        )
        spart = subreg_cc_2k == 1
        tpart = subreg_cc_2k == 2
    else:
        spart = vol == 1
        tpart = vol == 2

    if spart.sum() == 0 or tpart.sum() == 0:
        raise Exception("S or T are empty")  # noqa: TRY002

    spart_dil = np_dilate_msk(spart, n_pixel=1, connectivity=connectivity)
    tpart_dil = np_dilate_msk(tpart, n_pixel=1, connectivity=connectivity)
    stpart = (spart_dil + (tpart_dil * 2)).astype(np.uint8)
    while 3 not in np_volume(stpart):
        spart_dil = np_dilate_msk(spart_dil, n_pixel=1, connectivity=connectivity)
        stpart = (spart_dil + (tpart_dil * 2)).astype(np.uint8)
        if 3 in np_volume(stpart):
            break
        tpart_dil = np_dilate_msk(tpart_dil, n_pixel=1, connectivity=connectivity)
        stpart = (spart_dil + (tpart_dil * 2)).astype(np.uint8)
    stpart = spart_dil + (tpart_dil * 2)
    return spart, tpart, spart_dil, tpart_dil, stpart

get_plane_split

get_plane_split(
    segvert: ndarray,
    compare_nii: NII,
    spart: ndarray,
    tpart: ndarray,
    spart_dil: ndarray,
    tpart_dil: ndarray,
) -> NII

Compute an approximate separating plane between two regions and return it as an NII.

Determines the collision region between the dilated versions of the two components, takes its center of mass as a point on the plane, and builds a plane orthogonal to the vector between the centers of mass of spart and tpart. The plane is then filled along the superior axis and returned as an NII matching the input orientation. If the dilated masks do not touch, an empty volume is returned and a warning is logged.

Note

TODO: Improve accuracy by projecting the collision point onto the line connecting both COMs rather than relying on the COM of the overlap region.

Parameters:

Name Type Description Default
segvert ndarray

Original 3D binary or labeled segmentation volume.

required
compare_nii NII

Reference image providing orientation and spatial metadata for the output.

required
spart ndarray

Binary mask of the first component (S).

required
tpart ndarray

Binary mask of the second component (T).

required
spart_dil ndarray

Dilated mask of spart.

required
tpart_dil ndarray

Dilated mask of tpart.

required

Returns:

Name Type Description
NII NII

A filled binary plane separating spart and tpart, in the input orientation; an empty image

NII

if no collision between the dilated masks is detected.

Source code in spineps/phase_instance.py
def get_plane_split(
    segvert: np.ndarray,
    compare_nii: NII,
    spart: np.ndarray,
    tpart: np.ndarray,
    spart_dil: np.ndarray,
    tpart_dil: np.ndarray,
) -> NII:
    """Compute an approximate separating plane between two regions and return it as an NII.

    Determines the collision region between the dilated versions of the two components, takes its center of mass
    as a point on the plane, and builds a plane orthogonal to the vector between the centers of mass of ``spart``
    and ``tpart``. The plane is then filled along the superior axis and returned as an NII matching the input
    orientation. If the dilated masks do not touch, an empty volume is returned and a warning is logged.

    Note:
        TODO: Improve accuracy by projecting the collision point onto the line connecting both COMs rather than
        relying on the COM of the overlap region.

    Args:
        segvert (np.ndarray): Original 3D binary or labeled segmentation volume.
        compare_nii (NII): Reference image providing orientation and spatial metadata for the output.
        spart (np.ndarray): Binary mask of the first component (S).
        tpart (np.ndarray): Binary mask of the second component (T).
        spart_dil (np.ndarray): Dilated mask of ``spart``.
        tpart_dil (np.ndarray): Dilated mask of ``tpart``.

    Returns:
        NII: A filled binary plane separating ``spart`` and ``tpart``, in the input orientation; an empty image
        if no collision between the dilated masks is detected.
    """

    s_dilint = spart_dil.astype(np.uint8)
    t_dilint = tpart_dil.astype(np.uint8)
    collision_arr = s_dilint + t_dilint
    if 2 not in collision_arr:
        logger.print("S and T dilated do not touch each other error")
        return compare_nii.set_array(np.zeros_like(segvert))
    collision_arr[collision_arr != 2] = 0
    collision_arr[collision_arr == 2] = 1
    collision_point = np_center_of_mass(collision_arr)[1]
    # TODO instead of COM, calc COM of S and T, make vector and use merge point along that vector as collision point? should be more accurate
    #
    normal_vector = np_center_of_mass(spart.astype(np.uint8))[1] - np_center_of_mass(tpart.astype(np.uint8))[1]
    normalized_normal = normal_vector / np.linalg.norm(normal_vector)
    axis: int = np.argmax(np.abs(normalized_normal))  # type: ignore
    dims = [0, 1, 2]
    dims.remove(axis)
    dim1, dim2 = dims
    shift_total = -collision_point.dot(normal_vector)
    xx, yy = np.meshgrid(range(collision_arr.shape[dim1]), range(collision_arr.shape[dim2]))
    zz = (-normal_vector[dim1] * xx - normal_vector[dim2] * yy - shift_total) * 1.0 / normal_vector[axis]
    z_max = collision_arr.shape[axis] - 1
    zz[zz < 0] = 0
    zz[zz > z_max] = z_max
    # make cords to array again
    plane_coords = np.zeros([xx.shape[0], xx.shape[1], 3], dtype=int)
    plane_coords[:, :, axis] = zz
    plane_coords[:, :, dim1] = xx
    plane_coords[:, :, dim2] = yy

    plane = segvert * 0
    plane[plane_coords[:, :, 0], plane_coords[:, :, 1], plane_coords[:, :, 2]] = 1
    plane_nii = compare_nii.set_array(plane)

    plane_filled_nii = plane_nii.copy()
    orientation = plane_filled_nii.orientation
    plane_filled_nii.reorient_(("S", "A", "R"))
    plane_filled_arr = plane_filled_nii.get_array()
    x_slice = np.ones_like(plane_filled_arr[0]) * np.max(plane_filled_arr) + 1
    # plane_filled[:, 0, :] = 2
    # plane_filled[:, -1, :] = 1
    for i in range(plane_filled_arr.shape[0]):
        curr_slice = plane_filled_arr[i]
        cond = np.where(curr_slice != 0)
        x_slice[cond] = np.minimum(curr_slice[cond], x_slice[cond])
        plane_filled_arr[i] = x_slice

    plane_filled_nii.set_array_(plane_filled_arr).reorient_(orientation)
    return plane_filled_nii

split_by_plane

split_by_plane(
    segvert: ndarray, plane_filled_nii: NII
) -> np.ndarray

Split a vertebra volume into two parts using a filled separating plane.

Masks the filled plane (whose values are 1 on one side of the plane and 2 on the other) to the foreground of segvert, yielding a label map that partitions the vertebra into the two sides.

Parameters:

Name Type Description Default
segvert ndarray

Binary vertebra volume to split.

required
plane_filled_nii NII

Filled separating plane (1 above, 2 below) as produced by get_plane_split.

required

Returns:

Type Description
ndarray

np.ndarray: The vertebra voxels labeled 1 (above the plane) and 2 (below the plane); background stays 0.

Source code in spineps/phase_instance.py
def split_by_plane(
    segvert: np.ndarray,
    plane_filled_nii: NII,
) -> np.ndarray:
    """Split a vertebra volume into two parts using a filled separating plane.

    Masks the filled plane (whose values are 1 on one side of the plane and 2 on the other) to the foreground
    of ``segvert``, yielding a label map that partitions the vertebra into the two sides.

    Args:
        segvert (np.ndarray): Binary vertebra volume to split.
        plane_filled_nii (NII): Filled separating plane (1 above, 2 below) as produced by ``get_plane_split``.

    Returns:
        np.ndarray: The vertebra voxels labeled 1 (above the plane) and 2 (below the plane); background stays 0.
    """
    plane_filled_arr = plane_filled_nii.get_array()
    # 1 above, 2 below
    plane_filled_arr[segvert == 0] = 0
    segvert = plane_filled_arr
    # segvert[plane_filled_arr == 1] = 1
    # segvert[plane_filled_arr == 2] = 2
    return segvert

collect_vertebra_predictions

collect_vertebra_predictions(
    seg_nii: NII,
    model: Segmentation_Model,
    corpus_size_cleaning: int,
    cutout_size: tuple[int, int, int],
    debug_data: dict,
    proc_inst_largest_k_cc: int = 0,
    process_detect_and_solve_merged_corpi: bool = True,
    proc_inst_fill_holes: bool = False,
    verbose: bool = False,
) -> tuple[np.ndarray | None, list[str], int]

Run the instance model on a cutout around each corpus center of mass and collect per-label predictions.

Computes corpus centers of mass, and for each one extracts a cutout_size window (nudged inferiorly until it lands on segmentation), relabels it to the model's expected labels, runs the model, post-processes the cutout, and stores each predicted label (1/2/3, the three-vertebra hierarchy) as a binary map placed back into the full-volume frame.

Parameters:

Name Type Description Default
seg_nii NII

Subregion semantic mask used for the cutouts.

required
model Segmentation_Model

Instance model producing the three-vertebra-body cutout predictions.

required
corpus_size_cleaning int

Voxel threshold for cleaning small corpus artifacts when finding coms; 0 disables.

required
cutout_size tuple[int, int, int]

Cutout window size (per axis) extracted around each center of mass.

required
debug_data dict

Dictionary for collecting per-cutout intermediate results.

required
proc_inst_largest_k_cc int

Keep only the largest k connected components per cutout label; 0 disables. Defaults to 0.

0
process_detect_and_solve_merged_corpi bool

Whether to detect and split merged corpora. Defaults to True.

True
proc_inst_fill_holes bool

Whether to fill holes in each cutout prediction. Defaults to False.

False
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
ndarray | None

tuple[np.ndarray | None, list[str], int]: A hierarchical prediction array of shape

list[str]

(n_corpus_coms, 3, *seg_shape), a list of "comidx_label" identifiers for the predictions actually

int

produced, and the number of corpus centers of mass. Returns (None, [], 0) if no corpus is found.

Source code in spineps/phase_instance.py
def collect_vertebra_predictions(
    seg_nii: NII,
    model: Segmentation_Model,
    corpus_size_cleaning: int,
    cutout_size: tuple[int, int, int],
    debug_data: dict,
    proc_inst_largest_k_cc: int = 0,
    process_detect_and_solve_merged_corpi: bool = True,
    proc_inst_fill_holes: bool = False,
    verbose: bool = False,
) -> tuple[np.ndarray | None, list[str], int]:
    """Run the instance model on a cutout around each corpus center of mass and collect per-label predictions.

    Computes corpus centers of mass, and for each one extracts a ``cutout_size`` window (nudged inferiorly until
    it lands on segmentation), relabels it to the model's expected labels, runs the model, post-processes the
    cutout, and stores each predicted label (1/2/3, the three-vertebra hierarchy) as a binary map placed back
    into the full-volume frame.

    Args:
        seg_nii (NII): Subregion semantic mask used for the cutouts.
        model (Segmentation_Model): Instance model producing the three-vertebra-body cutout predictions.
        corpus_size_cleaning (int): Voxel threshold for cleaning small corpus artifacts when finding coms; 0 disables.
        cutout_size: Cutout window size (per axis) extracted around each center of mass.
        debug_data (dict): Dictionary for collecting per-cutout intermediate results.
        proc_inst_largest_k_cc (int, optional): Keep only the largest k connected components per cutout label; 0 disables. Defaults to 0.
        process_detect_and_solve_merged_corpi (bool, optional): Whether to detect and split merged corpora. Defaults to True.
        proc_inst_fill_holes (bool, optional): Whether to fill holes in each cutout prediction. Defaults to False.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        tuple[np.ndarray | None, list[str], int]: A hierarchical prediction array of shape
        ``(n_corpus_coms, 3, *seg_shape)``, a list of ``"comidx_label"`` identifiers for the predictions actually
        produced, and the number of corpus centers of mass. Returns ``(None, [], 0)`` if no corpus is found.
    """
    corpus_coms = get_corpus_coms(
        seg_nii,
        corpus_size_cleaning=corpus_size_cleaning,
        process_detect_and_solve_merged_corpi=process_detect_and_solve_merged_corpi,
        verbose=verbose,
    )
    if corpus_coms is None:
        return None, [], 0
    n_corpus_coms = len(corpus_coms)

    if n_corpus_coms < 3:
        logger.print(f"Too few vertebra semantically segmented ({n_corpus_coms}), might have bad result", Log_Type.WARNING)
        # return None, [], 0

    shp = (
        # n_corpus_coms,
        # 3
        seg_nii.shape[0],
        seg_nii.shape[1],
        seg_nii.shape[2],
    )
    hierarchical_existing_predictions = []
    # Holds only binary {0, 1} per-label masks, so uint8 is sufficient (the source dtype can be wider,
    # which would needlessly inflate this n_coms x 3 x volume array and slow the Dice comparisons below).
    hierarchical_predictions = np.zeros((n_corpus_coms, 3, *shp), dtype=np.uint8)
    # print("hierarchical_predictions", hierarchical_predictions.shape)
    vert_predict_template = np.zeros(shp, dtype=np.uint16)
    # print("vert_predict_template", vert_predict_template.shape)

    # relabel to the labels expected by the model
    # {41: 1, 42: 2, 43: 3, 44: 4, 45: 5, 46: 6, 47: 7, 48: 8, 49: 9, 50: 9, Location.Dens_axis.value: 9}
    mapping = {int(a): int(b) for a, b in model.inference_config.mapping.items()}
    seg_nii_for_cut: NII = seg_nii.copy().extract_label(list(mapping.keys()), keep_label=True).map_labels_(mapping, verbose=False)
    # print("seg_nii_for_cut", seg_nii_for_cut.shape)

    logger.print("Vertebra collect in", seg_nii.zoom, seg_nii.orientation, seg_nii.shape, verbose=verbose)

    # seg_nii_for_cut is constant across the loop; read its array once instead of copying it per centroid.
    seg_arr_c = seg_nii_for_cut.get_seg_array()
    # iterate over sorted coms and segment vertebra from subreg
    for com_idx, com in enumerate(tqdm(corpus_coms, desc=logger._get_logger_prefix() + " Vertebra Body predictions")):
        # Shift the com until there is a segmentation there (to account for mishaps in the com calculation)
        seg_at_com = seg_arr_c[int(com[0])][int(com[1])][int(com[2])] != 0
        orig_com = (com[0], com[1], com[2])
        while not seg_at_com:
            com = (com[0], com[1] + 5, com[2])  # noqa: PLW2901
            if com[1] >= shp[1]:
                logger.print("Collect Vertebra Predictions: One Cutout at weird position", Log_Type.FAIL)
                com = orig_com  # noqa: PLW2901
                break
            seg_at_com = seg_arr_c[int(com[0])][int(com[1])][int(com[2])] != 0

        # Calc cutout
        arr_cut, cutout_coords, paddings = np_calc_crop_around_centerpoint(com, seg_arr_c, cutout_size)
        cut_nii = seg_nii_for_cut.set_array(arr_cut, verbose=False).reorient_()
        debug_data[f"inst_cutout_vert_nii_{com_idx}_cut"] = cut_nii
        results = model.segment_scan(
            cut_nii,
            resample_to_recommended=False,
            pad_size=0,
            resample_output_to_input_space=False,
            verbose=False,
        )
        vert_cut_nii = results[OutputType.seg].reorient_()
        # print("vert_cut_nii", vert_cut_nii.shape)
        # logger.print(f"Done {com_idx}")
        debug_data[f"inst_cutout_vert_nii_{com_idx}_pred"] = vert_cut_nii.copy()
        vert_cut_nii = post_process_single_3vert_prediction(
            vert_cut_nii,
            None,
            fill_holes=proc_inst_fill_holes,
            largest_cc=proc_inst_largest_k_cc,  # type:ignore
        )
        vert_labels = vert_cut_nii.unique()  # 1,2,3
        debug_data[f"inst_cutout_vert_nii_{com_idx}_proc"] = vert_cut_nii.copy()

        cutout_sizes = tuple(cutout_coords[i].stop - cutout_coords[i].start for i in range(len(cutout_coords)))
        pad_cutout = tuple(slice(paddings[i][0], paddings[i][0] + cutout_sizes[i]) for i in range(len(paddings)))
        # print("cutout_sizes", cutout_sizes)
        # print("pad_cutout", pad_cutout)
        arr = vert_cut_nii.get_seg_array()
        vert_predict_map = vert_predict_template.copy()
        vert_predict_map[cutout_coords] = arr[pad_cutout]
        # vert_predict_map[com_idx][cutout_coords][vert_predict_map[com_idx][cutout_coords] == 0] = arr[pad_cutout][
        #    vert_predict_map[com_idx][cutout_coords] == 0
        # ]
        seg_at_com = vert_predict_map[int(com[0])][int(com[1])][int(com[2])]
        if seg_at_com == 0:
            logger.print("Zero at cutout center, mistake", Log_Type.WARNING)
        # if seg_at_com != 0:
        #    # before (1) is above
        #    shift = 2 - seg_at_com
        #    vert_predictions[com_idx][vert_predictions[com_idx] != 0] = vert_predictions[com_idx][vert_predictions[com_idx] != 0] + shift
        # debug_data[f"vert_nii_{com_idx}_proc2"] = seg_nii_for_cut.set_array(vert_predictions[com_idx][cutout_coords])
        for l in vert_labels:
            vert_l_map = vert_predict_map.copy()
            vert_l_map[vert_l_map != l] = 0
            vert_l_map[vert_l_map != 0] = 1
            labelindex = l - 1
            if vert_l_map.max() > 0:
                hierarchical_predictions[com_idx][labelindex] = vert_l_map
                hierarchical_existing_predictions.append(str_id_com_label(com_idx, labelindex))
    return hierarchical_predictions, hierarchical_existing_predictions, n_corpus_coms

post_process_single_3vert_prediction

post_process_single_3vert_prediction(
    vert_nii: NII,
    labels: list[int] | None = None,
    largest_cc: int = 0,
    fill_holes: bool = False,
) -> NII

Post-process a single three-vertebra cutout prediction by filtering components and filling holes.

Parameters:

Name Type Description Default
vert_nii NII

The cutout prediction mask to clean.

required
labels list[int] | None

Labels to restrict the connected-component filter to. Defaults to None (all labels).

None
largest_cc int

Keep only the largest largest_cc connected components per label; 0 disables. Defaults to 0.

0
fill_holes bool

Whether to fill holes in the prediction. Defaults to False.

False

Returns:

Name Type Description
NII NII

The post-processed cutout prediction mask.

Source code in spineps/phase_instance.py
def post_process_single_3vert_prediction(
    vert_nii: NII,
    labels: list[int] | None = None,
    largest_cc: int = 0,
    fill_holes: bool = False,
) -> NII:
    """Post-process a single three-vertebra cutout prediction by filtering components and filling holes.

    Args:
        vert_nii (NII): The cutout prediction mask to clean.
        labels (list[int] | None, optional): Labels to restrict the connected-component filter to. Defaults to None (all labels).
        largest_cc (int, optional): Keep only the largest ``largest_cc`` connected components per label; 0 disables. Defaults to 0.
        fill_holes (bool, optional): Whether to fill holes in the prediction. Defaults to False.

    Returns:
        NII: The post-processed cutout prediction mask.
    """
    if largest_cc > 0:  # 5 seems like a good number (if three, then at least center must be fully visible)
        vert_nii = vert_nii.filter_connected_components(max_count_component=largest_cc, labels=labels, keep_label=True)
    if fill_holes:
        labels = vert_nii.unique()  # type:ignore
        vert_nii.fill_holes_(labels=labels, verbose=False)
    return vert_nii

str_id_com_label

str_id_com_label(com_idx: int, label: int) -> str

Build the string identifier for a single (corpus-com, label) prediction.

Parameters:

Name Type Description Default
com_idx int

Index of the corpus center of mass.

required
label int

Label index within that center's three-vertebra prediction.

required

Returns:

Name Type Description
str str

The identifier "{com_idx}_{label}".

Source code in spineps/phase_instance.py
def str_id_com_label(com_idx: int, label: int) -> str:
    """Build the string identifier for a single (corpus-com, label) prediction.

    Args:
        com_idx (int): Index of the corpus center of mass.
        label (int): Label index within that center's three-vertebra prediction.

    Returns:
        str: The identifier ``"{com_idx}_{label}"``.
    """
    return str(com_idx) + "_" + str(label)

from_vert3_predictions_make_vert_mask

from_vert3_predictions_make_vert_mask(
    seg_nii: NII,
    vert_predictions: ndarray,
    hierarchical_existing_predictions: list[str],
    vert_size_threshold: int,
    debug_data: dict,
    proc_inst_clean_small_cc_artifacts: bool = True,
    verbose: bool = False,
) -> tuple[NII, dict, ErrCode]

Merge the hierarchical three-vertebra predictions into a single vertebra instance mask.

Each per-label prediction looks among neighboring predictions (center index -2 to +2, all three labels) for its most-agreeing partners (by Dice), forming prediction couples. The couples are then merged into one instance label map, optionally cleaning small connected-component artifacts.

Parameters:

Name Type Description Default
seg_nii NII

Reference segmentation providing shape and spatial metadata.

required
vert_predictions ndarray

Hierarchical predictions of shape (com_idx, label, *shape).

required
hierarchical_existing_predictions list[str]

Identifiers of the predictions that were actually produced.

required
vert_size_threshold int

Voxel threshold for removing small instance artifacts.

required
debug_data dict

Dictionary for collecting intermediate results.

required
proc_inst_clean_small_cc_artifacts bool

Whether to delete small instance artifacts. Defaults to True.

True
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
tuple[NII, dict, ErrCode]

tuple[NII, dict, ErrCode]: The merged vertebra instance mask, the (updated) debug data, and an error code.

Source code in spineps/phase_instance.py
def from_vert3_predictions_make_vert_mask(
    seg_nii: NII,
    vert_predictions: np.ndarray,  # already hierarchical [com_idx, l, map]
    hierarchical_existing_predictions: list[str],  # list of actually used vert predictions
    vert_size_threshold: int,
    debug_data: dict,
    proc_inst_clean_small_cc_artifacts: bool = True,
    verbose: bool = False,
) -> tuple[NII, dict, ErrCode]:
    """Merge the hierarchical three-vertebra predictions into a single vertebra instance mask.

    Each per-label prediction looks among neighboring predictions (center index -2 to +2, all three labels) for
    its most-agreeing partners (by Dice), forming prediction couples. The couples are then merged into one
    instance label map, optionally cleaning small connected-component artifacts.

    Args:
        seg_nii (NII): Reference segmentation providing shape and spatial metadata.
        vert_predictions (np.ndarray): Hierarchical predictions of shape ``(com_idx, label, *shape)``.
        hierarchical_existing_predictions (list[str]): Identifiers of the predictions that were actually produced.
        vert_size_threshold (int): Voxel threshold for removing small instance artifacts.
        debug_data (dict): Dictionary for collecting intermediate results.
        proc_inst_clean_small_cc_artifacts (bool, optional): Whether to delete small instance artifacts. Defaults to True.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        tuple[NII, dict, ErrCode]: The merged vertebra instance mask, the (updated) debug data, and an error code.
    """
    # instance approach: each 1/2/3 pred finds it most agreeing partner in surrounding predictions (com idx -2 to +2 all three pred)
    # Then sort by agreement, and segment each (this would be able to add more vertebra than input coms if one is skipped)
    # each one that has been used for fixing a segmentation cannot be used again (so object loose their partners if too weird)

    # idx is always in the order in predictions (so bottom2up corpus CC)
    # arcus_coms sorted bottom to top
    hierarchical_predictions = vert_predictions
    # search space: all neighboring predictions
    # all search for up to two other predictions with best agreement
    coupled_predictions = create_prediction_couples(hierarchical_predictions, hierarchical_existing_predictions)

    logger.print("Coupled predictions", coupled_predictions, verbose=verbose)
    return merge_coupled_predictions(
        seg_nii,
        coupled_predictions=coupled_predictions,
        hierarchical_predictions=hierarchical_predictions,
        debug_data=debug_data,
        proc_clean_small_cc_artifacts=proc_inst_clean_small_cc_artifacts,
        vert_size_threshold=vert_size_threshold,
        verbose=verbose,
    )

create_prediction_couples

create_prediction_couples(
    hierarchical_predictions: ndarray,
    hierarchical_existing_predictions,
    verbose: bool = False,
) -> dict

Form and rank prediction couples across all hierarchical predictions.

For every (center index, label) prediction, finds its best-agreeing partners and groups them into a couple, averaging the agreement scores of duplicate couples. The result is sorted so that larger, higher-agreement couples come first (key = (len(couple) + 1) * mean_agreement).

Parameters:

Name Type Description Default
hierarchical_predictions ndarray

Hierarchical predictions of shape (com_idx, label, *shape).

required
hierarchical_existing_predictions list[str]

Identifiers of the predictions that were actually produced.

required
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Name Type Description
dict dict

Mapping from each couple (a tuple of (com_idx, label) members) to its mean agreement score,

dict

ordered by descending size-weighted agreement.

Source code in spineps/phase_instance.py
def create_prediction_couples(
    hierarchical_predictions: np.ndarray,
    hierarchical_existing_predictions,
    verbose: bool = False,
) -> dict:
    """Form and rank prediction couples across all hierarchical predictions.

    For every (center index, label) prediction, finds its best-agreeing partners and groups them into a couple,
    averaging the agreement scores of duplicate couples. The result is sorted so that larger, higher-agreement
    couples come first (key = ``(len(couple) + 1) * mean_agreement``).

    Args:
        hierarchical_predictions (np.ndarray): Hierarchical predictions of shape ``(com_idx, label, *shape)``.
        hierarchical_existing_predictions (list[str]): Identifiers of the predictions that were actually produced.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        dict: Mapping from each couple (a tuple of ``(com_idx, label)`` members) to its mean agreement score,
        ordered by descending size-weighted agreement.
    """
    n_predictions = hierarchical_predictions.shape[0]
    # Set for O(1) membership in the inner candidate search (called 3 * n_predictions times).
    existing_predictions = set(hierarchical_existing_predictions)

    coupled_predictions = {}
    # TODO try to calculate list of candidates here, take the predictions and then parallelize the find_prediction_couple

    for idx in range(n_predictions):
        for pred in range(3):
            couple, agreement = find_prediction_couple(idx, pred, hierarchical_predictions, existing_predictions, n_predictions, verbose)
            if couple is None:
                continue
            if couple not in coupled_predictions:
                coupled_predictions[couple] = [agreement]
            else:
                coupled_predictions[couple].append(agreement)
    coupled_predictions = {i: sum(v) / len(v) for i, v in coupled_predictions.items()}
    coupled_predictions = dict(
        sorted(
            coupled_predictions.items(),
            key=lambda item: (len(item[0]) + 1) * item[1],
            reverse=True,
        )
    )
    return coupled_predictions

parallel_dice

parallel_dice(
    anchor, pred, cand_loc: tuple
) -> tuple[float, tuple]

Compute the Dice score between two masks, tagged with a candidate location.

Parameters:

Name Type Description Default
anchor ndarray

Anchor prediction mask.

required
pred ndarray

Candidate prediction mask to compare against.

required
cand_loc tuple

Candidate location identifier carried through unchanged.

required

Returns:

Type Description
tuple[float, tuple]

tuple[float, Any]: The Dice score between anchor and pred and the passed-through cand_loc.

Source code in spineps/phase_instance.py
def parallel_dice(anchor, pred, cand_loc: tuple) -> tuple[float, tuple]:
    """Compute the Dice score between two masks, tagged with a candidate location.

    Args:
        anchor (np.ndarray): Anchor prediction mask.
        pred (np.ndarray): Candidate prediction mask to compare against.
        cand_loc: Candidate location identifier carried through unchanged.

    Returns:
        tuple[float, Any]: The Dice score between ``anchor`` and ``pred`` and the passed-through ``cand_loc``.
    """
    return float(np_dice(anchor, pred)), cand_loc

find_prediction_couple

find_prediction_couple(
    idx,
    pred,
    hierarchical_predictions: ndarray,
    hierarchical_existing_predictions,
    n_predictions,
    verbose: bool = False,
) -> tuple[tuple | None, float]

Find the best-agreeing partner predictions for one anchor prediction.

Considers candidate predictions within +/-2 of the anchor's center index (all three labels, excluding the anchor itself), ranks them by Dice with the anchor, and keeps up to the two best whose Dice exceeds 0.3. The anchor itself is appended, and the members are returned sorted by center index.

Parameters:

Name Type Description Default
idx int

Center-of-mass index of the anchor prediction.

required
pred int

Label index of the anchor prediction.

required
hierarchical_predictions ndarray

Hierarchical predictions of shape (com_idx, label, *shape).

required
hierarchical_existing_predictions list[str]

Identifiers of the predictions that were actually produced.

required
n_predictions int

Total number of corpus centers of mass.

required
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
tuple | None

tuple[tuple | None, float]: The couple (a sorted tuple of (com_idx, label) members including the

float

anchor) and its mean partner agreement. Returns (None, 0) if the anchor prediction does not exist.

Source code in spineps/phase_instance.py
def find_prediction_couple(
    idx,
    pred,
    hierarchical_predictions: np.ndarray,
    hierarchical_existing_predictions,
    n_predictions,
    verbose: bool = False,
) -> tuple[tuple | None, float]:
    """Find the best-agreeing partner predictions for one anchor prediction.

    Considers candidate predictions within +/-2 of the anchor's center index (all three labels, excluding the
    anchor itself), ranks them by Dice with the anchor, and keeps up to the two best whose Dice exceeds 0.3.
    The anchor itself is appended, and the members are returned sorted by center index.

    Args:
        idx (int): Center-of-mass index of the anchor prediction.
        pred (int): Label index of the anchor prediction.
        hierarchical_predictions (np.ndarray): Hierarchical predictions of shape ``(com_idx, label, *shape)``.
        hierarchical_existing_predictions (list[str]): Identifiers of the predictions that were actually produced.
        n_predictions (int): Total number of corpus centers of mass.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        tuple[tuple | None, float]: The couple (a sorted tuple of ``(com_idx, label)`` members including the
        anchor) and its mean partner agreement. Returns ``(None, 0)`` if the anchor prediction does not exist.
    """
    if str_id_com_label(idx, pred) not in hierarchical_existing_predictions:
        logger.print(f"{str_id_com_label(idx, pred)} not in predictions {hierarchical_existing_predictions}", verbose=verbose)
        return None, 0
    anchor = hierarchical_predictions[idx][pred]
    dices = {}

    min_idx = max(0, idx - 2)
    max_idx = min(idx + 2, n_predictions)
    list_of_candidates = [
        (i, l)
        for i in range(min_idx, max_idx + 1)
        for l in [0, 1, 2]
        if i != idx and str_id_com_label(i, l) in hierarchical_existing_predictions
    ]

    # list_of_candidates = np.array(np.meshgrid(idx_candidates, [0, 1, 2])).T.reshape(-1, 2)
    for cand_loc in list_of_candidates:
        dices[tuple(cand_loc)] = float(np_dice(anchor, hierarchical_predictions[cand_loc[0]][cand_loc[1]]))

    # find k best partners
    dices = dict(sorted(dices.items(), key=lambda item: item[1], reverse=True))
    best_k = list(dices.keys())
    best_k = best_k[:2]

    couple = []
    dice_threshold = 0.3
    if len(best_k) > 0 and dices[best_k[0]] > dice_threshold:
        couple.append(best_k[0])
    if len(best_k) > 1 and dices[best_k[1]] > dice_threshold:
        couple.append(best_k[1])
    # if dices[best_k[2]] > dice_threshold:
    #    couple.append(best_k[2])
    if len(couple) == 2:
        # sort out if the other two do not overlap over threshold
        dice_partners = float(
            np_dice(
                hierarchical_predictions[best_k[0][0]][best_k[0][1]],
                hierarchical_predictions[best_k[1][0]][best_k[1][1]],
            )
        )
        if dice_partners < dice_threshold:
            logger.print(couple, " was skipped because the partners do not overlap", verbose=verbose)

    agreement = 0
    if len(couple) > 0:
        # print(couple)
        agreement = 0
        for c in couple:
            agreement += dices[c]
        agreement /= len(couple)
    couple.append((idx, pred))  # add anchor prediction into couple
    couple = tuple(sorted(couple, key=lambda x: x[0]))
    return couple, agreement

merge_coupled_predictions

merge_coupled_predictions(
    seg_nii: NII,
    coupled_predictions,
    hierarchical_predictions: ndarray,
    debug_data: dict,
    proc_clean_small_cc_artifacts: bool = True,
    vert_size_threshold: int = 0,
    verbose: bool = False,
) -> tuple[NII, dict, ErrCode]

Assemble the final vertebra instance mask from ranked prediction couples.

Iterates over the couples in priority order, summing their member maps and thresholding by voxel agreement (requiring overlap from at least two members unless the couple is small or low-agreement). Each accepted couple is written as a new instance label into voxels not yet claimed; couples overlapping established vertebrae by more than 60% are skipped. Small connected-component artifacts are optionally cleaned afterwards.

Parameters:

Name Type Description Default
seg_nii NII

Reference segmentation providing shape and spatial metadata.

required
coupled_predictions dict

Mapping from couple to mean agreement, ordered by priority.

required
hierarchical_predictions ndarray

Hierarchical predictions of shape (com_idx, label, *shape).

required
debug_data dict

Dictionary for collecting intermediate results.

required
proc_clean_small_cc_artifacts bool

Whether to delete small instance artifacts. Defaults to True.

True
vert_size_threshold int

Voxel threshold for removing small instance artifacts. Defaults to 0.

0
verbose bool

Emit additional progress logging. Defaults to False.

False

Returns:

Type Description
NII

tuple[NII, dict, ErrCode]: The vertebra instance mask, the (updated) debug data, and an error code

dict

(ErrCode.OK on success, ErrCode.EMPTY if a couple produces an empty mask or the result is empty).

Source code in spineps/phase_instance.py
def merge_coupled_predictions(
    seg_nii: NII,
    coupled_predictions,
    hierarchical_predictions: np.ndarray,
    debug_data: dict,
    proc_clean_small_cc_artifacts: bool = True,
    vert_size_threshold: int = 0,
    verbose: bool = False,
) -> tuple[NII, dict, ErrCode]:
    """Assemble the final vertebra instance mask from ranked prediction couples.

    Iterates over the couples in priority order, summing their member maps and thresholding by voxel agreement
    (requiring overlap from at least two members unless the couple is small or low-agreement). Each accepted
    couple is written as a new instance label into voxels not yet claimed; couples overlapping established
    vertebrae by more than 60% are skipped. Small connected-component artifacts are optionally cleaned afterwards.

    Args:
        seg_nii (NII): Reference segmentation providing shape and spatial metadata.
        coupled_predictions (dict): Mapping from couple to mean agreement, ordered by priority.
        hierarchical_predictions (np.ndarray): Hierarchical predictions of shape ``(com_idx, label, *shape)``.
        debug_data (dict): Dictionary for collecting intermediate results.
        proc_clean_small_cc_artifacts (bool, optional): Whether to delete small instance artifacts. Defaults to True.
        vert_size_threshold (int, optional): Voxel threshold for removing small instance artifacts. Defaults to 0.
        verbose (bool, optional): Emit additional progress logging. Defaults to False.

    Returns:
        tuple[NII, dict, ErrCode]: The vertebra instance mask, the (updated) debug data, and an error code
        (``ErrCode.OK`` on success, ``ErrCode.EMPTY`` if a couple produces an empty mask or the result is empty).
    """
    whole_vert_nii = seg_nii.copy()
    whole_vert_arr = np.zeros(whole_vert_nii.shape, dtype=np.uint16)  # this is fixed segmentations from vert

    idx = 1
    for k, overall_agreement in coupled_predictions.items():
        # take_no_overlap = len(k) <= 2 or overall_agreement < 0.45
        take_no_overlap = len(k) <= 2
        if overall_agreement < 0.3 + 0.15 * (4 - len(k)):
            take_no_overlap = True
        combine = np.zeros(whole_vert_nii.shape, dtype=whole_vert_nii.dtype)
        for cid in k:
            combine += hierarchical_predictions[cid[0]][cid[1]]
        # print(combine.shape)
        m = 1 if take_no_overlap else 2
        # m = min(max(1, np.max(combine)), 2)  # type:ignore
        combine[combine < m] = 0
        combine[combine != 0] = idx

        count_new = np_count_nonzero(combine)
        if count_new == 0:
            logger.print("ZERO instance mask failure on vertebra instance creation", Log_Type.FAIL)
            return seg_nii, debug_data, ErrCode.EMPTY
        fixed_n = combine.copy()
        fixed_n[whole_vert_arr != 0] = 0
        count_cut = np_count_nonzero(fixed_n)
        relative_overlap = (count_new - count_cut) / count_new
        if relative_overlap > 0.6:
            logger.print(k, f" was skipped because it overlaps {round(relative_overlap, 4)} with established verts", verbose=verbose)
            continue
        whole_vert_arr[whole_vert_arr == 0] = combine[whole_vert_arr == 0]
        idx += 1

    debug_data["inst_crop_vert_arr_a_raw"] = seg_nii.set_array(whole_vert_arr)

    if np_is_empty(whole_vert_arr):
        logger.print("Vert mask empty, will skip", Log_Type.FAIL)
        return whole_vert_nii.set_array_(whole_vert_arr, verbose=False), debug_data, ErrCode.EMPTY

    # Cleanup step
    if proc_clean_small_cc_artifacts:
        whole_vert_arr = clean_cc_artifacts(
            whole_vert_arr,
            labels=np_unique(whole_vert_arr)[1:],  # type:ignore
            cc_size_threshold=vert_size_threshold,
            only_delete=True,
            logger=logger,
            verbose=verbose,
        )
    # print("whole_vert_arr", whole_vert_arr.shape)
    # print("seg_nii", seg_nii.shape)
    whole_vert_nii_proc = seg_nii.set_array(whole_vert_arr)
    # print("whole_vert_arr_proc", whole_vert_arr_proc.shape)
    # debug_data["whole_vert_arr_proc"] = seg_nii.set_array(whole_vert_arr)
    # return seg_nii.set_array(whole_vert_arr, verbose=False).map_labels_(com_map, verbose=False), debug_data, ErrCode.OK
    return whole_vert_nii_proc, debug_data, ErrCode.OK

spineps.phase_labeling

spineps.phase_labeling

Vertebra-labeling phase: turns top-to-bottom vertebra instances into anatomical vertebra labels via a classifier and path search.

perform_labeling_step

perform_labeling_step(
    model: VertLabelingClassifier,
    img_nii: NII,
    vert_nii: NII,
    subreg_nii: NII | None = None,
    proc_lab_force_no_tl_anomaly: bool = False,
    disable_c1: bool = True,
) -> NII

Assign anatomical vertebra labels to a vertebra instance mask using the labeling classifier.

Runs the labeling classifier on each vertebra instance, derives a globally consistent label sequence, and relabels the instance mask accordingly. If a subregion mask is given, the classifier only sees the vertebra corpus (not the whole vertebra). Optionally adds a missing C1 label and zeroes out any instances that could not be matched.

Parameters:

Name Type Description Default
model VertLabelingClassifier

Classifier used to predict per-instance vertebra labels.

required
img_nii NII

Input MRI image.

required
vert_nii NII

Vertebra instance segmentation mask to be relabeled.

required
subreg_nii NII | None

Subregion semantic mask; if given, vertebrae are masked to their corpus before classification.

None
proc_lab_force_no_tl_anomaly bool

If True, disallow thoracolumbar (T13) transitional-vertebra anomalies.

False
disable_c1 bool

If True, do not predict/add a C1 label.

True

Returns:

Name Type Description
NII NII

The vertebra instance mask relabeled with anatomical vertebra labels (unmatched instances set to 0).

Source code in spineps/phase_labeling.py
def perform_labeling_step(
    model: VertLabelingClassifier,
    img_nii: NII,
    vert_nii: NII,
    subreg_nii: NII | None = None,
    proc_lab_force_no_tl_anomaly: bool = False,
    disable_c1: bool = True,
) -> NII:
    """Assign anatomical vertebra labels to a vertebra instance mask using the labeling classifier.

    Runs the labeling classifier on each vertebra instance, derives a globally consistent label sequence, and relabels the
    instance mask accordingly. If a subregion mask is given, the classifier only sees the vertebra corpus (not the whole vertebra).
    Optionally adds a missing C1 label and zeroes out any instances that could not be matched.

    Args:
        model (VertLabelingClassifier): Classifier used to predict per-instance vertebra labels.
        img_nii (NII): Input MRI image.
        vert_nii (NII): Vertebra instance segmentation mask to be relabeled.
        subreg_nii (NII | None): Subregion semantic mask; if given, vertebrae are masked to their corpus before classification.
        proc_lab_force_no_tl_anomaly (bool): If True, disallow thoracolumbar (T13) transitional-vertebra anomalies.
        disable_c1 (bool): If True, do not predict/add a C1 label.

    Returns:
        NII: The vertebra instance mask relabeled with anatomical vertebra labels (unmatched instances set to 0).
    """
    if model.predictor is None:
        model.load()

    if subreg_nii is not None:
        # crop for corpus instead of whole vertebra
        corpus_nii = subreg_nii.extract_label((Location.Vertebra_Corpus, Location.Vertebra_Corpus_border))
        vert_nii_c = vert_nii * corpus_nii
    else:
        vert_nii_c = vert_nii
    # run model
    labelmap = run_model_for_vert_labeling(
        model,
        img_nii,
        vert_nii_c,
        proc_lab_force_no_tl_anomaly=proc_lab_force_no_tl_anomaly,
        disable_c1=disable_c1,
    )[0]

    vert_nii_u = vert_nii.unique()
    # add C1 if it not set in labelmap, C2 exists, the minimal label (from sorting C1) is not mached anywhere.
    if not disable_c1 and min(vert_nii_u) not in labelmap and 1 not in labelmap.values() and 2 in labelmap.values():
        logger.on_debug("Add C1 after labeling")
        labelmap[min(vert_nii_u)] = 1
    # remove unmatched vertebras
    for i in vert_nii_u:
        if i not in labelmap:
            logger.on_debug(f"Set label to 0, because not found {i}")

            labelmap[i] = 0

    # relabel according to labelmap
    return vert_nii.map_labels_(labelmap)

run_model_for_vert_labeling

run_model_for_vert_labeling(
    model: VertLabelingClassifier,
    img_nii: NII,
    vert_nii: NII,
    verbose: bool = False,
    proc_lab_force_no_tl_anomaly: bool = False,
    disable_c1: bool = True,
) -> tuple[
    dict[int, int],
    float,
    list[int],
    list[int],
    list,
    list,
    dict,
]

Run the labeling classifier over a whole image/instance pair and resolve a vertebra label sequence.

Reorients, crops around the vertebrae, rescales to the model's recommended zoom, runs the classifier on every vertebra instance, and uses the cheapest-cost path search to turn per-instance predictions into a consistent anatomical sequence.

Parameters:

Name Type Description Default
model VertLabelingClassifier

Classifier used to predict per-instance vertebra labels.

required
img_nii NII

Input MRI image.

required
vert_nii NII

Vertebra instance segmentation mask.

required
verbose bool

If True, print intermediate weighting/path information.

False
proc_lab_force_no_tl_anomaly bool

If True, disallow thoracolumbar (T13) transitional-vertebra anomalies.

False
disable_c1 bool

If True, do not predict a C1 label.

True

Returns:

Name Type Description
tuple tuple[dict[int, int], float, list[int], list[int], list, list, dict]

(labelmap, fcost, fpath, fpath_post, costlist, min_costs_path, predictions) where labelmap maps each original instance label to its assigned vertebra label, fcost is the total path cost, fpath/fpath_post are the raw and post-processed label sequences, costlist is the cost matrix as a list, min_costs_path is the per-step minimum cost path, and predictions are the raw classifier outputs.

Raises:

Type Description
AssertionError

If the number of original instances does not match the resolved path length.

Source code in spineps/phase_labeling.py
def run_model_for_vert_labeling(
    model: VertLabelingClassifier,
    img_nii: NII,
    vert_nii: NII,
    verbose: bool = False,
    proc_lab_force_no_tl_anomaly: bool = False,
    disable_c1: bool = True,
) -> tuple[dict[int, int], float, list[int], list[int], list, list, dict]:
    """Run the labeling classifier over a whole image/instance pair and resolve a vertebra label sequence.

    Reorients, crops around the vertebrae, rescales to the model's recommended zoom, runs the classifier on every vertebra
    instance, and uses the cheapest-cost path search to turn per-instance predictions into a consistent anatomical sequence.

    Args:
        model (VertLabelingClassifier): Classifier used to predict per-instance vertebra labels.
        img_nii (NII): Input MRI image.
        vert_nii (NII): Vertebra instance segmentation mask.
        verbose (bool): If True, print intermediate weighting/path information.
        proc_lab_force_no_tl_anomaly (bool): If True, disallow thoracolumbar (T13) transitional-vertebra anomalies.
        disable_c1 (bool): If True, do not predict a C1 label.

    Returns:
        tuple: ``(labelmap, fcost, fpath, fpath_post, costlist, min_costs_path, predictions)`` where ``labelmap`` maps each
            original instance label to its assigned vertebra label, ``fcost`` is the total path cost, ``fpath``/``fpath_post``
            are the raw and post-processed label sequences, ``costlist`` is the cost matrix as a list, ``min_costs_path`` is the
            per-step minimum cost path, and ``predictions`` are the raw classifier outputs.

    Raises:
        AssertionError: If the number of original instances does not match the resolved path length.
    """
    # reorient
    img = img_nii.reorient(model.inference_config.model_expected_orientation, verbose=False)
    vert = vert_nii.reorient(model.inference_config.model_expected_orientation, verbose=False)
    zms_pir = img.zoom

    # crop
    crop = vert.compute_crop(dist=LABELING_CROP_MARGIN_MM / min(img.zoom))
    img.apply_crop_(crop)
    vert.apply_crop_(crop)

    # rescale
    img.rescale_(model.calc_recommended_resampling_zoom(zms_pir), verbose=False)
    vert.rescale_(model.calc_recommended_resampling_zoom(zms_pir), verbose=False)
    #
    img.assert_affine(other=vert)
    # extract vertebrae
    vert.extract_label_([i for i in range(1, 29) if i not in [26, 27]], keep_label=True)
    # counted label
    orig_label = vert.unique()
    # run model
    predictions = model.run_all_seg_instances(img, vert)

    fcost, fpath, fpath_post, costlist, min_costs_path, _args = find_vert_path_from_predictions(
        predictions=predictions,
        proc_lab_force_no_tl_anomaly=proc_lab_force_no_tl_anomaly,
        verbose=verbose,
        disable_c1=disable_c1,
    )
    assert len(orig_label) == len(fpath_post), f"{len(orig_label)} != {len(fpath_post)}"
    labelmap = {orig_label[idx]: fpath_post[idx] for idx in range(len(orig_label))}

    return labelmap, fcost, fpath, fpath_post, costlist, min_costs_path, predictions

run_model_for_vert_labeling_cutouts

run_model_for_vert_labeling_cutouts(
    model: VertLabelingClassifier,
    img_arrays: dict[int, ndarray],
    disable_c1: bool = True,
    boost_c2: float = 3.0,
    allow_cervical_skip: bool = True,
    verbose: bool = True,
) -> tuple[
    dict[int, int],
    float,
    list[int],
    list[int],
    list,
    list,
    dict,
]

Run the labeling classifier on precomputed per-instance image cutouts and resolve a vertebra label sequence.

Like :func:run_model_for_vert_labeling, but skips reorienting/cropping/rescaling and instead consumes already-prepared image arrays keyed by instance label.

Parameters:

Name Type Description Default
model VertLabelingClassifier

Classifier used to predict per-instance vertebra labels.

required
img_arrays dict[int, ndarray]

Mapping of vertebra instance label to its cropped image array.

required
disable_c1 bool

If True, do not predict a C1 label.

True
boost_c2 float

Multiplicative boost applied to a prediction whose argmax is C2.

3.0
allow_cervical_skip bool

If True, allow the path search to skip a class within the cervical region.

True
verbose bool

If True, print intermediate weighting/path information.

True

Returns:

Name Type Description
tuple tuple[dict[int, int], float, list[int], list[int], list, list, dict]

(labelmap, fcost, fpath, fpath_post, costlist, min_costs_path, predictions) (see :func:run_model_for_vert_labeling).

Raises:

Type Description
AssertionError

If the number of input arrays does not match the resolved path length.

Source code in spineps/phase_labeling.py
def run_model_for_vert_labeling_cutouts(
    model: VertLabelingClassifier,
    img_arrays: dict[int, np.ndarray],
    disable_c1: bool = True,
    boost_c2: float = 3.0,
    allow_cervical_skip: bool = True,
    verbose: bool = True,
) -> tuple[dict[int, int], float, list[int], list[int], list, list, dict]:
    """Run the labeling classifier on precomputed per-instance image cutouts and resolve a vertebra label sequence.

    Like :func:`run_model_for_vert_labeling`, but skips reorienting/cropping/rescaling and instead consumes already-prepared
    image arrays keyed by instance label.

    Args:
        model (VertLabelingClassifier): Classifier used to predict per-instance vertebra labels.
        img_arrays (dict[int, np.ndarray]): Mapping of vertebra instance label to its cropped image array.
        disable_c1 (bool): If True, do not predict a C1 label.
        boost_c2 (float): Multiplicative boost applied to a prediction whose argmax is C2.
        allow_cervical_skip (bool): If True, allow the path search to skip a class within the cervical region.
        verbose (bool): If True, print intermediate weighting/path information.

    Returns:
        tuple: ``(labelmap, fcost, fpath, fpath_post, costlist, min_costs_path, predictions)`` (see
            :func:`run_model_for_vert_labeling`).

    Raises:
        AssertionError: If the number of input arrays does not match the resolved path length.
    """
    # reorient
    # img = img_nii.reorient(model.inference_config.model_expected_orientation, verbose=False)
    # vert = vert_nii.reorient(model.inference_config.model_expected_orientation, verbose=False)
    # zms_pir = img.zoom
    # rescale
    # img.rescale_(model.calc_recommended_resampling_zoom(zms_pir), verbose=False)
    # vert.rescale_(model.calc_recommended_resampling_zoom(zms_pir), verbose=False)
    #
    # img.assert_affine(other=vert)
    # extract vertebrae
    # vert.extract_label_(list(range(1, 26)), keep_label=True)
    # counted label
    orig_label = list(img_arrays.keys())
    # run model
    predictions = model.run_all_arrays(img_arrays)
    fcost, fpath, fpath_post, costlist, min_costs_path, _args = find_vert_path_from_predictions(
        predictions=predictions,
        verbose=verbose,
        disable_c1=disable_c1,
        boost_c2=boost_c2,
        allow_cervical_skip=allow_cervical_skip,
    )
    assert len(orig_label) == len(fpath_post), f"{len(orig_label)} != {len(fpath_post)}"
    labelmap = {orig_label[idx]: fpath_post[idx] for idx in range(len(orig_label))}

    return labelmap, fcost, fpath, fpath_post, costlist, min_costs_path, predictions

region_to_vert

region_to_vert(
    region_softmax_values: ndarray,
) -> np.ndarray

Broadcast a 3-region (cervical, thoracic, lumbar) softmax into a per-vertebra-class vector.

Parameters:

Name Type Description Default
region_softmax_values ndarray

Length-3 region softmax values ordered cervical, thoracic, lumbar.

required

Returns:

Type Description
ndarray

np.ndarray: Length-VERT_CLASSES vector with each region's value broadcast across that region's vertebra classes.

Source code in spineps/phase_labeling.py
def region_to_vert(region_softmax_values: np.ndarray) -> np.ndarray:  # shape(1,3)
    """Broadcast a 3-region (cervical, thoracic, lumbar) softmax into a per-vertebra-class vector.

    Args:
        region_softmax_values (np.ndarray): Length-3 region softmax values ordered cervical, thoracic, lumbar.

    Returns:
        np.ndarray: Length-``VERT_CLASSES`` vector with each region's value broadcast across that region's vertebra classes.
    """
    vert_prediction_values = np.zeros(VERT_CLASSES)
    vert_prediction_values[CERV] = region_softmax_values[0]
    vert_prediction_values[THOR] = region_softmax_values[1]
    vert_prediction_values[LUMB] = region_softmax_values[2]
    return vert_prediction_values

prepare_vert

prepare_vert(
    vert_softmax_values: ndarray,
    gaussian_sigma: float = 0.85,
    gaussian_radius: int = 2,
    gaussian_regionwise: bool = True,
) -> np.ndarray

Smooth and normalize a per-vertebra-class softmax vector.

Optionally applies a 1-D Gaussian filter (either per spinal region or across all classes) and then normalizes to sum to 1.

Parameters:

Name Type Description Default
vert_softmax_values ndarray

Length-VERT_CLASSES per-class softmax values.

required
gaussian_sigma float

Gaussian smoothing sigma; 0 disables smoothing.

0.85
gaussian_radius int

Half-width of the Gaussian kernel.

2
gaussian_regionwise bool

If True, smooth each spinal region independently instead of across the whole vector.

True

Returns:

Type Description
ndarray

np.ndarray: The smoothed, sum-normalized per-class vector.

Source code in spineps/phase_labeling.py
def prepare_vert(
    vert_softmax_values: np.ndarray,
    gaussian_sigma: float = 0.85,
    gaussian_radius: int = 2,
    gaussian_regionwise: bool = True,
) -> np.ndarray:
    """Smooth and normalize a per-vertebra-class softmax vector.

    Optionally applies a 1-D Gaussian filter (either per spinal region or across all classes) and then normalizes to sum to 1.

    Args:
        vert_softmax_values (np.ndarray): Length-``VERT_CLASSES`` per-class softmax values.
        gaussian_sigma (float): Gaussian smoothing sigma; 0 disables smoothing.
        gaussian_radius (int): Half-width of the Gaussian kernel.
        gaussian_regionwise (bool): If True, smooth each spinal region independently instead of across the whole vector.

    Returns:
        np.ndarray: The smoothed, sum-normalized per-class vector.
    """
    # gaussian region-wise
    softmax_values = vert_softmax_values.copy()
    if gaussian_sigma > 0.0:
        if gaussian_regionwise:
            for s in [CERV, THOR, LUMB]:
                softmax_values[s] = gaussian_filter1d(softmax_values[s], sigma=gaussian_sigma, mode="nearest", radius=gaussian_radius)
        else:
            softmax_values = gaussian_filter1d(softmax_values, sigma=gaussian_sigma, mode="nearest", radius=gaussian_radius)
    softmax_values /= np.sum(softmax_values) + DIVIDE_BY_ZERO_OFFSET
    return softmax_values

prepare_vertgrp

prepare_vertgrp(
    vertgrp_softmax_values: ndarray,
    gaussian_sigma: float = 0.85,
    gaussian_radius: int = 2,
    gaussian_regionwise: bool = True,
) -> np.ndarray

Expand a vertebra-group softmax to per-vertebra classes, then smooth and normalize it.

Distributes each vertebra-group probability onto its member vertebra classes (via vert_group_idx_to_exact_idx_dict), optionally applies a 1-D Gaussian filter (per region or globally), and normalizes to sum to 1.

Parameters:

Name Type Description Default
vertgrp_softmax_values ndarray

Per-vertebra-group softmax values.

required
gaussian_sigma float

Gaussian smoothing sigma; 0 disables smoothing.

0.85
gaussian_radius int

Half-width of the Gaussian kernel.

2
gaussian_regionwise bool

If True, smooth each spinal region independently instead of across the whole vector.

True

Returns:

Type Description
ndarray

np.ndarray: The expanded, smoothed, sum-normalized per-class vector.

Source code in spineps/phase_labeling.py
def prepare_vertgrp(
    vertgrp_softmax_values: np.ndarray,
    gaussian_sigma: float = 0.85,
    gaussian_radius: int = 2,
    gaussian_regionwise: bool = True,
) -> np.ndarray:
    """Expand a vertebra-group softmax to per-vertebra classes, then smooth and normalize it.

    Distributes each vertebra-group probability onto its member vertebra classes (via ``vert_group_idx_to_exact_idx_dict``),
    optionally applies a 1-D Gaussian filter (per region or globally), and normalizes to sum to 1.

    Args:
        vertgrp_softmax_values (np.ndarray): Per-vertebra-group softmax values.
        gaussian_sigma (float): Gaussian smoothing sigma; 0 disables smoothing.
        gaussian_radius (int): Half-width of the Gaussian kernel.
        gaussian_regionwise (bool): If True, smooth each spinal region independently instead of across the whole vector.

    Returns:
        np.ndarray: The expanded, smoothed, sum-normalized per-class vector.
    """
    # gaussian region-wise
    softmax_values = np.zeros(VERT_CLASSES)
    for i, g in vert_group_idx_to_exact_idx_dict.items():
        softmax_values[g] = vertgrp_softmax_values[i]
    if gaussian_sigma > 0.0:
        if gaussian_regionwise:
            for s in [CERV, THOR, LUMB]:
                softmax_values[s] = gaussian_filter1d(softmax_values[s], sigma=gaussian_sigma, mode="nearest", radius=gaussian_radius)
        else:
            softmax_values = gaussian_filter1d(softmax_values, sigma=gaussian_sigma, mode="nearest", radius=gaussian_radius)
    softmax_values /= np.sum(softmax_values) + DIVIDE_BY_ZERO_OFFSET
    return softmax_values

prepare_visible

prepare_visible(
    predictions: dict,
    visible_w: float = 1.0,
    gaussian_sigma: float = 0.8,
    gaussian_radius: int = 2,
) -> np.ndarray

Build a per-instance confidence-weighting chain from the classifier's "fully visible" head.

For each instance, reads the probability of being fully visible (if the FULLYVISIBLE head is present, else assumes 1), optionally Gaussian-smooths it along the instance axis, and converts it into a multiplicative weight in [0, 1] that down-weights partially visible (cropped) vertebrae according to visible_w.

Parameters:

Name Type Description Default
predictions dict

Per-instance classifier outputs, each holding a "soft" dict of head softmax arrays.

required
visible_w float

Strength of the visibility down-weighting; 0 disables it.

1.0
gaussian_sigma float

Gaussian smoothing sigma along the instance axis; 0 disables smoothing.

0.8
gaussian_radius int

Half-width of the Gaussian kernel.

2

Returns:

Type Description
ndarray

np.ndarray: Per-instance multiplicative weights clipped to [0, 1].

Source code in spineps/phase_labeling.py
def prepare_visible(predictions: dict, visible_w: float = 1.0, gaussian_sigma: float = 0.8, gaussian_radius: int = 2) -> np.ndarray:
    """Build a per-instance confidence-weighting chain from the classifier's "fully visible" head.

    For each instance, reads the probability of being fully visible (if the ``FULLYVISIBLE`` head is present, else assumes 1),
    optionally Gaussian-smooths it along the instance axis, and converts it into a multiplicative weight in ``[0, 1]`` that
    down-weights partially visible (cropped) vertebrae according to ``visible_w``.

    Args:
        predictions (dict): Per-instance classifier outputs, each holding a ``"soft"`` dict of head softmax arrays.
        visible_w (float): Strength of the visibility down-weighting; 0 disables it.
        gaussian_sigma (float): Gaussian smoothing sigma along the instance axis; 0 disables smoothing.
        gaussian_radius (int): Half-width of the Gaussian kernel.

    Returns:
        np.ndarray: Per-instance multiplicative weights clipped to ``[0, 1]``.
    """
    # has soft and FULLYVISIBLE key
    predict_keys = list(predictions[list(predictions.keys())[0]]["soft"].keys())  # noqa: RUF015
    if "FULLYVISIBLE" in predict_keys:
        visible_chain = np.asarray([k["soft"]["FULLYVISIBLE"][1] for k in predictions.values()])
    else:
        visible_chain = np.ones(len(predictions))
    if gaussian_sigma > 0.0:
        visible_chain = gaussian_filter1d(visible_chain, sigma=gaussian_sigma, mode="constant", radius=gaussian_radius)
    visible_chain = np.round(visible_chain, 3)
    # weighting
    visible_chain = 1 - visible_chain
    visible_chain = np.multiply(visible_chain, visible_w)
    visible_chain = 1 - visible_chain
    visible_chain = np.clip(visible_chain, 0, 1)
    # visible_chain /= np.sum(visible_chain)
    return visible_chain

prepare_region

prepare_region(
    region_softmax_values: ndarray,
    gaussian_sigma: float = 0.75,
    gaussian_radius: int = 1,
) -> np.ndarray

Broadcast a region softmax to per-vertebra classes, then smooth and normalize it.

Parameters:

Name Type Description Default
region_softmax_values ndarray

Length-3 region softmax values (cervical, thoracic, lumbar).

required
gaussian_sigma float

Gaussian smoothing sigma; 0 disables smoothing.

0.75
gaussian_radius int

Half-width of the Gaussian kernel.

1

Returns:

Type Description
ndarray

np.ndarray: The broadcast, smoothed, sum-normalized per-class vector.

Source code in spineps/phase_labeling.py
def prepare_region(region_softmax_values: np.ndarray, gaussian_sigma: float = 0.75, gaussian_radius: int = 1) -> np.ndarray:
    """Broadcast a region softmax to per-vertebra classes, then smooth and normalize it.

    Args:
        region_softmax_values (np.ndarray): Length-3 region softmax values (cervical, thoracic, lumbar).
        gaussian_sigma (float): Gaussian smoothing sigma; 0 disables smoothing.
        gaussian_radius (int): Half-width of the Gaussian kernel.

    Returns:
        np.ndarray: The broadcast, smoothed, sum-normalized per-class vector.
    """
    softmax_values = region_to_vert(region_softmax_values)
    if gaussian_sigma > 0.0 and np.sum(softmax_values) > 0.0:
        softmax_values = gaussian_filter1d(softmax_values, sigma=gaussian_sigma, mode="nearest", radius=gaussian_radius)
    softmax_values /= np.sum(softmax_values) + DIVIDE_BY_ZERO_OFFSET
    return softmax_values

prepare_vertrel_columns

prepare_vertrel_columns(
    vertrel_matrix: ndarray,
    gaussian_sigma: float = 0.75,
    gaussian_radius: int = 1,
) -> np.ndarray

Smooth and column-normalize the relative-position (VertRel) cost matrix.

For each VertRel label (column, skipping the first), optionally Gaussian-smooths the values along the instance axis and normalizes the column so its values stay bounded (divides by the column sum when it exceeds 1, otherwise by 1 + sum).

Parameters:

Name Type Description Default
vertrel_matrix ndarray

Matrix of shape (n_instances, len(VertRel)) of relative-position softmax values.

required
gaussian_sigma float

Gaussian smoothing sigma along the instance axis; 0 disables smoothing.

0.75
gaussian_radius int

Half-width of the Gaussian kernel.

1

Returns:

Type Description
ndarray

np.ndarray: The smoothed, column-normalized relative-position matrix (modified in place and returned).

Source code in spineps/phase_labeling.py
def prepare_vertrel_columns(vertrel_matrix: np.ndarray, gaussian_sigma: float = 0.75, gaussian_radius: int = 1) -> np.ndarray:
    """Smooth and column-normalize the relative-position (VertRel) cost matrix.

    For each VertRel label (column, skipping the first), optionally Gaussian-smooths the values along the instance axis and
    normalizes the column so its values stay bounded (divides by the column sum when it exceeds 1, otherwise by ``1 + sum``).

    Args:
        vertrel_matrix (np.ndarray): Matrix of shape ``(n_instances, len(VertRel))`` of relative-position softmax values.
        gaussian_sigma (float): Gaussian smoothing sigma along the instance axis; 0 disables smoothing.
        gaussian_radius (int): Half-width of the Gaussian kernel.

    Returns:
        np.ndarray: The smoothed, column-normalized relative-position matrix (modified in place and returned).
    """
    for i in range(1, min(len(VertRel), vertrel_matrix.shape[1])):
        if gaussian_sigma > 0.0 and np.sum(vertrel_matrix) > 0.0:
            vertrel_matrix[:, i] = gaussian_filter1d(vertrel_matrix[:, i], sigma=gaussian_sigma, mode="nearest", radius=gaussian_radius)
        # normalize per column / label in this case
        vertrel_sum = np.sum(vertrel_matrix[:, i]) + DIVIDE_BY_ZERO_OFFSET
        if vertrel_sum > 1.0:
            vertrel_matrix[:, i] = vertrel_matrix[:, i] / vertrel_sum
        elif vertrel_sum < 1.0:
            vertrel_matrix[:, i] = vertrel_matrix[:, i] / (1.0 + vertrel_sum)
    return vertrel_matrix

prepare_vertt13_columns

prepare_vertt13_columns(
    vertt13_matrix: ndarray,
) -> np.ndarray

Column-normalize the T13-anomaly (VertT13) cost matrix.

Normalizes each VertT13 label (column, skipping the first) so it sums to 1 along the instance axis.

Parameters:

Name Type Description Default
vertt13_matrix ndarray

Matrix of shape (n_instances, len(VertT13)) of T13-anomaly softmax values.

required

Returns:

Type Description
ndarray

np.ndarray: The column-normalized matrix (modified in place and returned).

Source code in spineps/phase_labeling.py
def prepare_vertt13_columns(vertt13_matrix: np.ndarray) -> np.ndarray:
    """Column-normalize the T13-anomaly (VertT13) cost matrix.

    Normalizes each VertT13 label (column, skipping the first) so it sums to 1 along the instance axis.

    Args:
        vertt13_matrix (np.ndarray): Matrix of shape ``(n_instances, len(VertT13))`` of T13-anomaly softmax values.

    Returns:
        np.ndarray: The column-normalized matrix (modified in place and returned).
    """
    for i in range(1, min(len(VertT13), vertt13_matrix.shape[1])):
        # normalize per column / label in this case
        vertt13_matrix[:, i] = vertt13_matrix[:, i] / (np.sum(vertt13_matrix[:, i]) + DIVIDE_BY_ZERO_OFFSET)
    return vertt13_matrix

prepare_vertrel

prepare_vertrel(
    vertrel_softmax_values: ndarray,
    gaussian_sigma: float = 0.75,
    gaussian_radius: int = 1,
) -> np.ndarray

Optionally Gaussian-smooth a relative-position (VertRel) softmax vector.

Parameters:

Name Type Description Default
vertrel_softmax_values ndarray

Relative-position softmax values for a single instance.

required
gaussian_sigma float

Gaussian smoothing sigma; 0 disables smoothing.

0.75
gaussian_radius int

Half-width of the Gaussian kernel.

1

Returns:

Type Description
ndarray

np.ndarray: The (optionally smoothed) relative-position vector; not re-normalized.

Source code in spineps/phase_labeling.py
def prepare_vertrel(vertrel_softmax_values: np.ndarray, gaussian_sigma: float = 0.75, gaussian_radius: int = 1) -> np.ndarray:
    """Optionally Gaussian-smooth a relative-position (VertRel) softmax vector.

    Args:
        vertrel_softmax_values (np.ndarray): Relative-position softmax values for a single instance.
        gaussian_sigma (float): Gaussian smoothing sigma; 0 disables smoothing.
        gaussian_radius (int): Half-width of the Gaussian kernel.

    Returns:
        np.ndarray: The (optionally smoothed) relative-position vector; not re-normalized.
    """
    softmax_values = vertrel_softmax_values.copy()
    if gaussian_sigma > 0.0:
        softmax_values = gaussian_filter1d(softmax_values, sigma=gaussian_sigma, mode="nearest", radius=gaussian_radius)
    # softmax_values /= np.sum(softmax_values) + DIVIDE_BY_ZERO_OFFSET
    return softmax_values

find_vert_path_from_predictions

find_vert_path_from_predictions(
    predictions,
    visible_w: float = 0.5,
    vert_w: float = 0.9,
    vertgrp_w: float = 0.8,
    region_w: float = 1.1,
    vertrel_w: float = 0.6,
    vertt13_w: float = 0.4,
    disable_c1: bool = True,
    boost_c2: float = 1.0,
    allow_cervical_skip: bool = False,
    allow_thoracic_skip: bool = False,
    allow_lumbar_skip: bool = False,
    punish_multiple_sequence: float = 0.0,
    punish_skip_sequence: float = 0.0,
    punish_skip_at_region_sequence: float = 0.0,
    region_gaussian_sigma: float = 0.0,
    vert_gaussian_sigma: float = 0.8,
    vert_gaussian_regionwise: bool = True,
    vertgrp_gaussian_sigma: float = 0.8,
    vertgrp_gaussian_regionwise: bool = True,
    vertrel_column_norm: bool = True,
    vertrel_gaussian_sigma: float = 0.6,
    focus_tl_gap: bool = True,
    argmax_combined_cost_matrix_instead_of_path_algorithm: bool = False,
    proc_lab_force_no_tl_anomaly: bool = False,
    verbose: bool = False,
) -> tuple[float, list[int], list[int], list, list, dict]

Combine the classifier's prediction heads into a cost matrix and solve for the most probable vertebra label sequence.

Builds a per-instance / per-class cost matrix by weighting and summing the available prediction heads (VERT, VERTGRP, REGION), down-weighting by the "fully visible" chain, optionally boosting C2, and adding separate relative-position (VertRel) and T13-anomaly (VertT13) cost terms. The cheapest monotonically increasing label path is then found with :func:find_most_probably_sequence (unless argmax_combined_cost_matrix_instead_of_path_algorithm is set, which falls back to a plain per-instance argmax). Special transitional vertebrae (T11 skip, T12/L5 repeats) and per-region skips are permitted via the corresponding flags. Finally the path is post-processed (see :func:fpath_post_processing).

Parameters:

Name Type Description Default
predictions dict

Per-instance classifier outputs, each holding a "soft" dict of per-head softmax arrays.

required
visible_w float

Weight of the "fully visible" down-weighting (must be in [0, 1]).

0.5
vert_w float

Weight of the per-vertebra (VERT) head.

0.9
vertgrp_w float

Weight of the vertebra-group (VERTGRP) head.

0.8
region_w float

Weight of the spinal-region (REGION) head.

1.1
vertrel_w float

Weight of the relative-position (VERTREL) cost term.

0.6
vertt13_w float

Weight of the T13-anomaly (VERTT13) cost term.

0.4
disable_c1 bool

If True, the path may not start at C1 (starts at C2 instead).

True
boost_c2 float

Multiplicative boost applied to a prediction whose argmax is C2; 0 disables it.

1.0
allow_cervical_skip bool

If True, allow skipping a class within the cervical region.

False
allow_thoracic_skip bool

If True, allow skipping a class within the thoracic region.

False
allow_lumbar_skip bool

If True, allow skipping a class within the lumbar region.

False
punish_multiple_sequence float

Extra cost for repeating an allowed-multiple class.

0.0
punish_skip_sequence float

Extra cost for skipping an allowed-skip class.

0.0
punish_skip_at_region_sequence float

Extra cost for skipping at a region boundary.

0.0
region_gaussian_sigma float

Gaussian sigma for the region head; 0 disables smoothing.

0.0
vert_gaussian_sigma float

Gaussian sigma for the vertebra head; 0 disables smoothing.

0.8
vert_gaussian_regionwise bool

If True, smooth the vertebra head per region.

True
vertgrp_gaussian_sigma float

Gaussian sigma for the vertebra-group head; 0 disables smoothing.

0.8
vertgrp_gaussian_regionwise bool

If True, smooth the vertebra-group head per region.

True
vertrel_column_norm bool

If True, column-normalize the relative-position matrix.

True
vertrel_gaussian_sigma float

Gaussian sigma used when column-normalizing the relative-position matrix.

0.6
focus_tl_gap bool

If True, focus on the T11/T13 thoracolumbar gap (reserved for the refinement pass).

True
argmax_combined_cost_matrix_instead_of_path_algorithm bool

If True, take a plain per-instance argmax instead of the path search.

False
proc_lab_force_no_tl_anomaly bool

If True, disallow T13 transitional-vertebra anomalies (no T11 skip / no T12 repeat).

False
verbose bool

If True, print the active head weights.

False

Returns:

Name Type Description
tuple tuple[float, list[int], list[int], list, list, dict]

(fcost, fpath, fpath_post, cost_matrix_list, min_costs_path, args) where fcost is the total path cost, fpath is the raw class path, fpath_post is the post-processed (1-based, T13-aware) label sequence, cost_matrix_list is the combined cost matrix as a nested list, min_costs_path is the per-step minimum cost path, and args is a snapshot of the call arguments.

Raises:

Type Description
AssertionError

If a weight is negative, visible_w exceeds 1, or no vital classification head (VERT/VERTEXACT/ VERTGRP) is present in the predictions.

Source code in spineps/phase_labeling.py
def find_vert_path_from_predictions(
    predictions,
    visible_w: float = 0.5,
    vert_w: float = 0.9,  # 0.9
    vertgrp_w: float = 0.8,
    region_w: float = 1.1,  # 1.1
    vertrel_w: float = 0.6,  # 0.3
    vertt13_w: float = 0.4,
    disable_c1: bool = True,
    boost_c2: float = 1.0,  # 3.0
    allow_cervical_skip: bool = False,
    allow_thoracic_skip: bool = False,
    allow_lumbar_skip: bool = False,
    #
    punish_multiple_sequence: float = 0.0,
    punish_skip_sequence: float = 0.0,
    punish_skip_at_region_sequence: float = 0.0,
    #
    region_gaussian_sigma: float = 0.0,  # 0 means no gaussian
    vert_gaussian_sigma: float = 0.8,  # 0.8 0 means no gaussian
    vert_gaussian_regionwise: bool = True,
    vertgrp_gaussian_sigma: float = 0.8,  # 0.8 0 means no gaussian
    vertgrp_gaussian_regionwise: bool = True,
    vertrel_column_norm: bool = True,
    vertrel_gaussian_sigma: float = 0.6,  # 0.6 # 0 means no gaussian
    #
    focus_tl_gap: bool = True,  # focus on T11/T13 gap (if T11/t13 case is detected, predict again using crops and then check again)
    argmax_combined_cost_matrix_instead_of_path_algorithm: bool = False,
    proc_lab_force_no_tl_anomaly: bool = False,
    #
    verbose: bool = False,
) -> tuple[float, list[int], list[int], list, list, dict]:
    """Combine the classifier's prediction heads into a cost matrix and solve for the most probable vertebra label sequence.

    Builds a per-instance / per-class cost matrix by weighting and summing the available prediction heads (VERT, VERTGRP,
    REGION), down-weighting by the "fully visible" chain, optionally boosting C2, and adding separate relative-position
    (VertRel) and T13-anomaly (VertT13) cost terms. The cheapest monotonically increasing label path is then found with
    :func:`find_most_probably_sequence` (unless ``argmax_combined_cost_matrix_instead_of_path_algorithm`` is set, which falls
    back to a plain per-instance argmax). Special transitional vertebrae (T11 skip, T12/L5 repeats) and per-region skips are
    permitted via the corresponding flags. Finally the path is post-processed (see :func:`fpath_post_processing`).

    Args:
        predictions (dict): Per-instance classifier outputs, each holding a ``"soft"`` dict of per-head softmax arrays.
        visible_w (float): Weight of the "fully visible" down-weighting (must be in ``[0, 1]``).
        vert_w (float): Weight of the per-vertebra (VERT) head.
        vertgrp_w (float): Weight of the vertebra-group (VERTGRP) head.
        region_w (float): Weight of the spinal-region (REGION) head.
        vertrel_w (float): Weight of the relative-position (VERTREL) cost term.
        vertt13_w (float): Weight of the T13-anomaly (VERTT13) cost term.
        disable_c1 (bool): If True, the path may not start at C1 (starts at C2 instead).
        boost_c2 (float): Multiplicative boost applied to a prediction whose argmax is C2; 0 disables it.
        allow_cervical_skip (bool): If True, allow skipping a class within the cervical region.
        allow_thoracic_skip (bool): If True, allow skipping a class within the thoracic region.
        allow_lumbar_skip (bool): If True, allow skipping a class within the lumbar region.
        punish_multiple_sequence (float): Extra cost for repeating an allowed-multiple class.
        punish_skip_sequence (float): Extra cost for skipping an allowed-skip class.
        punish_skip_at_region_sequence (float): Extra cost for skipping at a region boundary.
        region_gaussian_sigma (float): Gaussian sigma for the region head; 0 disables smoothing.
        vert_gaussian_sigma (float): Gaussian sigma for the vertebra head; 0 disables smoothing.
        vert_gaussian_regionwise (bool): If True, smooth the vertebra head per region.
        vertgrp_gaussian_sigma (float): Gaussian sigma for the vertebra-group head; 0 disables smoothing.
        vertgrp_gaussian_regionwise (bool): If True, smooth the vertebra-group head per region.
        vertrel_column_norm (bool): If True, column-normalize the relative-position matrix.
        vertrel_gaussian_sigma (float): Gaussian sigma used when column-normalizing the relative-position matrix.
        focus_tl_gap (bool): If True, focus on the T11/T13 thoracolumbar gap (reserved for the refinement pass).
        argmax_combined_cost_matrix_instead_of_path_algorithm (bool): If True, take a plain per-instance argmax instead of the
            path search.
        proc_lab_force_no_tl_anomaly (bool): If True, disallow T13 transitional-vertebra anomalies (no T11 skip / no T12 repeat).
        verbose (bool): If True, print the active head weights.

    Returns:
        tuple: ``(fcost, fpath, fpath_post, cost_matrix_list, min_costs_path, args)`` where ``fcost`` is the total path cost,
            ``fpath`` is the raw class path, ``fpath_post`` is the post-processed (1-based, T13-aware) label sequence,
            ``cost_matrix_list`` is the combined cost matrix as a nested list, ``min_costs_path`` is the per-step minimum cost
            path, and ``args`` is a snapshot of the call arguments.

    Raises:
        AssertionError: If a weight is negative, ``visible_w`` exceeds 1, or no vital classification head (VERT/VERTEXACT/
            VERTGRP) is present in the predictions.
    """
    args = locals()
    assert 0 <= visible_w, visible_w  # noqa: SIM300
    assert visible_w <= 1.0, f"visible_w must be <= 1.0, got {visible_w}"
    assert 0 <= vert_w, vert_w  # noqa: SIM300
    assert 0 <= region_w, region_w  # noqa: SIM300
    assert 0 <= vertrel_w, vertrel_w  # noqa: SIM300
    assert 0 <= boost_c2, boost_c2  # noqa: SIM300
    #
    n_vert = len(predictions)
    #
    cost_matrix = np.zeros((n_vert, VERT_CLASSES))
    relative_cost_matrix = np.zeros((n_vert, len(VertRel)))
    visible_chain = prepare_visible(predictions, visible_w)
    # print(visible_chain)

    predict_keys = list(predictions[list(predictions.keys())[0]]["soft"].keys())  # noqa: RUF015
    assert "VERT" in predict_keys or "VERTEXACT" in predict_keys or "VERTEX" in predict_keys or "VERTGRP" in predict_keys, (
        f"No vital classification head found, got {predict_keys}"
    )

    # VertRel normalize over labels
    if "VERTREL" in predict_keys:
        vertrel_matrix = np.asarray([k["soft"]["VERTREL"] for k in predictions.values()])
    else:
        vertrel_matrix = np.zeros((n_vert, len(VertRel)))
    if vertrel_column_norm:
        vertrel_matrix = prepare_vertrel_columns(vertrel_matrix, gaussian_sigma=vertrel_gaussian_sigma)

    if "VERTT13" in predict_keys:
        vertt13_softmax_output = np.asarray([k["soft"]["VERTT13"] for k in predictions.values()])
    else:
        vertt13_softmax_output = np.zeros((n_vert, len(VertT13)))
    vertt13_values = np.multiply(
        -prepare_vertt13_columns(vertt13_softmax_output),
        vertt13_w,
    )

    if verbose:
        print("visible_w", visible_w) if "FULLYVISIBLE" in predict_keys else None
        print("vert_w", vert_w) if "VERT" in predict_keys else None
        print("region_w", region_w) if "REGION" in predict_keys else None
        print("vertrel_w", vertrel_w) if "VERTREL" in predict_keys else None
        print("vertgrp_w", vertgrp_w) if "VERTGRP" in predict_keys else None
        print("vertt13_w", vertt13_w) if "VERTT13" in predict_keys else None
        print("disable_c1", disable_c1)
        print("boost_c2", boost_c2)
        print("allow_cervical_skip", allow_cervical_skip)
        print("region_gaussian_sigma", region_gaussian_sigma) if "VERTREGION" in predict_keys else None
        print("vert_gaussian_sigma", vert_gaussian_sigma) if "VERT" in predict_keys else None
        print("vert_gaussian_regionwise", vert_gaussian_regionwise) if "VERT" in predict_keys else None
        print("vertrel_gaussian_sigma", vertrel_gaussian_sigma) if "VERTREL" in predict_keys else None

    #
    for idx, (_, k) in enumerate(predictions.items()):
        vert_softmax_output = k["soft"]["VERT"] if "VERT" in predict_keys else np.zeros(len(VertExact))
        vert_values = np.multiply(
            prepare_vert(
                vert_softmax_output,
                gaussian_sigma=vert_gaussian_sigma,
                gaussian_regionwise=vert_gaussian_regionwise,
            ),
            vert_w,
        )

        vertgrp_softmax_output = k["soft"]["VERTGRP"] if "VERTGRP" in predict_keys else np.zeros(len(VertGroup))
        vertgrp_values = np.multiply(
            prepare_vertgrp(
                vertgrp_softmax_output,
                gaussian_sigma=vertgrp_gaussian_sigma,
                gaussian_regionwise=vertgrp_gaussian_regionwise,
            ),
            vertgrp_w,
        )

        # if "REGION" in k["soft"] else np.zeros((4, *vert_softmax_output.shape[1:]))
        region_softmax_output = k["soft"]["REGION"] if "REGION" in predict_keys else np.zeros(len(VertRegion))
        region_values = np.multiply(
            prepare_region(
                region_softmax_output,
                gaussian_sigma=region_gaussian_sigma,
            ),
            region_w,
        )

        #
        # add region and vert
        final_vert_pred = np.add(region_values, vert_values)
        final_vert_pred = np.add(final_vert_pred, vertgrp_values)
        # normalize
        final_vert_pred /= np.sum(final_vert_pred) + DIVIDE_BY_ZERO_OFFSET
        # boost c2 if enabled
        if boost_c2 > 0.0 and np.argmax(final_vert_pred) == C2_CLASS_IDX:
            final_vert_pred = np.multiply(final_vert_pred, boost_c2)
        # then multiply with visible factor
        final_vert_pred = np.multiply(final_vert_pred, visible_chain[idx])
        cost_matrix[idx] = final_vert_pred
        # relative gets own matrix

        # vertrel_softmax_output = k["soft"]["VERTREL"] if "VERTREL" in k["soft"] else np.zeros((6, *vert_softmax_output.shape[1:]))
        relative_cost_matrix[idx] = prepare_vertrel(
            vertrel_matrix[idx],
            gaussian_sigma=0.0,
        )
    cost_matrix = np.asarray(cost_matrix)
    # invert rel cost
    relative_cost_matrix = np.multiply(-relative_cost_matrix, vertrel_w)
    # for i in range(len(relative_cost_matrix)):
    #    print(relative_cost_matrix[i])
    #
    if argmax_combined_cost_matrix_instead_of_path_algorithm:
        fcost = 0
        min_costs_path = [[]]
        fpath = list(np.argmax(cost_matrix, axis=1))
    else:
        allow_multiple_at_class = [T12_CLASS_IDX, L5_CLASS_IDX] if not proc_lab_force_no_tl_anomaly else [L5_CLASS_IDX]
        allow_skip_at_class = [T11_CLASS_IDX] if not proc_lab_force_no_tl_anomaly else []
        allow_skip_at_region = []
        if allow_cervical_skip:
            allow_skip_at_region.append(0)
        if allow_thoracic_skip:
            allow_skip_at_region.append(1)
        if allow_lumbar_skip:
            allow_skip_at_region.append(2)
        fcost, fpath, min_costs_path = find_most_probably_sequence(
            # input
            cost_matrix,
            min_start_class=C1_CLASS_IDX if not disable_c1 else C2_CLASS_IDX,
            region_rel_cost=relative_cost_matrix,
            vertt13_cost=vertt13_values,
            invert_cost=True,
            # parameters
            punish_multiple_sequence=punish_multiple_sequence,
            punish_skip_sequence=punish_skip_sequence,
            # no touch
            regions=list(DEFAULT_REGION_STARTS),
            allow_multiple_at_class=allow_multiple_at_class,
            allow_skip_at_class=allow_skip_at_class,
            #
            allow_skip_at_region=allow_skip_at_region,
            punish_skip_at_region_sequence=punish_skip_at_region_sequence,
            verbose=False,
        )
    # post processing
    fpath_post = fpath_post_processing(fpath)
    return fcost, fpath, fpath_post, cost_matrix.tolist(), min_costs_path, args

fpath_post_processing

fpath_post_processing(fpath) -> list[int]

Post-process a raw 0-based class path into the final 1-based vertebra label sequence.

Resolves transitional-vertebra anomalies (two consecutive T12 become T12 + T13; a trailing double L5 becomes L5 + L6) and shifts every class index by 1 to the final label convention, leaving the special T13 label untouched.

Parameters:

Name Type Description Default
fpath list[int]

Raw 0-based class path from the cost/path search.

required

Returns:

Type Description
list[int]

list[int]: The post-processed 1-based vertebra label sequence (with T13/L6 anomalies applied).

Source code in spineps/phase_labeling.py
def fpath_post_processing(fpath) -> list[int]:
    """Post-process a raw 0-based class path into the final 1-based vertebra label sequence.

    Resolves transitional-vertebra anomalies (two consecutive T12 become T12 + T13; a trailing double L5 becomes L5 + L6) and
    shifts every class index by 1 to the final label convention, leaving the special T13 label untouched.

    Args:
        fpath (list[int]): Raw 0-based class path from the cost/path search.

    Returns:
        list[int]: The post-processed 1-based vertebra label sequence (with T13/L6 anomalies applied).
    """
    fpath_post = fpath[:]

    # Two T12 -> T12 + T13
    if VertExact.T12.value in fpath_post:
        tidx = fpath_post.index(VertExact.T12.value)
        if tidx != 0 and fpath_post[tidx - 1] == VertExact.T12.value:
            fpath_post[tidx] = T13_LABEL
        elif tidx != len(fpath_post) - 1 and fpath_post[tidx + 1] == VertExact.T12.value:
            fpath_post[tidx + 1] = T13_LABEL
    # Two L5 -> L5, L6
    if (VertExact.L5.value in fpath_post and len(fpath_post) >= 2) and (
        fpath_post[-1] == VertExact.L5.value and fpath_post[-2] == VertExact.L5.value
    ):
        fpath_post[-1] += 1

    fpath_post = [f + 1 if f != T13_LABEL else T13_LABEL for f in fpath_post]
    return fpath_post

is_valid_vertebra_sequence

is_valid_vertebra_sequence(
    sequence: list[VertExact] | list[int],
) -> bool

Check whether a vertebra label sequence is anatomically contiguous top-to-bottom.

A sequence is valid if each label follows the previous one by exactly 1, or forms one of the allowed transitional jumps at the thoracolumbar junction (T13->L1, i.e. 28->20, and T12->L1, i.e. 18->20). VertExact inputs are first converted via :func:fpath_post_processing.

Parameters:

Name Type Description Default
sequence list[VertExact] | list[int]

The vertebra label sequence, either as VertExact enums or 1-based ints.

required

Returns:

Name Type Description
bool bool

True if the sequence is a valid contiguous vertebra run, otherwise False.

Source code in spineps/phase_labeling.py
def is_valid_vertebra_sequence(sequence: list[VertExact] | list[int]) -> bool:
    """Check whether a vertebra label sequence is anatomically contiguous top-to-bottom.

    A sequence is valid if each label follows the previous one by exactly 1, or forms one of the allowed transitional jumps at
    the thoracolumbar junction (T13->L1, i.e. 28->20, and T12->L1, i.e. 18->20). ``VertExact`` inputs are first converted via
    :func:`fpath_post_processing`.

    Args:
        sequence (list[VertExact] | list[int]): The vertebra label sequence, either as ``VertExact`` enums or 1-based ints.

    Returns:
        bool: True if the sequence is a valid contiguous vertebra run, otherwise False.
    """
    sequence2: list[int] = fpath_post_processing([s.value for s in sequence]) if isinstance(sequence[0], VertExact) else sequence  # type: ignore
    # must be sequence of vertebrae
    for i in range(1, len(sequence2)):
        if (
            sequence2[i] - sequence2[i - 1] == 1
            or (sequence2[i] == 20 and sequence2[i - 1] == 28)
            or (sequence2[i] == 20 and sequence2[i - 1] == 18)
        ):
            continue
        else:
            return False
    return True

spineps.phase_post

spineps.phase_post

Post-processing phase: clean and reconcile the semantic and vertebra-instance masks and attach IVD/endplate instance labels.

phase_postprocess_combined

phase_postprocess_combined(
    img_nii: NII,
    seg_nii: NII,
    vert_nii: NII,
    model_labeling: VertLabelingClassifier | None,
    debug_data: dict | None,
    labeling_offset: int = 0,
    proc_lab_force_no_tl_anomaly: bool = False,
    proc_assign_missing_cc: bool = True,
    proc_assign_missing_cc_fast=False,
    proc_clean_inst_by_sem: bool = True,
    n_vert_bodies: int | None = None,
    process_merge_vertebra: bool = True,
    proc_vertebra_inconsistency: bool = True,
    proc_assign_posterior_instance_label: bool = True,
    verbose: bool = False,
    disable_c1=True,
    sacrum_ids=(v_name2idx["S1"],),
) -> tuple[NII, NII]

Run the combined semantic/instance post-processing pipeline and return cleaned, anatomically labeled masks.

Crops both masks to the segmentation, optionally fixes superior/inferior articular inconsistencies, reconciles the instance and semantic masks (reassigning or deleting unmatched connected components), splits accidentally merged vertebrae, fixes mislabeled posterior elements, labels instances top-to-bottom, optionally runs the anatomical labeling classifier, forces the sacrum and dens labels, attaches IVD and endplate instance labels (splitting endplates into superior/inferior), and finally un-crops back to the original field of view.

Parameters:

Name Type Description Default
img_nii NII

Input MRI image.

required
seg_nii NII

Subregion semantic segmentation mask.

required
vert_nii NII

Vertebra instance segmentation mask.

required
model_labeling VertLabelingClassifier | None

Anatomical labeling classifier; if None, instances keep their top-to-bottom labels.

required
debug_data dict | None

Optional dict that intermediate results are written into; created if None.

required
labeling_offset int

Offset added to the top-to-bottom instance labels.

0
proc_lab_force_no_tl_anomaly bool

If True, disallow T13 transitional-vertebra anomalies during labeling.

False
proc_assign_missing_cc bool

If True, reassign semantic connected components not covered by the instance mask.

True
proc_assign_missing_cc_fast bool

If True, use the faster infect-based missing-CC assignment.

False
proc_clean_inst_by_sem bool

If True, mask the instance mask by the semantic mask before processing.

True
n_vert_bodies int | None

Number of vertebra bodies; inferred from the instance mask if None.

None
process_merge_vertebra bool

If True, detect and merge accidentally split adjacent vertebrae.

True
proc_vertebra_inconsistency bool

If True, reassign inconsistent articular substructures by instance overlap.

True
proc_assign_posterior_instance_label bool

If True, fix wrongly labeled posterior instance elements.

True
verbose bool

If True, print verbose progress.

False
disable_c1 bool

If True (and labeling_offset >= 1), do not predict a C1 label.

True
sacrum_ids tuple

Semantic label id(s) treated as sacrum and mapped to the S1 instance label.

(v_name2idx['S1'],)

Returns:

Type Description
tuple[NII, NII]

tuple[NII, NII]: The cleaned (seg_uncropped, vert_uncropped) semantic and vertebra-instance masks.

Source code in spineps/phase_post.py
def phase_postprocess_combined(
    img_nii: NII,
    seg_nii: NII,
    vert_nii: NII,
    model_labeling: VertLabelingClassifier | None,
    debug_data: dict | None,
    labeling_offset: int = 0,
    proc_lab_force_no_tl_anomaly: bool = False,
    proc_assign_missing_cc: bool = True,
    proc_assign_missing_cc_fast=False,
    proc_clean_inst_by_sem: bool = True,
    n_vert_bodies: int | None = None,
    process_merge_vertebra: bool = True,
    proc_vertebra_inconsistency: bool = True,
    proc_assign_posterior_instance_label: bool = True,
    verbose: bool = False,
    disable_c1=True,
    sacrum_ids=(v_name2idx["S1"],),
) -> tuple[NII, NII]:
    """Run the combined semantic/instance post-processing pipeline and return cleaned, anatomically labeled masks.

    Crops both masks to the segmentation, optionally fixes superior/inferior articular inconsistencies, reconciles the
    instance and semantic masks (reassigning or deleting unmatched connected components), splits accidentally merged vertebrae,
    fixes mislabeled posterior elements, labels instances top-to-bottom, optionally runs the anatomical labeling classifier,
    forces the sacrum and dens labels, attaches IVD and endplate instance labels (splitting endplates into superior/inferior),
    and finally un-crops back to the original field of view.

    Args:
        img_nii (NII): Input MRI image.
        seg_nii (NII): Subregion semantic segmentation mask.
        vert_nii (NII): Vertebra instance segmentation mask.
        model_labeling (VertLabelingClassifier | None): Anatomical labeling classifier; if None, instances keep their
            top-to-bottom labels.
        debug_data (dict | None): Optional dict that intermediate results are written into; created if None.
        labeling_offset (int): Offset added to the top-to-bottom instance labels.
        proc_lab_force_no_tl_anomaly (bool): If True, disallow T13 transitional-vertebra anomalies during labeling.
        proc_assign_missing_cc (bool): If True, reassign semantic connected components not covered by the instance mask.
        proc_assign_missing_cc_fast (bool): If True, use the faster infect-based missing-CC assignment.
        proc_clean_inst_by_sem (bool): If True, mask the instance mask by the semantic mask before processing.
        n_vert_bodies (int | None): Number of vertebra bodies; inferred from the instance mask if None.
        process_merge_vertebra (bool): If True, detect and merge accidentally split adjacent vertebrae.
        proc_vertebra_inconsistency (bool): If True, reassign inconsistent articular substructures by instance overlap.
        proc_assign_posterior_instance_label (bool): If True, fix wrongly labeled posterior instance elements.
        verbose (bool): If True, print verbose progress.
        disable_c1 (bool): If True (and ``labeling_offset >= 1``), do not predict a C1 label.
        sacrum_ids (tuple): Semantic label id(s) treated as sacrum and mapped to the S1 instance label.

    Returns:
        tuple[NII, NII]: The cleaned ``(seg_uncropped, vert_uncropped)`` semantic and vertebra-instance masks.
    """
    logger.print("Post process", Log_Type.STAGE)
    with logger:
        img_nii.assert_affine(other=seg_nii)
        seg_nii.assert_affine(other=vert_nii)
        # Post process semantic mask
        ###################
        vert_nii = vert_nii.copy()
        if n_vert_bodies is None:
            n_vert_bodies = len(vert_nii.unique())
        if debug_data is None:
            debug_data = {}

        if proc_clean_inst_by_sem:
            vert_nii.apply_mask(seg_nii, inplace=True)
        crop_slices = seg_nii.compute_crop(dist=POSTPROCESS_CROP_MARGIN_MM / min(seg_nii.zoom))

        # Save uncropped to uncrop later
        vert_uncropped = vert_nii.copy()
        seg_uncropped = seg_nii.copy()

        # Crop down
        img_nii = img_nii.apply_crop(crop_slices)
        vert_nii.apply_crop_(crop_slices)
        seg_nii.apply_crop_(crop_slices)

        # Post processing both
        ###################

        if proc_vertebra_inconsistency:
            # Assigns superior/inferior based on instance label overlap
            assign_vertebra_inconsistency(vert_nii, seg_nii)

        whole_vert_nii_cleaned, seg_nii_cleaned = mask_cleaning_other(
            whole_vert_nii=vert_nii,
            seg_nii=seg_nii,
            n_vert_bodies=n_vert_bodies,
            proc_assign_missing_cc=proc_assign_missing_cc,
            proc_assign_missing_cc_fast=proc_assign_missing_cc_fast,
            verbose=verbose,
        )

        if process_merge_vertebra and Location.Vertebra_Disc.value in seg_nii_cleaned.unique():
            detect_and_solve_merged_vertebra(seg_nii_cleaned, whole_vert_nii_cleaned)

        if proc_assign_posterior_instance_label:
            whole_vert_nii_cleaned = fix_wrong_posterior_instance_label(seg_nii_cleaned, seg_inst=whole_vert_nii_cleaned, logger=logger)

        # Label vertebra top -> down
        whole_vert_nii_cleaned, vert_labels = label_instance_top_to_bottom(whole_vert_nii_cleaned, labeling_offset=labeling_offset)
        logger.print(f"Labeled {len(vert_labels)} vertebra instances from top to bottom")

        if model_labeling is not None:
            whole_vert_nii_cleaned = perform_labeling_step(
                model=model_labeling,
                img_nii=img_nii,
                vert_nii=whole_vert_nii_cleaned,
                subreg_nii=seg_nii_cleaned,
                proc_lab_force_no_tl_anomaly=proc_lab_force_no_tl_anomaly,
                disable_c1=labeling_offset >= 1 and disable_c1,
            )

        logger.print("vert_nii volumes:", whole_vert_nii_cleaned.volumes())
        logger.print("seg_nii", seg_nii_cleaned.unique())

        whole_vert_nii_cleaned[seg_nii_cleaned.extract_label(sacrum_ids).get_seg_array() == 1] = v_name2idx["S1"]
        whole_vert_nii_cleaned[seg_nii_cleaned == Location.Dens_axis.value] = C2_INSTANCE_LABEL
        vert_arr_cleaned, seg_arr_cleaned = add_ivd_ep_vert_label(whole_vert_nii_cleaned, seg_nii_cleaned)
        #
        #
        cur_segarr = seg_nii_cleaned.get_seg_array()
        cur_segarr[cur_segarr == Location.Endplate.value] = seg_arr_cleaned[cur_segarr == Location.Endplate.value]
        seg_nii_cleaned.set_array_(cur_segarr)
        ###############
        # Uncrop

        vert_uncropped[crop_slices] = vert_arr_cleaned
        seg_uncropped[crop_slices] = seg_nii_cleaned.get_seg_array()

        logger.print("vert_uncropped volumes", vert_uncropped.volumes())
        logger.print("seg_uncropped", seg_uncropped.unique())

        debug_data["vert_arr_return_final"] = vert_uncropped.copy()
    return seg_uncropped, vert_uncropped

mask_cleaning_other

mask_cleaning_other(
    whole_vert_nii: NII,
    seg_nii: NII,
    n_vert_bodies: int,
    proc_assign_missing_cc: bool = False,
    proc_assign_missing_cc_fast=False,
    verbose: bool = False,
) -> tuple[NII, NII]

Reconcile the vertebra instance mask with the vertebra portion of the semantic mask.

Extracts the vertebra subregions from the semantic mask and (optionally) reassigns semantic connected components that the instance mask missed, either via a fast infect pass or via :func:assign_missing_cc; deleted components are removed from the semantic mask. Logs a warning when the unmatched vertebra volume between the two masks is anomalously large.

Parameters:

Name Type Description Default
whole_vert_nii NII

Vertebra instance segmentation mask.

required
seg_nii NII

Subregion semantic segmentation mask.

required
n_vert_bodies int

Number of vertebra bodies, used to scale the unmatched-volume warning.

required
proc_assign_missing_cc bool

If True, reassign missed semantic components via :func:assign_missing_cc.

False
proc_assign_missing_cc_fast bool

If True, additionally run a fast infect-based assignment first.

False
verbose bool

If True, print verbose progress.

False

Returns:

Type Description
tuple[NII, NII]

tuple[NII, NII]: The cleaned (whole_vert_nii, seg_nii) instance and semantic masks.

Raises:

Type Description
AssertionError

If, with proc_assign_missing_cc enabled, the instance mask still has more vertebra voxels than the semantic mask (which should be impossible).

Source code in spineps/phase_post.py
def mask_cleaning_other(
    whole_vert_nii: NII,
    seg_nii: NII,
    n_vert_bodies: int,
    proc_assign_missing_cc: bool = False,
    proc_assign_missing_cc_fast=False,
    verbose: bool = False,
) -> tuple[NII, NII]:
    """Reconcile the vertebra instance mask with the vertebra portion of the semantic mask.

    Extracts the vertebra subregions from the semantic mask and (optionally) reassigns semantic connected components that the
    instance mask missed, either via a fast infect pass or via :func:`assign_missing_cc`; deleted components are removed from the
    semantic mask. Logs a warning when the unmatched vertebra volume between the two masks is anomalously large.

    Args:
        whole_vert_nii (NII): Vertebra instance segmentation mask.
        seg_nii (NII): Subregion semantic segmentation mask.
        n_vert_bodies (int): Number of vertebra bodies, used to scale the unmatched-volume warning.
        proc_assign_missing_cc (bool): If True, reassign missed semantic components via :func:`assign_missing_cc`.
        proc_assign_missing_cc_fast (bool): If True, additionally run a fast infect-based assignment first.
        verbose (bool): If True, print verbose progress.

    Returns:
        tuple[NII, NII]: The cleaned ``(whole_vert_nii, seg_nii)`` instance and semantic masks.

    Raises:
        AssertionError: If, with ``proc_assign_missing_cc`` enabled, the instance mask still has more vertebra voxels than the
            semantic mask (which should be impossible).
    """
    subreg_vert_nii = seg_nii.extract_label(vertebra_subreg_labels)

    if proc_assign_missing_cc_fast:
        logger.print("missing cc (fast)")
        missing = subreg_vert_nii.copy()
        missing[whole_vert_nii != 0] = 0
        infect = missing  # .filter_connected_components(max_volume=fast_assinge_missing_cc_size)
        whole_vert_nii = whole_vert_nii.infect(infect, verbose=verbose)
    # make copy where both masks clean each other
    vert_arr_cleaned = whole_vert_nii.get_seg_array()
    subreg_vert_arr = subreg_vert_nii.get_seg_array()
    # if dilation_fill:
    #    vert_arr_cleaned = np_dilate_msk(vert_arr_cleaned, label_ref=vert_labels, mm=5)  # , mask=subreg_vert_arr
    subreg_arr = seg_nii.get_seg_array()

    if proc_assign_missing_cc:
        vert_arr_cleaned, subreg_vert_arr, deletion_map = assign_missing_cc(vert_arr_cleaned, subreg_vert_arr, verbose=verbose)
        subreg_vert_nii.set_array_(subreg_vert_arr)
        vert_arr_cleaned[subreg_vert_arr == 0] = 0
        subreg_arr[deletion_map == 1] = 0

    n_vert_pixels = np_count_nonzero(vert_arr_cleaned)
    n_subreg_vert_pixels = subreg_vert_nii.volumes()[1]
    n_vert_pixel_per_vertebra = n_subreg_vert_pixels / n_vert_bodies
    n_difference_pixels = n_subreg_vert_pixels - n_vert_pixels
    if n_difference_pixels > 0:
        logger.print("n_subreg_vert_px - n_vert_px", n_subreg_vert_pixels - n_vert_pixels, Log_Type.STRANGE)

    n_vert_pixels_rel_diff = round((n_subreg_vert_pixels - n_vert_pixels) / n_vert_pixel_per_vertebra, 3)
    if n_vert_pixels_rel_diff < 0:
        if proc_assign_missing_cc:
            assert n_vert_pixels_rel_diff >= 0, "Less subreg vertebra pixels than in vert mask cannot happen with assign_missing_cc"
        else:
            logger.print(
                f"A volume of {n_vert_pixels_rel_diff} * avg_vertebra_volume in vertebra not matched in semantic mask, set proc_assign_missing_cc=TRUE to circumvent this",
                Log_Type.WARNING,
            )
    elif n_vert_pixels_rel_diff > UNMATCHED_VOLUME_WARN_FRACTION:
        logger.print(f"A volume of {n_vert_pixels_rel_diff} * avg_vertebra_volume in subreg not matched by vertebra mask", Log_Type.WARNING)

    return whole_vert_nii.set_array(vert_arr_cleaned), seg_nii.set_array(subreg_arr)

assign_missing_cc

assign_missing_cc(
    target_arr: ndarray,
    reference_arr: ndarray,
    verbose: bool = False,
    verbose_deletion: bool = False,
    proc_assign_missing_dilate_first: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]

Assign reference-mask connected components that the target mask does not cover to a neighboring target label.

Finds connected components of reference_arr (e.g. the vertebra semantic mask) that have no overlap with target_arr (the instance mask). Optionally dilates the target mask once first to absorb thin gaps. Each remaining component is dilated locally and assigned to the most common neighboring target label; components with no labeled neighbor are deleted from the reference mask and recorded in a deletion map.

Parameters:

Name Type Description Default
target_arr ndarray

Instance label array that components are assigned to.

required
reference_arr ndarray

Reference (semantic) label array whose uncovered components are processed.

required
verbose bool

If True, log each assignment.

False
verbose_deletion bool

If True, log each deletion even when verbose is False.

False
proc_assign_missing_dilate_first bool

If True, dilate the target mask once before searching for uncovered components.

True

Returns:

Type Description
tuple[ndarray, ndarray, ndarray]

tuple[np.ndarray, np.ndarray, np.ndarray]: (target_arr, reference_arr, deletion_map) with the updated instance and reference arrays and a binary map of voxels removed from the reference mask.

Raises:

Type Description
AssertionError

If target_arr and reference_arr do not share the same shape.

Source code in spineps/phase_post.py
def assign_missing_cc(
    target_arr: np.ndarray,
    reference_arr: np.ndarray,
    verbose: bool = False,
    verbose_deletion: bool = False,
    proc_assign_missing_dilate_first: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Assign reference-mask connected components that the target mask does not cover to a neighboring target label.

    Finds connected components of ``reference_arr`` (e.g. the vertebra semantic mask) that have no overlap with ``target_arr``
    (the instance mask). Optionally dilates the target mask once first to absorb thin gaps. Each remaining component is dilated
    locally and assigned to the most common neighboring target label; components with no labeled neighbor are deleted from the
    reference mask and recorded in a deletion map.

    Args:
        target_arr (np.ndarray): Instance label array that components are assigned to.
        reference_arr (np.ndarray): Reference (semantic) label array whose uncovered components are processed.
        verbose (bool): If True, log each assignment.
        verbose_deletion (bool): If True, log each deletion even when ``verbose`` is False.
        proc_assign_missing_dilate_first (bool): If True, dilate the target mask once before searching for uncovered components.

    Returns:
        tuple[np.ndarray, np.ndarray, np.ndarray]: ``(target_arr, reference_arr, deletion_map)`` with the updated instance and
            reference arrays and a binary map of voxels removed from the reference mask.

    Raises:
        AssertionError: If ``target_arr`` and ``reference_arr`` do not share the same shape.
    """
    assert target_arr.shape == reference_arr.shape

    deletion_map = np.zeros_like(reference_arr, dtype=np.uint8)

    # only reference pixels not already covered by target
    subreg_arr_vert_rest = np.where(target_arr == 0, reference_arr, 0)

    label_rest = np_unique(subreg_arr_vert_rest)
    if len(label_rest) == 1 and label_rest[0] == 0:
        logger.print("No CC had to be assigned", Log_Type.OK, verbose=verbose)
        return target_arr, reference_arr, deletion_map

    # dilate once first
    if proc_assign_missing_dilate_first:
        target_arr_ = np_dilate_msk(
            target_arr,
            None,
            n_pixel=2,
            connectivity=1,
            mask=reference_arr,
            use_crop=False,
        )
        subreg_arr_vert_rest = reference_arr.copy()
        subreg_arr_vert_rest[target_arr_ != 0] = 0
        deletion_map = np.zeros(reference_arr.shape)

        label_rest = np_unique(subreg_arr_vert_rest)
        if len(label_rest) == 1 and label_rest[0] == 0:
            logger.print("No CC had to be assigned", Log_Type.OK, verbose=verbose)
            return target_arr_, reference_arr, deletion_map

        target_arr = target_arr_
    # subreg_arr_vert_rest is not hit pixels bei vertebra prediction
    subreg_cc = np_connected_components_per_label(subreg_arr_vert_rest, connectivity=2)

    loop_counts = 0

    for label, subreg_cc_map in subreg_cc.items():
        if label == 0:
            continue

        cc_labels = np_unique_withoutzero(subreg_cc_map)
        loop_counts += len(cc_labels)

        for cc_l in cc_labels:
            cc_mask = subreg_cc_map == cc_l

            cc_bbox = np_bbox_binary(cc_mask, px_dist=2)

            cc_mask_c = cc_mask[cc_bbox]
            vert_arr_c = target_arr[cc_bbox]

            # dilate only local mask
            cc_map_dilated = np_dilate_msk(cc_mask_c.astype(np.uint8), 1, n_pixel=1, connectivity=2).astype(bool)

            # neighborhood labels
            neighbor_labels = vert_arr_c[cc_map_dilated]
            neighbor_labels = neighbor_labels[neighbor_labels != 0]

            if neighbor_labels.size > 0:
                new_label = np.bincount(neighbor_labels).argmax()
                if verbose:
                    logger.print(
                        f"Assign Missing CC {(label, cc_l)} to {new_label}, Location at {cc_bbox}, {cc_mask.sum()}", verbose=verbose
                    )

                vert_arr_c[cc_mask_c] = new_label
                target_arr[cc_bbox] = vert_arr_c

            else:
                logger.print(f"Assign {(label, cc_l)} to EMPTY, Location at {cc_bbox}", verbose=verbose or verbose_deletion)

                reference_arr[cc_bbox][cc_mask_c == 1] = 0
                deletion_map[cc_bbox][cc_mask_c == 1] = 1

    logger.print(f"Assign missing cc: Processed {loop_counts} missed ccs")

    return target_arr, reference_arr, deletion_map

add_ivd_ep_vert_label

add_ivd_ep_vert_label(
    whole_vert_nii: NII, seg_nii: NII, verbose=True
) -> tuple[np.ndarray, np.ndarray]

Attach intervertebral-disc and endplate instance labels and split endplates into superior/inferior.

Reorients both masks to PIR, computes each vertebra corpus center of mass along the inferior-superior axis, then assigns every IVD and endplate connected component to the nearest lower vertebra. IVD voxels are written into the instance array with IVD_LABEL_OFFSET added; endplate voxels with ENDPLATE_LABEL_OFFSET added. Endplates are further divided into inferior/superior plates by iteratively dilating each vertebra into the endplate region. Logs the number of assigned components and restores the original orientation before returning.

Parameters:

Name Type Description Default
whole_vert_nii NII

Vertebra instance segmentation mask.

required
seg_nii NII

Subregion semantic segmentation mask (must contain disc/endplate/corpus labels).

required
verbose bool

If True, print endplate-detection progress.

True

Returns:

Type Description
tuple[ndarray, ndarray]

tuple[np.ndarray, np.ndarray]: (vert_arr, seg_arr) arrays in the original orientation: the instance array with IVD and endplate instance labels added, and the semantic array with endplates split into superior/inferior plates.

Source code in spineps/phase_post.py
def add_ivd_ep_vert_label(whole_vert_nii: NII, seg_nii: NII, verbose=True) -> tuple[np.ndarray, np.ndarray]:
    """Attach intervertebral-disc and endplate instance labels and split endplates into superior/inferior.

    Reorients both masks to PIR, computes each vertebra corpus center of mass along the inferior-superior axis, then assigns
    every IVD and endplate connected component to the nearest lower vertebra. IVD voxels are written into the instance array
    with ``IVD_LABEL_OFFSET`` added; endplate voxels with ``ENDPLATE_LABEL_OFFSET`` added. Endplates are further divided into
    inferior/superior plates by iteratively dilating each vertebra into the endplate region. Logs the number of assigned
    components and restores the original orientation before returning.

    Args:
        whole_vert_nii (NII): Vertebra instance segmentation mask.
        seg_nii (NII): Subregion semantic segmentation mask (must contain disc/endplate/corpus labels).
        verbose (bool): If True, print endplate-detection progress.

    Returns:
        tuple[np.ndarray, np.ndarray]: ``(vert_arr, seg_arr)`` arrays in the original orientation: the instance array with IVD
            and endplate instance labels added, and the semantic array with endplates split into superior/inferior plates.
    """
    # PIR
    orientation = whole_vert_nii.orientation
    vert_t = whole_vert_nii.reorient()
    seg_t = seg_nii.reorient()
    vert_labels = [t for t in vert_t.unique() if t <= 26 or t == 28]  # without zero
    vert_arr = vert_t.get_seg_array()
    subreg_arr = seg_t.get_seg_array()

    coms_vert_dict = {}
    for l in vert_labels:
        vert_l = vert_arr.copy()
        vert_l[vert_l != l] = 0
        vert_l[subreg_arr != 49] = 0  # com of corpus region
        vert_l[vert_l != 0] = 1
        try:
            coms_vert_dict[l] = np_center_of_mass(vert_l)[1][1]  # center_of_mass(vert_l)[1]
        except Exception:
            coms_vert_dict[l] = 0

    coms_vert_y = list(coms_vert_dict.values())
    coms_vert_labels = list(coms_vert_dict.keys())

    n_ivds = 0
    n_ivd_unique = 0
    if Location.Vertebra_Disc.value in seg_t.unique():
        # Map IVDS
        subreg_cc = seg_t.get_connected_components(labels=Location.Vertebra_Disc.value)
        subreg_cc_n = len(subreg_cc.unique())
        subreg_cc = subreg_cc.get_seg_array()
        cc_labelset = list(range(1, subreg_cc_n + 1))
        mapping_cc_to_vert_label = {}

        coms_ivd_dict = {}
        for c in cc_labelset:
            if c == 0:
                continue
            com_y = np_center_of_mass(subreg_cc == c)[1][1]  # center_of_mass(c_l)[1]

            if com_y < min(coms_vert_y):
                label = min(coms_vert_labels) - 1
            else:
                nearest_lower = find_nearest_lower(coms_vert_y, com_y)
                label = next(i for i in coms_vert_dict if coms_vert_dict[i] == nearest_lower)
            coms_ivd_dict[label] = com_y
            mapping_cc_to_vert_label[c] = label
            n_ivds += 1

        # find which vert got how many ivd CCs
        to_mapped_labels = list(mapping_cc_to_vert_label.values())
        for l in vert_labels:
            if l not in to_mapped_labels and l != 1:
                logger.print(f"Vertebra {v_idx2name[l]} got no IVD component assigned", Log_Type.STRANGE)
            count = to_mapped_labels.count(l)
            if count > 1:
                logger.print(f"Vertebra {v_idx2name[l]} got {count} IVD components assigned", Log_Type.STRANGE)

        subreg_ivd = subreg_cc.copy()
        n_ivd_unique = len(np.unique(to_mapped_labels))
        subreg_ivd = np_map_labels(subreg_ivd, label_map=mapping_cc_to_vert_label)
        subreg_ivd += IVD_LABEL_OFFSET
        subreg_ivd[subreg_ivd == IVD_LABEL_OFFSET] = 0
        vert_arr[subreg_arr == Location.Vertebra_Disc.value] = subreg_ivd[subreg_arr == Location.Vertebra_Disc.value]
    n_eps = 0
    n_eps_unique = 0
    if Location.Endplate.value in seg_t.unique():
        # MAP Endplate
        ep_cc = seg_t.get_connected_components(labels=Location.Endplate.value)
        ep_cc_n = len(ep_cc.unique())
        ep_cc = ep_cc.get_seg_array()
        cc_ep_labelset = list(range(1, ep_cc_n + 1))
        mapping_ep_cc_to_vert_label = {}
        coms_ivd_dict = {}
        for c in cc_ep_labelset:
            if c == 0:
                continue
            com_y = np_center_of_mass(ep_cc == c)[1][1]  # center_of_mass(c_l)[1]
            nearest_lower = find_nearest_lower(coms_vert_y, com_y)
            label = next(i for i in coms_vert_dict if coms_vert_dict[i] == nearest_lower)
            mapping_ep_cc_to_vert_label[c] = label
            n_eps += 1

        subreg_ep = ep_cc.copy()
        n_eps_unique = len(np.unique(list(mapping_ep_cc_to_vert_label.values())))
        subreg_ep = np_map_labels(subreg_ep, label_map=mapping_ep_cc_to_vert_label)
        subreg_ep += ENDPLATE_LABEL_OFFSET
        subreg_ep[subreg_ep == ENDPLATE_LABEL_OFFSET] = 0
        vert_arr[subreg_arr == Location.Endplate.value] = subreg_ep[subreg_arr == Location.Endplate.value]
        vert_t.set_array_(vert_arr)

        # divide into upper and lower endplate
        out = seg_t * 0
        pref = 1
        old_vol = -1
        # seg_t and vert_t are not modified in this loop, so compute these invariants once.
        endplate_nii = seg_t.extract_label(Location.Endplate.value)
        total = endplate_nii.sum()
        vert_labels_to_split = vert_t.unique()
        for dil in range(1, MAX_ENDPLATE_DILATION):
            curr = out.extract_label([Location.Vertebral_Body_Endplate_Inferior.value, Location.Vertebral_Body_Endplate_Superior.value])
            new_vol = curr.sum()
            logger.print(rf"{new_vol / total * 100:.2f}% endplates detected", end="\r") if verbose else None
            if old_vol == new_vol and old_vol != 0:
                break
            old_vol = new_vol
            if total == new_vol:
                logger.print("Found all Endplates                                      ")
                break
            for i in vert_labels_to_split:
                if i >= INSTANCE_LABEL_LIMIT:
                    break
                curr = out.extract_label([Location.Vertebral_Body_Endplate_Inferior.value, Location.Vertebral_Body_Endplate_Superior.value])
                v = vert_t.extract_label(i).dilate_msk(dil, verbose=False)
                end = endplate_nii * v
                end *= -curr + 1  # type: ignore
                plates = vert_t * end
                plates.map_labels_(
                    {
                        i + ENDPLATE_LABEL_OFFSET: Location.Vertebral_Body_Endplate_Inferior.value,
                        pref + ENDPLATE_LABEL_OFFSET: Location.Vertebral_Body_Endplate_Superior.value,
                    },
                    verbose=False,
                )
                out += plates
                pref = i
        curr = out.extract_label([Location.Vertebral_Body_Endplate_Inferior.value, Location.Vertebral_Body_Endplate_Superior.value])
        end = seg_t.extract_label(Location.Endplate.value)
        end *= -curr + 1
        # end += end.dilate_msk(3)
        out += end * Location.Endplate.value
        seg_t = out.extract_label(
            [Location.Vertebral_Body_Endplate_Inferior.value, Location.Vertebral_Body_Endplate_Superior.value, Location.Endplate.value]
        )

    logger.print(f"Labeled {n_ivds} IVDs ({n_ivd_unique} unique), and {n_eps} Endplates ({n_eps_unique} unique)")
    return vert_t.set_array_(vert_arr).reorient_(orientation).get_seg_array(), seg_t.reorient_(orientation).get_seg_array()

find_nearest_lower

find_nearest_lower(seq, x) -> float

Return the largest element of seq strictly smaller than x, or the minimum if none exists.

Parameters:

Name Type Description Default
seq Sequence[float]

Values to search.

required
x float

Reference value.

required

Returns:

Name Type Description
float float

The greatest element below x, or min(seq) when no element is below x.

Source code in spineps/phase_post.py
def find_nearest_lower(seq, x) -> float:
    """Return the largest element of ``seq`` strictly smaller than ``x``, or the minimum if none exists.

    Args:
        seq (Sequence[float]): Values to search.
        x (float): Reference value.

    Returns:
        float: The greatest element below ``x``, or ``min(seq)`` when no element is below ``x``.
    """
    values_lower = [item for item in seq if item < x]
    if len(values_lower) == 0:
        return min(seq)
    return max(values_lower)

label_instance_top_to_bottom

label_instance_top_to_bottom(
    vert_nii: NII, labeling_offset: int = 0
) -> tuple[NII, np.ndarray]

Relabel vertebra instances consecutively from top to bottom by center-of-mass height.

Reorients to PIR, sorts the instances by their center of mass along the inferior-superior axis, and assigns consecutive labels (1 + labeling_offset upward) from top to bottom, then restores the original orientation.

Parameters:

Name Type Description Default
vert_nii NII

Vertebra instance segmentation mask (modified in place).

required
labeling_offset int

Offset added to the consecutive labels.

0

Returns:

Type Description
tuple[NII, ndarray]

tuple[NII, np.ndarray]: The relabeled instance mask and its array of unique labels.

Source code in spineps/phase_post.py
def label_instance_top_to_bottom(vert_nii: NII, labeling_offset: int = 0) -> tuple[NII, np.ndarray]:
    """Relabel vertebra instances consecutively from top to bottom by center-of-mass height.

    Reorients to PIR, sorts the instances by their center of mass along the inferior-superior axis, and assigns consecutive
    labels (``1 + labeling_offset`` upward) from top to bottom, then restores the original orientation.

    Args:
        vert_nii (NII): Vertebra instance segmentation mask (modified in place).
        labeling_offset (int): Offset added to the consecutive labels.

    Returns:
        tuple[NII, np.ndarray]: The relabeled instance mask and its array of unique labels.
    """
    ori = vert_nii.orientation
    vert_nii.reorient_()
    vert_arr = vert_nii.get_seg_array()
    com_i = np_center_of_mass(vert_arr)
    comb_l = list(zip_strict(com_i.keys(), com_i.values()))
    comb_l.sort(key=lambda a: a[1][1])  # PIR
    com_map = {comb_l[idx][0]: idx + 1 + labeling_offset for idx in range(len(comb_l))}

    vert_nii.map_labels_(com_map, verbose=False)
    vert_nii.reorient_(ori)
    return vert_nii, vert_nii.unique()

assign_vertebra_inconsistency

assign_vertebra_inconsistency(
    vert_nii: NII,
    seg_nii: NII,
    locations=(
        Location.Superior_Articular_Left,
        Location.Superior_Articular_Right,
        Location.Inferior_Articular_Left,
        Location.Inferior_Articular_Right,
    ),
) -> None

Reassign articular-process components to the vertebra instance they most overlap with.

For each given articular subregion location, finds its connected components in the semantic mask and, for each component, reassigns its instance label to the vertebra whose overlap volume dominates the second-largest by ARTICULAR_DOMINANCE_RATIO. Updates vert_nii in place.

Parameters:

Name Type Description Default
vert_nii NII

Vertebra instance segmentation mask (modified in place).

required
seg_nii NII

Subregion semantic segmentation mask.

required
locations tuple[Location, ...]

Articular subregion locations to reconcile.

(Superior_Articular_Left, Superior_Articular_Right, Inferior_Articular_Left, Inferior_Articular_Right)

Returns:

Name Type Description
None None

vert_nii is modified in place.

Source code in spineps/phase_post.py
def assign_vertebra_inconsistency(
    vert_nii: NII,
    seg_nii: NII,
    locations=(
        Location.Superior_Articular_Left,
        Location.Superior_Articular_Right,
        Location.Inferior_Articular_Left,
        Location.Inferior_Articular_Right,
    ),
) -> None:
    """Reassign articular-process components to the vertebra instance they most overlap with.

    For each given articular subregion location, finds its connected components in the semantic mask and, for each component,
    reassigns its instance label to the vertebra whose overlap volume dominates the second-largest by ``ARTICULAR_DOMINANCE_RATIO``.
    Updates ``vert_nii`` in place.

    Args:
        vert_nii (NII): Vertebra instance segmentation mask (modified in place).
        seg_nii (NII): Subregion semantic segmentation mask.
        locations (tuple[Location, ...]): Articular subregion locations to reconcile.

    Returns:
        None: ``vert_nii`` is modified in place.
    """
    seg_nii.assert_affine(shape=vert_nii.shape)
    seg_arr = seg_nii.get_seg_array()
    vert_arr = vert_nii.get_seg_array()

    # assign inconsistent substructures
    for loc in locations:
        value = loc.value

        subreg_l = np_extract_label(seg_arr, value, inplace=False)  # type:ignore
        try:
            subreg_cc, _ = np_connected_components(subreg_l, label_ref=1)
        except AssertionError as e:
            print(f"Got error {e}, skip")
            break
        cc_labels = np_unique(subreg_cc)

        for ccl in cc_labels:
            if ccl == 0:
                continue
            cc_map = np_extract_label(subreg_cc, ccl, inplace=False)
            vert_arr_cc = vert_arr.copy()
            vert_arr_cc += 1
            vert_arr_cc[cc_map == 0] = 0
            gt_volume = np_volume(vert_arr_cc)
            k_keys_sorted = heapq.nlargest(2, gt_volume, key=gt_volume.__getitem__)

            if len(k_keys_sorted) == 1:
                continue
            biggest_volume = (k_keys_sorted[0], gt_volume[k_keys_sorted[0]])
            second_volume = (k_keys_sorted[1], gt_volume[k_keys_sorted[1]])

            # print(biggest_volume, second_volume)

            if biggest_volume[1] * ARTICULAR_DOMINANCE_RATIO > second_volume[1]:
                to_label = biggest_volume[0] - 1  # int(list(gt_volume.keys())[argmax] - 1)

                vert_arr[cc_map == 1] = to_label
                logger.print(
                    f"set cc to {to_label}, with volume decision {gt_volume}, based on {biggest_volume}, {second_volume}", verbose=False
                )

        vert_nii.set_array_(vert_arr)

detect_and_solve_merged_vertebra

detect_and_solve_merged_vertebra(
    seg_nii: NII, vert_nii: NII
) -> tuple[NII, NII]

Detect and merge a vertebra (typically C2) that was split into two stacked instances.

Builds a height-sorted list of IVD components and vertebra instances. If the two topmost entries are both vertebrae, the upper one is significantly smaller (below MERGED_VERTEBRA_SIZE_RATIO of the other), and the two masks touch over more than MERGED_VERTEBRA_MIN_CONTACT_MM2 of area, the smaller instance is merged into the larger one in vert_nii.

Parameters:

Name Type Description Default
seg_nii NII

Subregion semantic segmentation mask.

required
vert_nii NII

Vertebra instance segmentation mask (modified in place when a merge occurs).

required

Returns:

Type Description
tuple[NII, NII]

tuple[NII, NII]: The (seg_nii, vert_nii) masks (vert_nii possibly with two instances merged).

Source code in spineps/phase_post.py
def detect_and_solve_merged_vertebra(seg_nii: NII, vert_nii: NII) -> tuple[NII, NII]:
    """Detect and merge a vertebra (typically C2) that was split into two stacked instances.

    Builds a height-sorted list of IVD components and vertebra instances. If the two topmost entries are both vertebrae, the
    upper one is significantly smaller (below ``MERGED_VERTEBRA_SIZE_RATIO`` of the other), and the two masks touch over more
    than ``MERGED_VERTEBRA_MIN_CONTACT_MM2`` of area, the smaller instance is merged into the larger one in ``vert_nii``.

    Args:
        seg_nii (NII): Subregion semantic segmentation mask.
        vert_nii (NII): Vertebra instance segmentation mask (modified in place when a merge occurs).

    Returns:
        tuple[NII, NII]: The ``(seg_nii, vert_nii)`` masks (``vert_nii`` possibly with two instances merged).
    """
    seg_sem = seg_nii.map_labels({Location.Endplate.value: Location.Vertebra_Disc.value}, verbose=False)
    # get all ivd CCs from seg_sem

    stats = {}
    # Map IVDS
    subreg_cc: NII = seg_sem.get_connected_components(labels=Location.Vertebra_Disc.value)
    subreg_cc += 100

    coms = subreg_cc.center_of_masses()
    volumes = subreg_cc.volumes()
    stats = {i: (g[1], True, volumes[i]) for i, g in coms.items()}

    corpus_nii = seg_sem.extract_label([Location.Vertebra_Corpus_border.value, Location.Arcus_Vertebrae.value]) * vert_nii
    vert_coms = corpus_nii.center_of_masses()
    vert_volumes = vert_nii.volumes()

    for i, g in vert_coms.items():
        stats[i] = (g[1], False, vert_volumes[i])

    stats_by_height = dict(sorted(stats.items(), key=lambda x: x[1][0]))
    stats_by_height_keys = list(stats_by_height.keys())

    # detect C2 split into two components
    first_key, second_key = stats_by_height_keys[0], stats_by_height_keys[1]
    first_stats, second_stats = stats_by_height[first_key], stats_by_height[second_key]
    if first_stats[1] is False and second_stats[1] is False:  # noqa: SIM102
        # both vertebra
        if first_stats[2] < MERGED_VERTEBRA_SIZE_RATIO * second_stats[2]:
            # first is significantly smaller than second and they are close in height
            # how many pixels are touching
            vert_firsttwo_arr = vert_nii.extract_label(first_key).get_seg_array()
            vert_firsttwo_arr2 = vert_nii.extract_label(second_key).get_seg_array()
            vert_firsttwo_arr += vert_firsttwo_arr2 + 1
            contacts = np_contacts(vert_firsttwo_arr, connectivity=3)
            if contacts[(1, 2)] > isotropic_area_to_voxels(MERGED_VERTEBRA_MIN_CONTACT_MM2, vert_nii.zoom):
                logger.print("Found first two instance weird, will merge", Log_Type.STRANGE)
                vert_nii.map_labels_({first_key: second_key}, verbose=False)

    return seg_nii, vert_nii