Source code for fairmd.lipids.quality

"""
Accumulate functions required for major evaluate_quality script.

TODO: add proper tests
"""

import json
import os
import warnings
from abc import ABC, abstractmethod

import numpy as np
import numpy.typing as npt
import scipy.stats

import fairmd.lipids.analib.formfactor as ff
from fairmd.lipids import FMDL_SIMU_PATH
from fairmd.lipids.api import get_FF, get_OP
from fairmd.lipids.auxiliary import CompactJSONEncoder, mollib
from fairmd.lipids.core import System, initialize_databank
from fairmd.lipids.experiment import ExperimentCollection


[docs] class QualSimulation(System): def __init__(self, s: System): super().__init__(s) self.op_data = get_OP(s) try: self.ff_data = get_FF(s) except FileNotFoundError: msg = f"System #{s['ID']} doesn't have computed FF data." warnings.warn(msg, stacklevel=2) self.ff_data = None
[docs] @staticmethod def load_all_paired() -> list["QualSimulation"]: """Load simulations with experimental pairings.""" systems = initialize_databank() simulations = [] for system in systems: experiments = system.get("EXPERIMENT", {}) if any(experiments.values()): # if experiments is not empty s = QualSimulation(system) simulations.append(s) print(f"Loaded {len(simulations)} that has some experimental pairings.") return simulations
[docs] class QualityEvaluator(ABC): """Interface for quality evaluation of simulation.""" def __init__(self, simulation: QualSimulation, experiments: ExperimentCollection): self._sim = simulation self._exps = experiments
[docs] @abstractmethod def evaluate_one(self) -> bool: """Evaluate quality for one simulation."""
[docs] @abstractmethod def save_results(self) -> None: """Save quality evaluation results."""
[docs] @staticmethod def prob_2_within_trustinterval( xv: npt.ArrayLike, xerr: npt.ArrayLike, yv: npt.ArrayLike, yerr: npt.ArrayLike, ) -> npt.ArrayLike: """ Compute the probability of two vals being a part of one distribution. Used quality value from experimental and simulation OP data. Probability is computed using Eq. (3) [10.1038/s41467-024-45189-z]. NOTE: Computing the probability taking into account small values using scipy.special.log1p is not required if sd is above 1e-5, which is currently the case for all OP data. It is also not required if we are not going to bring values to log-scale. Args: xv: Experimental OP value (all float/arrays) xerr: Experimental error yv: Simulated OP value yerr: Standard deviation from simulation Returns ------- float/ndarray: probability value(s) or nans """ # normal distribution N(s, OP_sim, op_sim_sd) a = xv - xerr b = xv + xerr a_rel = (yv - a) / yerr b_rel = (yv - b) / yerr p_b = scipy.stats.t.sf(b_rel, df=1, loc=0, scale=1) p_a = scipy.stats.t.sf(a_rel, df=1, loc=0, scale=1) return p_b - p_a
@staticmethod def _round_quality_values(obj: dict | list, ndigits: int = 4) -> dict | list: """Round all floating-point values in a nested dict/list structure.""" stack = [obj] while stack: current = stack.pop() if isinstance(current, dict): for k, v in current.items(): if isinstance(v, float): current[k] = round(v, ndigits) elif isinstance(v, (dict, list)): stack.append(v) elif isinstance(current, list): for i, v in enumerate(current): if isinstance(v, float): current[i] = round(v, ndigits) elif isinstance(v, (dict, list)): stack.append(v) return obj
[docs] class OPQualityEvaluator(QualityEvaluator): """Evaluate quality of order parameter data."""
[docs] def evaluate_one(self) -> bool: self.lipid_fragment_qualities = {} self.lipid_atomic_qualities = {} for lipidname, lipid in self._sim.lipids.items(): md_lipid_ops = self._sim.op_data[lipidname] if md_lipid_ops is None: continue # TODO: bb is merged into headgroup. But sn-s do not.. Cryptic rule. # TODO: What to do with other types of lipids? fragments = mollib.get_fragments(lipid.mapping_dict) fragment_qual_perexp = {} lipid_quality_perexp = {} for expid in self._sim["EXPERIMENT"]["ORDERPARAMETER"].get(lipidname, []): print(f"OP quality of simulation data in {self._sim['path']}") print( f".. evaluating {lipidname} lipid using experimental data from {expid}", ) exp_lipid_ops = self._exps.loc(expid).data[lipidname] # save qualities of simulation-vs-experiment into a dictionary lipid_quality_perexp[expid] = self.atomic_quality(exp_lipid_ops, md_lipid_ops) # calculate quality for molecule fragments headgroup, sn-1, sn-2 _frq = self.atomic2fragment_quality(lipid_quality_perexp[expid], fragments) _frw = self.get_fragments_coverage(fragments, lipid_quality_perexp[expid]) # dot product [ qualities * weights ] fragment_qual_perexp[expid] = {k: _frq[k] * _frw[k] for k in fragments} if not fragment_qual_perexp: continue fragment_quality_merged = self.fragment_quality_unite_multexp(fragment_qual_perexp) # export per-lipid qualities self.lipid_fragment_qualities[lipidname] = fragment_quality_merged self.lipid_atomic_qualities[lipidname] = lipid_quality_perexp if not self.lipid_fragment_qualities: return False self.system_qual_output = self.system_quality_gather_lipids() return True
[docs] def save_results(self) -> None: wdir = os.path.join(FMDL_SIMU_PATH, self._sim["path"]) # write into the XXXOrderParameters_quality.json quality data file for each lipid for lipidname, atomic_quality_perexp in self.lipid_atomic_qualities.items(): fname = os.path.join(wdir, lipidname + "_OrderParameters_quality.json") self._round_quality_values(atomic_quality_perexp) with open(fname, "w") as f: json.dump(atomic_quality_perexp, f, cls=CompactJSONEncoder) # Write fragment qualities for each lipid for lipidname, fragment_quality_merged in self.lipid_fragment_qualities.items(): if any(v > 0 for v in fragment_quality_merged.values()): fname = os.path.join(wdir, lipidname + "_FragmentQuality.json") self._round_quality_values(fragment_quality_merged) with open(fname, "w") as f: json.dump(fragment_quality_merged, f) # Write system quality if any(v > 0 for v in self.system_qual_output.values()): fname = os.path.join(wdir, "SYSTEM_quality.json") self._round_quality_values(self.system_qual_output) with open(fname, "w") as f: json.dump(self.system_qual_output, f) print("Order parameter quality written " + self._sim["path"])
[docs] @classmethod def atomic_quality(cls, exp_op_data: dict, sim_op_data: dict) -> dict: """ Calculate quality for a molecule (times their weights in exp data). :param exp_op_data: dictionary of type {op_uname: [op_value]}. :param sim_op_data: dictionary of type {op_uname: [op_value, op_sigma, op_sd]}. :return: dictionary of type {"nC nH": quality value}. """ exp_error = 0.02 # TODO: hardcoded error value, should be taken from experiment data when available # union of keys in exp_op_data and sim_op_data all_keys = sorted(set(exp_op_data.keys()) | set(sim_op_data.keys())) res_dict = {} for key in all_keys: if key not in exp_op_data or key not in sim_op_data: res_dict[key] = np.nan continue q = cls.prob_2_within_trustinterval( xv=exp_op_data[key][0], xerr=exp_error, yv=sim_op_data[key][0], yerr=sim_op_data[key][2], ) res_dict[key] = q return res_dict
[docs] @classmethod def get_fragments_coverage(cls, fragments: dict, q_data: dict) -> dict: """ Calculate the coverage in data of each fragment. The coverage is implemented as the percent of non-nans. :param fragments: Dictionary of type {fragment: lists of unames}. :param q_data: Dictionary of type {op_uname: quality value}. :return: Dictionary of type {fragment: percentage of evaluated OPs} """ frag_percentage = dict.fromkeys(fragments, 0) for frg_name, frg_atoms in fragments.items(): frg_atoms_set = set(frg_atoms) nan_mask = [np.isnan(v) for k, v in q_data.items() if k.split(" ")[0] in frg_atoms_set] frag_percentage[frg_name] = 1 - sum(nan_mask) / len(nan_mask) if len(nan_mask) > 0 else 0 return frag_percentage
[docs] @classmethod def atomic2fragment_quality(cls, atomic_qual_dict: dict, fragments: dict) -> dict: fqdict = dict.fromkeys(fragments.keys()) for frg_name, frg_atoms in fragments.items(): q_list = [v for k, v in atomic_qual_dict.items() if k.split(" ")[0] in frg_atoms] fqdict[frg_name] = np.nanmean(q_list) if len(q_list) > 0 else np.nan return fqdict
[docs] @classmethod def fragment_quality_unite_multexp(cls, fragment_qual_dict: dict) -> dict: """ Condition fragment qualities. The second-layer function. :param fragment_qual_perexp: dictionary of type {expid: {fragment: quality value}}. :return: dictionary of type {fragment: average quality value}. """ sums_dict = {} for exp_frag_qual in fragment_qual_dict.values(): for frag_name, frag_qual in exp_frag_qual.items(): sums_dict.setdefault(frag_name, []).append(frag_qual) avg_total_quality = {frag_name: np.nanmean(frag_vals) for frag_name, frag_vals in sums_dict.items()} avg_total_quality["total"] = np.nanmean(list(avg_total_quality.values())) return avg_total_quality
[docs] def system_quality_gather_lipids(self) -> dict: """ Gather fragment qualities for each lipid in a system and compute system-wide quality. :return: dictionary of type {macrofragment: quality} where macrofragmetn is headgroup|tails|total. """ system_fragment_qualities = self.lipid_fragment_qualities molar_composition = self._sim.membrane_composition(basis="molar") system_dict = {} lipid_dict = {} w_nan = [] for lname, lqual in system_fragment_qualities.items(): lipid_dict = dict.fromkeys(lqual.keys(), 0) w = molar_composition[lname] for key, value in lqual.items(): if not np.isnan(value): lipid_dict[key] += w * value else: # save 1 - w of a lipid into a list if the fragment quality is nan w_nan.append(1 - w) system_dict[lname] = lipid_dict system_quality = {} headgroup = 0 tails = 0 total = 0 for ldict in system_dict.values(): for key, value in ldict.items(): if key == "total": total += value elif key == "headgroup": headgroup += value elif key in {"sn-1", "sn-2"}: tails += value / 2 else: tails += value # everything non head is tail?? penalty = np.prod(w_nan) if w_nan else 1 system_quality["headgroup"] = headgroup * penalty system_quality["tails"] = tails * penalty system_quality["total"] = total * penalty return system_quality
[docs] class FFQualityEvaluator(QualityEvaluator): """Evaluate quality of form factor data."""
[docs] def evaluate_one(self) -> bool: results_ff = {} if "FORMFACTOR" not in self._sim.get("EXPERIMENT", {}) or not self._sim["EXPERIMENT"]["FORMFACTOR"]: return False if self._sim.ff_data is None: return False for expid in self._sim["EXPERIMENT"]["FORMFACTOR"]: exp_ff_data = np.array(self._exps.loc(expid).data) results_ff[expid] = [ self.calc_ff_quality(self._sim.ff_data, exp_ff_data), ff.calc_ff_scaling_distance(exp_ff_data, self._sim.ff_data)[0], ] # TODO: handle multiple FF experiments better # currently, just pick the best one best_ep = None for expid, ffq_scf in results_ff.items(): if ffq_scf is None: return False if best_ep is None or ffq_scf[0] < results_ff[best_ep][0]: best_ep = expid print(f"Form factor quality used for experiment data from {best_ep}:") self.ff_quality_output = results_ff[best_ep] return True
[docs] def save_results(self) -> None: print("Distance =", self.ff_quality_output[0], "; scaling factor =", self.ff_quality_output[1]) wdir = os.path.join(FMDL_SIMU_PATH, self._sim["path"]) print("Form factor quality saved for ", wdir) fname = os.path.join(wdir, "FormFactorQuality.json") ff_quality = list(self.ff_quality_output) self._round_quality_values(ff_quality) with open(fname, "w") as f: json.dump(ff_quality, f)
[docs] @staticmethod def calc_ff_quality(ffd_sim: np.ndarray, ffd_exp: np.ndarray) -> float: """ Calculate form factor quality. Quality calculation is performed as defined by Kučerka et al. 2010, doi:10.1007/s00232-010-9254-5 :param ffd_sim: Simulation FF data (float 2D list) :param ffd_exp: Experiment FF data (float 2D list) :return: Quality value. """ sim_min = ff.get_mins_from_ffdata(ffd_sim) exp_min = ff.get_mins_from_ffdata(ffd_exp) return np.abs(sim_min[0] - exp_min[0]) * 100