Correct implementation of Wright–Malécot inbreeding coefficient using shortest paths (Python/Django)
I am implementing the Wright inbreeding coefficient (F) for individuals in a pigeon pedigree (Django application, relational database). The current implementation builds the pedigree using a breadth-first search (BFS) up to a fixed number of generations and computes F using the classical formula:
𝐹=∑[(1/2)^(𝑛1+𝑛2+1)] (1+𝐹𝐴)
where:
- n1 is the number of generations between the sire and the common ancestor,
- n2 is the number of generations between the dam and the common ancestor,
- 𝐹𝐴 is the inbreeding coefficient of the common ancestor.
To reduce complexity, the algorithm intentionally:
- considers only the shortest path from sire and dam to each common ancestor,
- does not enumerate all possible loops in the pedigree.
Although this approach works for simple pedigrees, the computed F values are incorrect for more complex cases involving:
- multiple independent loops,
- repeated ancestors through different paths,
- deeper or overlapping inbreeding structures.
Specifically:
- the algorithm underestimates F when a common ancestor appears via more than one valid loop,
- contributions from additional paths are ignored,
- recursive calculation of 𝐹𝐴 propagates the same limitation.
I am looking for clarification on whether:
- the Wright–Malécot coefficient can be correctly computed using shortest paths only, or
- all valid ancestral loops must be explicitly enumerated (or otherwise accounted for),
- and what algorithmic approach is recommended for a correct and computationally feasible implementation in this context.
My code below.
#inbreeding.py
from collections import deque
from typing import Dict, Tuple, Optional
from decimal import Decimal
import logging
logger = logging.getLogger(__name__)
class InbreedingCalculator:
"""
Correct Wright–Malécot Inbreeding Coefficient Calculator.
Uses SIMPLE LOOPS, not all paths.
"""
def __init__(self, max_generations: int = 10):
self.max_generations = max_generations
self._fa_cache: Dict[int, float] = {}
# ------------------------------------------------------------
# PUBLIC API
# ------------------------------------------------------------
def calculate_coefficient(self, pigeon_id, sire_id, dam_id) -> Decimal:
if not sire_id or not dam_id:
return Decimal("0.0000")
try:
from .models import Pigeon
pedigree = self._build_pedigree(sire_id, dam_id, Pigeon)
f = self._compute_f(sire_id, dam_id, pedigree)
return Decimal(str(round(f, 4)))
except Exception as e:
logger.error(f"Error calculating F for pigeon {pigeon_id}: {e}", exc_info=True)
return Decimal("0.0000")
# ------------------------------------------------------------
# BUILD PEDIGREE (BFS, batched)
# ------------------------------------------------------------
def _build_pedigree(self, sire_id, dam_id, Pigeon):
pedigree = {}
visited = set()
queue = deque([(sire_id, 0), (dam_id, 0)])
while queue:
pid, depth = queue.popleft()
if pid in visited or depth > self.max_generations:
continue
visited.add(pid)
try:
p = Pigeon.objects.only("id", "sire_id", "dam_id").get(pk=pid)
s, d = p.sire_id, p.dam_id
except:
s, d = None, None
pedigree[pid] = (s, d)
if s:
queue.append((s, depth + 1))
if d:
queue.append((d, depth + 1))
return pedigree
# ------------------------------------------------------------
# SIMPLE LOOP METHOD (Wright–Malécot)
# ------------------------------------------------------------
def _compute_f(self, sire_id, dam_id, pedigree):
ancestors_sire = self._shortest_paths(sire_id, pedigree)
ancestors_dam = self._shortest_paths(dam_id, pedigree)
common = set(ancestors_sire.keys()) & set(ancestors_dam.keys())
if not common:
return 0.0
total = 0.0
for A in common:
n1 = ancestors_sire[A]
n2 = ancestors_dam[A]
Fa = self._get_fa(A, pedigree)
contrib = (0.5 ** (n1 + n2 + 1)) * (1 + Fa)
total += contrib
return total
# ------------------------------------------------------------
# SHORTEST PATHS (NOT all paths!)
# ------------------------------------------------------------
def _shortest_paths(self, start_id, pedigree):
"""
Returns: {ancestor_id: distance}
"""
dist = {start_id: 0}
queue = deque([start_id])
while queue:
current = queue.popleft()
depth = dist[current]
if depth >= self.max_generations:
continue
if current not in pedigree:
continue
s, d = pedigree[current]
for parent in (s, d):
if parent and parent not in dist:
dist[parent] = depth + 1
queue.append(parent)
return dist
# ------------------------------------------------------------
# FA CALCULATION (recursive)
# ------------------------------------------------------------
def _get_fa(self, ancestor_id, pedigree):
if ancestor_id in self._fa_cache:
return self._fa_cache[ancestor_id]
s, d = pedigree.get(ancestor_id, (None, None))
if not s or not d:
self._fa_cache[ancestor_id] = 0.0
return 0.0
Fa = self._compute_f(s, d, pedigree)
self._fa_cache[ancestor_id] = Fa
return Fa
def clear_cache(self):
self._fa_cache.clear()
_calculator = InbreedingCalculator(max_generations=10)
def get_calculator():
return _calculator