Correct implementation of Wright–Malécot inbreeding coefficient using shortest paths (Python/Django)

I am implementing the Wright inbreeding coefficient (F) for individuals in a pigeon pedigree (Django application, relational database). The current implementation builds the pedigree using a breadth-first search (BFS) up to a fixed number of generations and computes F using the classical formula:

𝐹=∑[(1/2)^(𝑛1+𝑛2+1)] (1+𝐹𝐴)

where:

  • n1 is the number of generations between the sire and the common ancestor,
  • n2 is the number of generations between the dam and the common ancestor,
  • 𝐹𝐴 is the inbreeding coefficient of the common ancestor.

To reduce complexity, the algorithm intentionally:

  • considers only the shortest path from sire and dam to each common ancestor,
  • does not enumerate all possible loops in the pedigree.

Although this approach works for simple pedigrees, the computed F values are incorrect for more complex cases involving:

  • multiple independent loops,
  • repeated ancestors through different paths,
  • deeper or overlapping inbreeding structures.

Specifically:

  • the algorithm underestimates F when a common ancestor appears via more than one valid loop,
  • contributions from additional paths are ignored,
  • recursive calculation of 𝐹𝐴 propagates the same limitation.

I am looking for clarification on whether:

  • the Wright–Malécot coefficient can be correctly computed using shortest paths only, or
  • all valid ancestral loops must be explicitly enumerated (or otherwise accounted for),
  • and what algorithmic approach is recommended for a correct and computationally feasible implementation in this context.

My code below.

#inbreeding.py
from collections import deque
from typing import Dict, Tuple, Optional
from decimal import Decimal
import logging

logger = logging.getLogger(__name__)


class InbreedingCalculator:
    """
    Correct Wright–Malécot Inbreeding Coefficient Calculator.
    Uses SIMPLE LOOPS, not all paths.
    """

    def __init__(self, max_generations: int = 10):
        self.max_generations = max_generations
        self._fa_cache: Dict[int, float] = {}

    # ------------------------------------------------------------
    # PUBLIC API
    # ------------------------------------------------------------
    def calculate_coefficient(self, pigeon_id, sire_id, dam_id) -> Decimal:
        if not sire_id or not dam_id:
            return Decimal("0.0000")

        try:
            from .models import Pigeon
            pedigree = self._build_pedigree(sire_id, dam_id, Pigeon)
            f = self._compute_f(sire_id, dam_id, pedigree)
            return Decimal(str(round(f, 4)))
        except Exception as e:
            logger.error(f"Error calculating F for pigeon {pigeon_id}: {e}", exc_info=True)
            return Decimal("0.0000")

    # ------------------------------------------------------------
    # BUILD PEDIGREE (BFS, batched)
    # ------------------------------------------------------------
    def _build_pedigree(self, sire_id, dam_id, Pigeon):
        pedigree = {}
        visited = set()
        queue = deque([(sire_id, 0), (dam_id, 0)])

        while queue:
            pid, depth = queue.popleft()
            if pid in visited or depth > self.max_generations:
                continue

            visited.add(pid)

            try:
                p = Pigeon.objects.only("id", "sire_id", "dam_id").get(pk=pid)
                s, d = p.sire_id, p.dam_id
            except:
                s, d = None, None

            pedigree[pid] = (s, d)

            if s:
                queue.append((s, depth + 1))
            if d:
                queue.append((d, depth + 1))

        return pedigree

    # ------------------------------------------------------------
    # SIMPLE LOOP METHOD (Wright–Malécot)
    # ------------------------------------------------------------
    def _compute_f(self, sire_id, dam_id, pedigree):
        ancestors_sire = self._shortest_paths(sire_id, pedigree)
        ancestors_dam = self._shortest_paths(dam_id, pedigree)

        common = set(ancestors_sire.keys()) & set(ancestors_dam.keys())
        if not common:
            return 0.0

        total = 0.0

        for A in common:
            n1 = ancestors_sire[A]
            n2 = ancestors_dam[A]
            Fa = self._get_fa(A, pedigree)

            contrib = (0.5 ** (n1 + n2 + 1)) * (1 + Fa)
            total += contrib

        return total

    # ------------------------------------------------------------
    # SHORTEST PATHS (NOT all paths!)
    # ------------------------------------------------------------
    def _shortest_paths(self, start_id, pedigree):
        """
        Returns: {ancestor_id: distance}
        """
        dist = {start_id: 0}
        queue = deque([start_id])

        while queue:
            current = queue.popleft()
            depth = dist[current]

            if depth >= self.max_generations:
                continue

            if current not in pedigree:
                continue

            s, d = pedigree[current]

            for parent in (s, d):
                if parent and parent not in dist:
                    dist[parent] = depth + 1
                    queue.append(parent)

        return dist

    # ------------------------------------------------------------
    # FA CALCULATION (recursive)
    # ------------------------------------------------------------
    def _get_fa(self, ancestor_id, pedigree):
        if ancestor_id in self._fa_cache:
            return self._fa_cache[ancestor_id]

        s, d = pedigree.get(ancestor_id, (None, None))

        if not s or not d:
            self._fa_cache[ancestor_id] = 0.0
            return 0.0

        Fa = self._compute_f(s, d, pedigree)
        self._fa_cache[ancestor_id] = Fa
        return Fa

    def clear_cache(self):
        self._fa_cache.clear()


_calculator = InbreedingCalculator(max_generations=10)


def get_calculator():
    return _calculator
Вернуться на верх