#!/usr/bin/env python3
import logging
import os
from typing import IO, List
import yaml
from DatabankLib import NMLDB_EXP_PATH, NMLDB_SIMU_PATH
from DatabankLib.core import System, initialize_databank
from DatabankLib.databankLibrary import lipids_set
from tqdm import tqdm
logger = logging.getLogger("__name__")
# TODO: move ions list into Data
ions_list = ["POT", "SOD", "CLA", "CAL"] # should contain names of all ions
LIP_CONC_REL_THRESHOLD = 0.15 # relative acceptable error for determination
# of the hydration in ssNMR
[docs]
class SearchSystem:
system: dict
idx_path: str
def __init__(self, readme):
self.system: System = readme
self.idx_path = readme["path"]
[docs]
def get_lipids(self, molecules=lipids_set):
lipids = [k for k in self.system["COMPOSITION"] if k in molecules]
return lipids
[docs]
def get_ions(self, ions):
sim_ions = [k for k in self.system["COMPOSITION"] if k in ions]
return sim_ions
# fraction of each lipid with respect to total amount of lipids (only for lipids!)
[docs]
def molar_fraction(self, molecule, molecules=lipids_set) -> float:
cmps = self.system["COMPOSITION"]
number = sum(cmps[molecule]["COUNT"])
all_counts = [i["COUNT"] for k, i in cmps.items() if k in molecules]
return number / sum(map(sum, all_counts))
# concentration of other molecules than lipids
# change name to ionConcentration()
[docs]
def ion_conc(self, molecule, exp_counter_ions):
lipids1 = self.get_lipids()
c_water = 55.5
n_water = self.system["COMPOSITION"]["SOL"]["COUNT"]
try:
n_molecule = self.system["COMPOSITION"][molecule]["COUNT"] # number of ions
except KeyError:
n_molecule = 0
lipids2 = []
if exp_counter_ions and n_molecule != 0:
for lipid in lipids1:
if molecule in exp_counter_ions.keys() and lipid == exp_counter_ions[molecule]:
n_lipid = self.system["COMPOSITION"][lipid]["COUNT"]
lipids2.append(sum(n_lipid))
n_molecule = n_molecule - sum(lipids2)
c_molecule = (n_molecule * c_water) / n_water
return c_molecule
[docs]
def total_lipid_conc(self):
c_water = 55.5
n_water = self.system["COMPOSITION"]["SOL"]["COUNT"]
n_lipids = 0
for lipid in self.get_lipids():
try:
n_lipids += sum(self.system["COMPOSITION"][lipid]["COUNT"])
except KeyError as e:
print(self.system)
raise e
try:
if (n_water / n_lipids) > 25:
tot_lipid_c = "full hydration"
else:
tot_lipid_c = (n_lipids * c_water) / n_water
except ZeroDivisionError:
logger.warning("Division by zero when determining lipid concentration!")
print(self.system)
return tot_lipid_c
##################
[docs]
class Experiment:
def __init__(self, readme: dict, molname: str, data_path: str, exptype: str):
self.readme = readme
self.molname = molname # <- the dictionary about existence of data files
self.dataPath = data_path # .... for particular lipids
self.exptype = exptype
[docs]
def get_lipids(self, molecules=lipids_set) -> List[str]:
lipids = [k for k in self.readme["MOLAR_FRACTIONS"] if k in molecules]
return lipids
[docs]
def get_ions(self, ions) -> List[str]:
exp_ions: List[str] = []
for key in ions:
try:
if self.readme["ION_CONCENTRATIONS"][key] != 0:
exp_ions.append(key)
except KeyError:
continue
try:
if key in self.readme["COUNTER_IONS"]:
exp_ions.append(key)
except (TypeError, KeyError):
continue
return exp_ions
[docs]
def load_simulations() -> List[SearchSystem]:
"""
Generates the list of Simulation objects. Go through all README.yaml files.
"""
systems = initialize_databank()
simulations: List[SearchSystem] = []
for system in systems:
# conditions of exclusions
try:
if system["WARNINGS"]["NOWATER"]:
continue
except (KeyError, TypeError):
pass
simulations.append(SearchSystem(system))
return simulations
[docs]
def load_experiments(exp_type: str) -> List[Experiment]:
"""
Loops over the experiment entries in the experiment databank and read experiment
readme and order parameter files into objects.
"""
if exp_type == "OrderParameters":
data_file = "_Order_Parameters.json"
elif exp_type == "FormFactors":
data_file = "_FormFactor.json"
else:
raise NotImplementedError("Only OrderParameters and FormFactors types are implemented.")
print("Build experiments [%s] index..." % exp_type, end="")
rm_idx = []
path = os.path.join(NMLDB_EXP_PATH, exp_type)
for subdir, dirs, files in os.walk(path):
for fn in files:
if fn == "README.yaml":
rm_idx.append(subdir)
print("%d READMEs loaded." % len(rm_idx))
print("Loading data for each experiment.")
experiments: List[Experiment] = []
for subdir in tqdm(rm_idx, desc="Experiment"):
try:
exp_readme_fp = os.path.join(subdir, "README.yaml")
with open(exp_readme_fp) as yaml_file_exp:
exp_readme = yaml.load(yaml_file_exp, Loader=yaml.FullLoader)
except (FileNotFoundError, PermissionError):
logger.warning(f"Problems while accessing README.yaml in: {subdir}")
continue
for fname in os.listdir(subdir):
if fname.endswith(data_file):
molecule_name = ""
if exp_type == "OrderParameters":
molecule_name = fname.replace(data_file, "")
elif exp_type == "FormFactors":
molecule_name = "system"
experiments.append(Experiment(exp_readme, molecule_name, subdir, exp_type))
return experiments
[docs]
def find_pairs(experiments: List[Experiment], simulations: List[SearchSystem]):
pairs = []
for simulation in tqdm(simulations, desc="Simulation"):
sim_lipids = simulation.get_lipids()
sim_total_lipid_concentration = simulation.total_lipid_conc()
sim_ions = simulation.get_ions(ions_list)
t_sim = simulation.system["TEMPERATURE"]
# calculate molar fractions from simulation
sim_molar_fractions = {}
for lipid in sim_lipids:
sim_molar_fractions[lipid] = simulation.molar_fraction(lipid)
for experiment in experiments:
# check lipid composition matches the simulation
exp_lipids = experiment.get_lipids()
exp_total_lipid_concentration = experiment.readme["TOTAL_LIPID_CONCENTRATION"]
exp_ions = experiment.get_ions(ions_list)
exp_counter_ions = experiment.readme["COUNTER_IONS"]
# calculate simulation ion concentrations
sim_concentrations = {}
for molecule in ions_list:
sim_concentrations[molecule] = simulation.ion_conc(molecule, exp_counter_ions)
# continue if lipid compositions are the same
if set(sim_lipids) == set(exp_lipids):
# compare molar fractions
mf_ok = 0
for key in sim_lipids:
if (experiment.readme["MOLAR_FRACTIONS"][key] >= sim_molar_fractions[key] - 0.03) and (
experiment.readme["MOLAR_FRACTIONS"][key] <= sim_molar_fractions[key] + 0.03
):
mf_ok += 1
# compare ion concentrations
c_ok = 0
if set(sim_ions) == set(exp_ions):
for key in sim_ions:
if (experiment.readme["ION_CONCENTRATIONS"][key] >= sim_concentrations[key] - 0.05) and (
experiment.readme["ION_CONCENTRATIONS"][key] <= sim_concentrations[key] + 0.05
):
c_ok += 1
switch = 0
if isinstance(exp_total_lipid_concentration, (int, float)) and isinstance(
sim_total_lipid_concentration,
(int, float),
):
if (
exp_total_lipid_concentration / sim_total_lipid_concentration > 1 - LIP_CONC_REL_THRESHOLD
) and (exp_total_lipid_concentration / sim_total_lipid_concentration < 1 + LIP_CONC_REL_THRESHOLD):
switch = 1
elif (type(exp_total_lipid_concentration) is str) and (type(sim_total_lipid_concentration) is str):
if exp_total_lipid_concentration == sim_total_lipid_concentration:
switch = 1
if switch:
# check temperature +/- 2 degrees
t_exp = experiment.readme["TEMPERATURE"]
if (
(mf_ok == len(sim_lipids))
and (c_ok == len(sim_ions))
and (t_exp > float(t_sim) - 2.5)
and (t_exp < float(t_sim) + 2.5)
):
# !we found the match!
pairs.append([simulation, experiment])
# Add path to experiment into simulation README.yaml
# many experiment entries can match to same simulation
exp_doi = experiment.readme["DOI"]
exp_path = os.path.relpath(
experiment.dataPath,
start=os.path.join(NMLDB_EXP_PATH, experiment.exptype),
)
if experiment.exptype == "OrderParameters":
lipid = experiment.molname
simulation.system["EXPERIMENT"]["ORDERPARAMETER"][lipid][exp_doi] = exp_path
elif experiment.exptype == "FormFactors":
simulation.system["EXPERIMENT"]["FORMFACTOR"] = exp_path
else:
continue
# sorting experiment lists to keep experimental order strict
cur_exp = simulation.system["EXPERIMENT"]
for _lipid in cur_exp["ORDERPARAMETER"]:
unsort_dict = cur_exp["ORDERPARAMETER"][_lipid].copy()
if not len(unsort_dict):
continue
sort_dict = dict(sorted(unsort_dict.items()))
cur_exp["ORDERPARAMETER"][_lipid] = sort_dict.copy()
outfile_dict = os.path.join(NMLDB_SIMU_PATH, simulation.idx_path, "README.yaml")
with open(outfile_dict, "w") as f:
if "path" in simulation.system.keys():
del simulation.system["path"]
yaml.dump(simulation.system.readme, f, sort_keys=False, allow_unicode=True)
return pairs
[docs]
def log_pairs(pairs, fd: IO[str]) -> None:
"""
Write found correspondences into log file.
pairs: [(Simulation, Experiment), ...]
fd: file descriptor for writting into
"""
for p in pairs:
sim: SearchSystem = p[0]
exp: Experiment = p[1]
sysn = sim.system["SYSTEM"]
simp = sim.idx_path
expp = exp.dataPath
expd = exp.readme["DOI"]
fd.write(f"""
--------------------------------------------------------------------------------
Simulation:
- {sysn}
- {simp}
Experiment:
- {expd}
- {expp}""")
# end for
fd.write("""
--------------------------------------------------------------------------------
\n""")
[docs]
def main():
"""
Main program function. Not for exporting.
"""
simulations = load_simulations()
# clear all EXPERIMENT sections in all simulations
# TODO: check if EXPERIMENT section changed and trigger the action!
for simulation in simulations:
simulation.system["EXPERIMENT"] = {}
simulation.system["EXPERIMENT"]["ORDERPARAMETER"] = {}
simulation.system["EXPERIMENT"]["FORMFACTOR"] = {}
for lipid in simulation.get_lipids():
simulation.system["EXPERIMENT"]["ORDERPARAMETER"][lipid] = {}
readme_path = os.path.join(NMLDB_SIMU_PATH, simulation.idx_path, "README.yaml")
with open(readme_path, "w") as f:
yaml.dump(simulation.system.readme, f, sort_keys=False, allow_unicode=True)
experiments_op = load_experiments("OrderParameters")
experiments_ff = load_experiments("FormFactors")
# Pair each simulation with an experiment with the closest matching temperature
# and composition
with open("search-databank-pairs.log", "w") as logf:
print("Scanning simulation-experiment pairs among order parameter experiments.")
pairs_op = find_pairs(experiments_op, simulations)
logf.write("=== OP PAIRS ===\n")
log_pairs(pairs_op, logf)
print("Scanning simulation-experiment pairs among form factor experiments.")
pairs_ff = find_pairs(experiments_ff, simulations)
logf.write("=== FF PAIRS ===\n")
log_pairs(pairs_ff, logf)
"""
for pair in pairsFF:
print('#################')
print(pair[0].readme)
print(pair[0].indexingPath)
print("#")
print(pair[1].readme)
"""
print("Found order parameter data for " + str(len(pairs_op)) + " pairs")
print("Found form factor data for " + str(len(pairs_ff)) + " pairs")
if __name__ == "__main__":
main()