Correct implementation of Wright–Malécot inbreeding coefficient using shortest paths (Python/Django)

I am implementing the Wright inbreeding coefficient (F) for individuals in a pigeon pedigree (Django application, relational database). The current implementation builds the pedigree using a breadth-first search (BFS) up to a fixed number of generations and computes F using the classical formula:

𝐹=∑[(1/2)^(𝑛1+𝑛2+1)] (1+𝐹𝐴)

where:

  • n1 is the number of generations between the sire and the common ancestor,
  • n2 is the number of generations between the dam and the common ancestor,
  • 𝐹𝐴 is the inbreeding coefficient of the common ancestor.

To reduce complexity, the algorithm intentionally:

  • considers only the shortest path from sire and dam to each common ancestor,
  • does not enumerate all possible loops in the pedigree.

Although this approach works for simple pedigrees, the computed F values are incorrect for more complex cases involving:

  • multiple independent loops,
  • repeated ancestors through different paths,
  • deeper or overlapping inbreeding structures.

Specifically:

  • the algorithm underestimates F when a common ancestor appears via more than one valid loop,
  • contributions from additional paths are ignored,
  • recursive calculation of 𝐹𝐴 propagates the same limitation.

I am looking for clarification on whether:

  • the Wright–Malécot coefficient can be correctly computed using shortest paths only, or
  • all valid ancestral loops must be explicitly enumerated (or otherwise accounted for),
  • and what algorithmic approach is recommended for a correct and computationally feasible implementation in this context.

My code below.

#inbreeding.py
from collections import deque
from typing import Dict, Tuple, Optional
from decimal import Decimal
import logging

logger = logging.getLogger(__name__)


class InbreedingCalculator:
    """
    Correct Wright–Malécot Inbreeding Coefficient Calculator.
    Uses SIMPLE LOOPS, not all paths.
    """

    def __init__(self, max_generations: int = 10):
        self.max_generations = max_generations
        self._fa_cache: Dict[int, float] = {}

    # ------------------------------------------------------------
    # PUBLIC API
    # ------------------------------------------------------------
    def calculate_coefficient(self, pigeon_id, sire_id, dam_id) -> Decimal:
        if not sire_id or not dam_id:
            return Decimal("0.0000")

        try:
            from .models import Pigeon
            pedigree = self._build_pedigree(sire_id, dam_id, Pigeon)
            f = self._compute_f(sire_id, dam_id, pedigree)
            return Decimal(str(round(f, 4)))
        except Exception as e:
            logger.error(f"Error calculating F for pigeon {pigeon_id}: {e}", exc_info=True)
            return Decimal("0.0000")

    # ------------------------------------------------------------
    # BUILD PEDIGREE (BFS, batched)
    # ------------------------------------------------------------
    def _build_pedigree(self, sire_id, dam_id, Pigeon):
        pedigree = {}
        visited = set()
        queue = deque([(sire_id, 0), (dam_id, 0)])

        while queue:
            pid, depth = queue.popleft()
            if pid in visited or depth > self.max_generations:
                continue

            visited.add(pid)

            try:
                p = Pigeon.objects.only("id", "sire_id", "dam_id").get(pk=pid)
                s, d = p.sire_id, p.dam_id
            except:
                s, d = None, None

            pedigree[pid] = (s, d)

            if s:
                queue.append((s, depth + 1))
            if d:
                queue.append((d, depth + 1))

        return pedigree

    # ------------------------------------------------------------
    # SIMPLE LOOP METHOD (Wright–Malécot)
    # ------------------------------------------------------------
    def _compute_f(self, sire_id, dam_id, pedigree):
        ancestors_sire = self._shortest_paths(sire_id, pedigree)
        ancestors_dam = self._shortest_paths(dam_id, pedigree)

        common = set(ancestors_sire.keys()) & set(ancestors_dam.keys())
        if not common:
            return 0.0

        total = 0.0

        for A in common:
            n1 = ancestors_sire[A]
            n2 = ancestors_dam[A]
            Fa = self._get_fa(A, pedigree)

            contrib = (0.5 ** (n1 + n2 + 1)) * (1 + Fa)
            total += contrib

        return total

    # ------------------------------------------------------------
    # SHORTEST PATHS (NOT all paths!)
    # ------------------------------------------------------------
    def _shortest_paths(self, start_id, pedigree):
        """
        Returns: {ancestor_id: distance}
        """
        dist = {start_id: 0}
        queue = deque([start_id])

        while queue:
            current = queue.popleft()
            depth = dist[current]

            if depth >= self.max_generations:
                continue

            if current not in pedigree:
                continue

            s, d = pedigree[current]

            for parent in (s, d):
                if parent and parent not in dist:
                    dist[parent] = depth + 1
                    queue.append(parent)

        return dist

    # ------------------------------------------------------------
    # FA CALCULATION (recursive)
    # ------------------------------------------------------------
    def _get_fa(self, ancestor_id, pedigree):
        if ancestor_id in self._fa_cache:
            return self._fa_cache[ancestor_id]

        s, d = pedigree.get(ancestor_id, (None, None))

        if not s or not d:
            self._fa_cache[ancestor_id] = 0.0
            return 0.0

        Fa = self._compute_f(s, d, pedigree)
        self._fa_cache[ancestor_id] = Fa
        return Fa

    def clear_cache(self):
        self._fa_cache.clear()


_calculator = InbreedingCalculator(max_generations=10)


def get_calculator():
    return _calculator

classical formula requires summing over all independent ancestral loops connecting sire and dam through each common ancestor. If a common ancestor appears via multiple paths, each path contributes separately. Limiting to shortest paths systematically undercounts F, especially in complex or overlapping pedigrees.

Recursive computation of FA with the same restriction propagates the underestimation.

Recommended approach:
Use a standard pedigree algorithm that accounts for all loops implicitly, such as:

. Tabular (numerator relationship matrix) method

. Henderson’s recursive algorithm

. Meuwissen–Luo algorithm (efficient for large pedigrees)

These methods avoid explicit path enumeration but correctly compute F in all cases.

Bottom line:
Your current BFS/shortest-path implementation is a valid approximation for simple pedigrees, but for mathematically correct F values in general pedigrees, a matrix-based or recursive relationship algorithm is required.

Вернуться на верх