#!/usr/bin/env python3
# coding: utf-8
import os
import sys
import yaml
from tqdm import tqdm
from typing import List, IO
from DatabankLib import NMLDB_SIMU_PATH, NMLDB_EXP_PATH
from DatabankLib.core import System, initialize_databank
from DatabankLib.databankLibrary import lipids_set
import logging
logger = logging.getLogger("__name__")
# TODO: move ions list into Data
ions_list = ['POT', 'SOD', 'CLA', 'CAL'] # should contain names of all ions
LIP_CONC_REL_THRESHOLD = 0.15 # relative acceptable error for determination
# of the hydration in ssNMR # noqa
[docs]
class SearchSystem:
system: dict
idx_path: str
def __init__(self, readme):
self.system: System = readme
self.idx_path = readme['path']
[docs]
def get_lipids(self, molecules=lipids_set):
lipids = [k for k in self.system['COMPOSITION'] if k in molecules]
return lipids
[docs]
def get_ions(self, ions):
sim_ions = [k for k in self.system['COMPOSITION'] if k in ions]
return sim_ions
# fraction of each lipid with respect to total amount of lipids (only for lipids!)
[docs]
def molar_fraction(self, molecule, molecules=lipids_set) -> float:
cmps = self.system['COMPOSITION']
number = sum(cmps[molecule]['COUNT'])
all_counts = [i['COUNT'] for k, i in cmps.items() if k in molecules]
return number / sum(map(sum, all_counts))
# concentration of other molecules than lipids
# change name to ionConcentration()
[docs]
def ion_conc(self, molecule, exp_counter_ions):
lipids1 = self.get_lipids()
c_water = 55.5
n_water = self.system['COMPOSITION']['SOL']['COUNT']
try:
n_molecule = self.system['COMPOSITION'][molecule]['COUNT'] # number of ions
except KeyError:
n_molecule = 0
lipids2 = []
if exp_counter_ions and n_molecule != 0:
for lipid in lipids1:
if (molecule in exp_counter_ions.keys() and
lipid == exp_counter_ions[molecule]):
n_lipid = self.system['COMPOSITION'][lipid]['COUNT']
lipids2.append(sum(n_lipid))
n_molecule = n_molecule - sum(lipids2)
c_molecule = (n_molecule * c_water) / n_water
return c_molecule
[docs]
def total_lipid_conc(self):
c_water = 55.5
n_water = self.system['COMPOSITION']['SOL']['COUNT']
n_lipids = 0
for lipid in self.get_lipids():
try:
n_lipids += sum(self.system['COMPOSITION'][lipid]['COUNT'])
except KeyError as e:
print(self.system)
raise e
try:
if (n_water / n_lipids) > 25:
tot_lipid_c = 'full hydration'
else:
tot_lipid_c = (n_lipids * c_water) / n_water
except ZeroDivisionError:
logger.warning("Division by zero when determining lipid concentration!")
print(self.system)
return tot_lipid_c
##################
[docs]
class Experiment:
def __init__(self, readme: dict, molname: str, data_path: str, exptype: str):
self.readme = readme
self.molname = molname # <- the dictionary about existence of data files
self.dataPath = data_path # .... for particular lipids
self.exptype = exptype
[docs]
def get_lipids(self, molecules=lipids_set) -> List[str]:
lipids = [k for k in self.readme['MOLAR_FRACTIONS'] if k in molecules]
return lipids
[docs]
def get_ions(self, ions) -> List[str]:
exp_ions: List[str] = []
for key in ions:
try:
if self.readme['ION_CONCENTRATIONS'][key] != 0:
exp_ions.append(key)
except KeyError:
continue
try:
if key in self.readme['COUNTER_IONS']:
exp_ions.append(key)
except (TypeError, KeyError):
continue
return exp_ions
[docs]
def load_simulations() -> List[SearchSystem]:
"""
Generates the list of Simulation objects. Go through all README.yaml files.
"""
systems = initialize_databank()
simulations: List[SearchSystem] = []
for system in systems:
# conditions of exclusions
try:
if system['WARNINGS']['NOWATER']:
continue
except (KeyError, TypeError):
pass
simulations.append(SearchSystem(system))
return simulations
[docs]
def load_experiments(exp_type: str) -> List[Experiment]:
"""
Loops over the experiment entries in the experiment databank and read experiment
readme and order parameter files into objects.
"""
if exp_type == 'OrderParameters':
data_file = '_Order_Parameters.json'
elif exp_type == 'FormFactors':
data_file = '_FormFactor.json'
else:
raise NotImplementedError(
"Only OrderParameters and FormFactors types are implemented.")
print("Build experiments [%s] index..." % exp_type, end='')
rm_idx = []
path = os.path.join(NMLDB_EXP_PATH, exp_type)
for subdir, dirs, files in os.walk(path):
for fn in files:
if fn == 'README.yaml':
rm_idx.append(subdir)
print('%d READMEs loaded.' % len(rm_idx))
print("Loading data for each experiment.")
experiments: List[Experiment] = []
for subdir in tqdm(rm_idx, desc='Experiment'):
try:
exp_readme_fp = os.path.join(subdir, 'README.yaml')
with open(exp_readme_fp) as yaml_file_exp:
exp_readme = yaml.load(yaml_file_exp, Loader=yaml.FullLoader)
except (FileNotFoundError, PermissionError):
logger.warning(f"Problems while accessing README.yaml in: {subdir}")
continue
for fname in os.listdir(subdir):
if fname.endswith(data_file):
molecule_name = ""
if exp_type == "OrderParameters":
molecule_name = fname.replace(data_file, '')
elif exp_type == "FormFactors":
molecule_name = 'system'
experiments.append(Experiment(
exp_readme, molecule_name, subdir, exp_type))
return experiments
[docs]
def find_pairs(experiments: List[Experiment], simulations: List[SearchSystem]):
pairs = []
for simulation in tqdm(simulations, desc='Simulation'):
sim_lipids = simulation.get_lipids()
sim_total_lipid_concentration = simulation.total_lipid_conc()
sim_ions = simulation.get_ions(ions_list)
t_sim = simulation.system['TEMPERATURE']
# calculate molar fractions from simulation
sim_molar_fractions = {}
for lipid in sim_lipids:
sim_molar_fractions[lipid] = simulation.molar_fraction(lipid)
for experiment in experiments:
# check lipid composition matches the simulation
exp_lipids = experiment.get_lipids()
exp_total_lipid_concentration = \
experiment.readme['TOTAL_LIPID_CONCENTRATION']
exp_ions = experiment.get_ions(ions_list)
exp_counter_ions = experiment.readme['COUNTER_IONS']
# calculate simulation ion concentrations
sim_concentrations = {}
for molecule in ions_list:
sim_concentrations[molecule] = simulation.ion_conc(
molecule, exp_counter_ions)
# continue if lipid compositions are the same
if set(sim_lipids) == set(exp_lipids):
# compare molar fractions
mf_ok = 0
for key in sim_lipids:
if ((experiment.readme['MOLAR_FRACTIONS'][key] >=
sim_molar_fractions[key] - 0.03) and
(experiment.readme['MOLAR_FRACTIONS'][key] <=
sim_molar_fractions[key] + 0.03)):
mf_ok += 1
# compare ion concentrations
c_ok = 0
if set(sim_ions) == set(exp_ions):
for key in sim_ions:
if ((experiment.readme['ION_CONCENTRATIONS'][key] >=
sim_concentrations[key] - 0.05) and
(experiment.readme['ION_CONCENTRATIONS'][key] <=
sim_concentrations[key] + 0.05)):
c_ok += 1
switch = 0
if (isinstance(exp_total_lipid_concentration, (int, float)) and
isinstance(sim_total_lipid_concentration, (int, float))):
if ((exp_total_lipid_concentration / sim_total_lipid_concentration >
1 - LIP_CONC_REL_THRESHOLD) and
(exp_total_lipid_concentration / sim_total_lipid_concentration <
1 + LIP_CONC_REL_THRESHOLD)):
switch = 1
elif ((type(exp_total_lipid_concentration) is str) and
(type(sim_total_lipid_concentration) is str)):
if exp_total_lipid_concentration == sim_total_lipid_concentration:
switch = 1
if switch:
# check temperature +/- 2 degrees
t_exp = experiment.readme['TEMPERATURE']
if ((mf_ok == len(sim_lipids)) and
(c_ok == len(sim_ions)) and
(t_exp > float(t_sim) - 2.5) and
(t_exp < float(t_sim) + 2.5)):
# !we found the match!
pairs.append([simulation, experiment])
# Add path to experiment into simulation README.yaml
# many experiment entries can match to same simulation
exp_doi = experiment.readme['DOI']
exp_path = os.path.relpath(
experiment.dataPath,
start=os.path.join(NMLDB_EXP_PATH, experiment.exptype))
if experiment.exptype == "OrderParameters":
lipid = experiment.molname
simulation.system['EXPERIMENT']['ORDERPARAMETER'][lipid][exp_doi] = exp_path # noqa
elif experiment.exptype == "FormFactors":
simulation.system['EXPERIMENT']['FORMFACTOR'] = exp_path
else:
continue
# sorting experiment lists to keep experimental order strict
cur_exp = simulation.system['EXPERIMENT']
for _lipid in cur_exp['ORDERPARAMETER']:
unsort_dict = cur_exp['ORDERPARAMETER'][_lipid].copy()
if not len(unsort_dict):
continue
sort_dict = dict(sorted(unsort_dict.items()))
cur_exp['ORDERPARAMETER'][_lipid] = sort_dict.copy()
outfile_dict = os.path.join(
NMLDB_SIMU_PATH, simulation.idx_path, 'README.yaml')
with open(outfile_dict, 'w') as f:
if "path" in simulation.system.keys():
del (simulation.system['path'])
yaml.dump(simulation.system.readme, f, sort_keys=False, allow_unicode=True)
return pairs
[docs]
def log_pairs(pairs, fd: IO[str]) -> None:
"""
Write found correspondences into log file.
pairs: [(Simulation, Experiment), ...]
fd: file descriptor for writting into
"""
for p in pairs:
sim: SearchSystem = p[0]
exp: Experiment = p[1]
sysn = sim.system['SYSTEM']
simp = sim.idx_path
expp = exp.dataPath
expd = exp.readme['DOI']
fd.write(f"""
--------------------------------------------------------------------------------
Simulation:
- {sysn}
- {simp}
Experiment:
- {expd}
- {expp}""")
# end for
fd.write("""
--------------------------------------------------------------------------------
\n""")
[docs]
def main():
"""
Main program function. Not for exporting.
"""
simulations = load_simulations()
# clear all EXPERIMENT sections in all simulations
# TODO: check if EXPERIMENT section changed and trigger the action!
for simulation in simulations:
simulation.system['EXPERIMENT'] = {}
simulation.system['EXPERIMENT']['ORDERPARAMETER'] = {}
simulation.system['EXPERIMENT']['FORMFACTOR'] = {}
for lipid in simulation.get_lipids():
simulation.system['EXPERIMENT']['ORDERPARAMETER'][lipid] = {}
readme_path = os.path.join(
NMLDB_SIMU_PATH, simulation.idx_path, 'README.yaml')
with open(readme_path, 'w') as f:
yaml.dump(simulation.system.readme, f, sort_keys=False, allow_unicode=True)
experiments_op = load_experiments('OrderParameters')
experiments_ff = load_experiments('FormFactors')
# Pair each simulation with an experiment with the closest matching temperature
# and composition
with open('search-databank-pairs.log', 'w') as logf:
print("Scanning simulation-experiment pairs among order parameter experiments.")
pairs_op = find_pairs(experiments_op, simulations)
logf.write("=== OP PAIRS ===\n")
log_pairs(pairs_op, logf)
print("Scanning simulation-experiment pairs among form factor experiments.")
pairs_ff = find_pairs(experiments_ff, simulations)
logf.write("=== FF PAIRS ===\n")
log_pairs(pairs_ff, logf)
'''
for pair in pairsFF:
print('#################')
print(pair[0].readme)
print(pair[0].indexingPath)
print("#")
print(pair[1].readme)
'''
print("Found order parameter data for " + str(len(pairs_op)) + " pairs")
print("Found form factor data for " + str(len(pairs_ff)) + " pairs")
if __name__ == "__main__":
main()