Skip to content

Commit

Permalink
Move iteration out
Browse files Browse the repository at this point in the history
  • Loading branch information
tanghaibao committed Apr 12, 2024
1 parent f7e635a commit 52572d3
Showing 1 changed file with 48 additions and 41 deletions.
89 changes: 48 additions & 41 deletions jcvi/apps/pedigree.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from collections import Counter
from dataclasses import dataclass
from random import choice, sample
from typing import Optional, Tuple
from random import sample
from typing import Optional

import numpy as np

Expand Down Expand Up @@ -60,44 +60,53 @@ class GenotypeCollection(dict):
Store genotypes for each sample.
"""

def add(self, s: str, ploidy: int, N: int):
def add(self, s: str, ploidy: int):
"""
Add genotypes for a fixed sample (usually terminal).
"""
self[s] = [[f"{s}{i:02d}" for i in range(ploidy)] for _ in range(N)]
self[s] = [f"{s}{i:02d}" for i in range(ploidy)]

def cross(self, s: str, dad: str, mom: str, ploidy: int, N: int):
def cross(self, s: str, dad: str, mom: str, ploidy: int):
"""
Cross two samples to generate genotypes for a new sample.
"""
dad_genotypes = self[dad]
mom_genotypes = self[mom]
dad_genotype = self[dad]
mom_genotype = self[mom]
gamete_ploidy = ploidy // 2
sample_genotypes = []
for _ in range(N):
dad_genotype = choice(dad_genotypes)
mom_genotype = choice(mom_genotypes)
dad_gamete = sample(dad_genotype, gamete_ploidy)
mom_gamete = sample(mom_genotype, gamete_ploidy)
sample_genotypes.append(sorted(dad_gamete + mom_gamete))
self[s] = sample_genotypes

def inbreeding_coef(self, s: str) -> Tuple[float, float]:
dad_gamete = sample(dad_genotype, gamete_ploidy)
mom_gamete = sample(mom_genotype, gamete_ploidy)
sample_genotype = sorted(dad_gamete + mom_gamete)
self[s] = sample_genotype

def inbreeding_coef(self, s: str) -> float:
"""
Calculate inbreeding coefficient for a sample.
"""
genotypes = self[s]
results = []
for genotype in genotypes:
ploidy = len(genotype)
pairs = ploidy * (ploidy - 1) // 2
counter = Counter(genotype)
collisions = 0
for count in counter.values():
collisions += count * (count - 1) // 2
results.append(collisions / pairs)
results = np.array(results)
return results.mean(), results.std()
genotype = self[s]
ploidy = len(genotype)
pairs = ploidy * (ploidy - 1) // 2
counter = Counter(genotype)
collisions = 0
for count in counter.values():
collisions += count * (count - 1) // 2
return collisions / pairs


def simulate_one_iteration(ped: Pedigree, ploidy: int) -> GenotypeCollection:
"""
Simulate one iteration of genotypes.
"""
genotypes = GenotypeCollection()
while len(genotypes) < len(ped):
for s in ped:
if ped[s].is_terminal:
genotypes.add(s, ploidy=ploidy)
else:
dad, mom = ped[s].dad, ped[s].mom
if dad not in genotypes or mom not in genotypes:
continue
genotypes.cross(s, dad, mom, ploidy=ploidy)
return genotypes


def inbreeding(args):
Expand All @@ -118,19 +127,17 @@ def inbreeding(args):
ploidy = opts.ploidy
N = opts.N
ped = Pedigree(pedfile)
genotypes = GenotypeCollection()
while len(genotypes) < len(ped):
for s in ped:
if ped[s].is_terminal:
genotypes.add(s, ploidy=ploidy, N=N)
else:
dad, mom = ped[s].dad, ped[s].mom
if dad not in genotypes or mom not in genotypes:
continue
genotypes.cross(s, dad, mom, ploidy=ploidy, N=N)
all_collections = []
for _ in range(N):
genotypes = simulate_one_iteration(ped, ploidy)
all_collections.append(genotypes)
for s in ped:
mean, std = genotypes.inbreeding_coef(s)
print(s, mean, std)
inbreeding_coefs = [
genotypes.inbreeding_coef(s) for genotypes in all_collections
]
mean_inbreeding = np.mean(inbreeding_coefs)
std_inbreeding = np.std(inbreeding_coefs)
print(f"{s}\t{mean_inbreeding:.4f}\t{std_inbreeding:.4f}")


def main():
Expand Down

0 comments on commit 52572d3

Please sign in to comment.