Source code for BuildDatabank.searchDATABANK

#!/usr/bin/env python3
# coding: utf-8

import os
import sys
import yaml

from tqdm import tqdm
from typing import List, IO

from DatabankLib import NMLDB_SIMU_PATH, NMLDB_EXP_PATH
from DatabankLib.core import System, initialize_databank
from DatabankLib.databankLibrary import lipids_set

import logging
logger = logging.getLogger("__name__")

# TODO: move ions list into Data
ions_list = ['POT', 'SOD', 'CLA', 'CAL']  # should contain names of all ions

LIP_CONC_REL_THRESHOLD = 0.15   # relative acceptable error for determination
# of the hydration in ssNMR # noqa


[docs] class SearchSystem: system: dict idx_path: str def __init__(self, readme): self.system: System = readme self.idx_path = readme['path']
[docs] def get_lipids(self, molecules=lipids_set): lipids = [k for k in self.system['COMPOSITION'] if k in molecules] return lipids
[docs] def get_ions(self, ions): sim_ions = [k for k in self.system['COMPOSITION'] if k in ions] return sim_ions
# fraction of each lipid with respect to total amount of lipids (only for lipids!)
[docs] def molar_fraction(self, molecule, molecules=lipids_set) -> float: cmps = self.system['COMPOSITION'] number = sum(cmps[molecule]['COUNT']) all_counts = [i['COUNT'] for k, i in cmps.items() if k in molecules] return number / sum(map(sum, all_counts))
# concentration of other molecules than lipids # change name to ionConcentration()
[docs] def ion_conc(self, molecule, exp_counter_ions): lipids1 = self.get_lipids() c_water = 55.5 n_water = self.system['COMPOSITION']['SOL']['COUNT'] try: n_molecule = self.system['COMPOSITION'][molecule]['COUNT'] # number of ions except KeyError: n_molecule = 0 lipids2 = [] if exp_counter_ions and n_molecule != 0: for lipid in lipids1: if (molecule in exp_counter_ions.keys() and lipid == exp_counter_ions[molecule]): n_lipid = self.system['COMPOSITION'][lipid]['COUNT'] lipids2.append(sum(n_lipid)) n_molecule = n_molecule - sum(lipids2) c_molecule = (n_molecule * c_water) / n_water return c_molecule
[docs] def total_lipid_conc(self): c_water = 55.5 n_water = self.system['COMPOSITION']['SOL']['COUNT'] n_lipids = 0 for lipid in self.get_lipids(): try: n_lipids += sum(self.system['COMPOSITION'][lipid]['COUNT']) except KeyError as e: print(self.system) raise e try: if (n_water / n_lipids) > 25: tot_lipid_c = 'full hydration' else: tot_lipid_c = (n_lipids * c_water) / n_water except ZeroDivisionError: logger.warning("Division by zero when determining lipid concentration!") print(self.system) return tot_lipid_c
##################
[docs] class Experiment: def __init__(self, readme: dict, molname: str, data_path: str, exptype: str): self.readme = readme self.molname = molname # <- the dictionary about existence of data files self.dataPath = data_path # .... for particular lipids self.exptype = exptype
[docs] def get_lipids(self, molecules=lipids_set) -> List[str]: lipids = [k for k in self.readme['MOLAR_FRACTIONS'] if k in molecules] return lipids
[docs] def get_ions(self, ions) -> List[str]: exp_ions: List[str] = [] for key in ions: try: if self.readme['ION_CONCENTRATIONS'][key] != 0: exp_ions.append(key) except KeyError: continue try: if key in self.readme['COUNTER_IONS']: exp_ions.append(key) except (TypeError, KeyError): continue return exp_ions
[docs] def load_simulations() -> List[SearchSystem]: """ Generates the list of Simulation objects. Go through all README.yaml files. """ systems = initialize_databank() simulations: List[SearchSystem] = [] for system in systems: # conditions of exclusions try: if system['WARNINGS']['NOWATER']: continue except (KeyError, TypeError): pass simulations.append(SearchSystem(system)) return simulations
[docs] def load_experiments(exp_type: str) -> List[Experiment]: """ Loops over the experiment entries in the experiment databank and read experiment readme and order parameter files into objects. """ if exp_type == 'OrderParameters': data_file = '_Order_Parameters.json' elif exp_type == 'FormFactors': data_file = '_FormFactor.json' else: raise NotImplementedError( "Only OrderParameters and FormFactors types are implemented.") print("Build experiments [%s] index..." % exp_type, end='') rm_idx = [] path = os.path.join(NMLDB_EXP_PATH, exp_type) for subdir, dirs, files in os.walk(path): for fn in files: if fn == 'README.yaml': rm_idx.append(subdir) print('%d READMEs loaded.' % len(rm_idx)) print("Loading data for each experiment.") experiments: List[Experiment] = [] for subdir in tqdm(rm_idx, desc='Experiment'): try: exp_readme_fp = os.path.join(subdir, 'README.yaml') with open(exp_readme_fp) as yaml_file_exp: exp_readme = yaml.load(yaml_file_exp, Loader=yaml.FullLoader) except (FileNotFoundError, PermissionError): logger.warning(f"Problems while accessing README.yaml in: {subdir}") continue for fname in os.listdir(subdir): if fname.endswith(data_file): molecule_name = "" if exp_type == "OrderParameters": molecule_name = fname.replace(data_file, '') elif exp_type == "FormFactors": molecule_name = 'system' experiments.append(Experiment( exp_readme, molecule_name, subdir, exp_type)) return experiments
[docs] def find_pairs(experiments: List[Experiment], simulations: List[SearchSystem]): pairs = [] for simulation in tqdm(simulations, desc='Simulation'): sim_lipids = simulation.get_lipids() sim_total_lipid_concentration = simulation.total_lipid_conc() sim_ions = simulation.get_ions(ions_list) t_sim = simulation.system['TEMPERATURE'] # calculate molar fractions from simulation sim_molar_fractions = {} for lipid in sim_lipids: sim_molar_fractions[lipid] = simulation.molar_fraction(lipid) for experiment in experiments: # check lipid composition matches the simulation exp_lipids = experiment.get_lipids() exp_total_lipid_concentration = \ experiment.readme['TOTAL_LIPID_CONCENTRATION'] exp_ions = experiment.get_ions(ions_list) exp_counter_ions = experiment.readme['COUNTER_IONS'] # calculate simulation ion concentrations sim_concentrations = {} for molecule in ions_list: sim_concentrations[molecule] = simulation.ion_conc( molecule, exp_counter_ions) # continue if lipid compositions are the same if set(sim_lipids) == set(exp_lipids): # compare molar fractions mf_ok = 0 for key in sim_lipids: if ((experiment.readme['MOLAR_FRACTIONS'][key] >= sim_molar_fractions[key] - 0.03) and (experiment.readme['MOLAR_FRACTIONS'][key] <= sim_molar_fractions[key] + 0.03)): mf_ok += 1 # compare ion concentrations c_ok = 0 if set(sim_ions) == set(exp_ions): for key in sim_ions: if ((experiment.readme['ION_CONCENTRATIONS'][key] >= sim_concentrations[key] - 0.05) and (experiment.readme['ION_CONCENTRATIONS'][key] <= sim_concentrations[key] + 0.05)): c_ok += 1 switch = 0 if (isinstance(exp_total_lipid_concentration, (int, float)) and isinstance(sim_total_lipid_concentration, (int, float))): if ((exp_total_lipid_concentration / sim_total_lipid_concentration > 1 - LIP_CONC_REL_THRESHOLD) and (exp_total_lipid_concentration / sim_total_lipid_concentration < 1 + LIP_CONC_REL_THRESHOLD)): switch = 1 elif ((type(exp_total_lipid_concentration) is str) and (type(sim_total_lipid_concentration) is str)): if exp_total_lipid_concentration == sim_total_lipid_concentration: switch = 1 if switch: # check temperature +/- 2 degrees t_exp = experiment.readme['TEMPERATURE'] if ((mf_ok == len(sim_lipids)) and (c_ok == len(sim_ions)) and (t_exp > float(t_sim) - 2.5) and (t_exp < float(t_sim) + 2.5)): # !we found the match! pairs.append([simulation, experiment]) # Add path to experiment into simulation README.yaml # many experiment entries can match to same simulation exp_doi = experiment.readme['DOI'] exp_path = os.path.relpath( experiment.dataPath, start=os.path.join(NMLDB_EXP_PATH, experiment.exptype)) if experiment.exptype == "OrderParameters": lipid = experiment.molname simulation.system['EXPERIMENT']['ORDERPARAMETER'][lipid][exp_doi] = exp_path # noqa elif experiment.exptype == "FormFactors": simulation.system['EXPERIMENT']['FORMFACTOR'] = exp_path else: continue # sorting experiment lists to keep experimental order strict cur_exp = simulation.system['EXPERIMENT'] for _lipid in cur_exp['ORDERPARAMETER']: unsort_dict = cur_exp['ORDERPARAMETER'][_lipid].copy() if not len(unsort_dict): continue sort_dict = dict(sorted(unsort_dict.items())) cur_exp['ORDERPARAMETER'][_lipid] = sort_dict.copy() outfile_dict = os.path.join( NMLDB_SIMU_PATH, simulation.idx_path, 'README.yaml') with open(outfile_dict, 'w') as f: if "path" in simulation.system.keys(): del (simulation.system['path']) yaml.dump(simulation.system.readme, f, sort_keys=False, allow_unicode=True) return pairs
[docs] def log_pairs(pairs, fd: IO[str]) -> None: """ Write found correspondences into log file. pairs: [(Simulation, Experiment), ...] fd: file descriptor for writting into """ for p in pairs: sim: SearchSystem = p[0] exp: Experiment = p[1] sysn = sim.system['SYSTEM'] simp = sim.idx_path expp = exp.dataPath expd = exp.readme['DOI'] fd.write(f""" -------------------------------------------------------------------------------- Simulation: - {sysn} - {simp} Experiment: - {expd} - {expp}""") # end for fd.write(""" -------------------------------------------------------------------------------- \n""")
[docs] def main(): """ Main program function. Not for exporting. """ simulations = load_simulations() # clear all EXPERIMENT sections in all simulations # TODO: check if EXPERIMENT section changed and trigger the action! for simulation in simulations: simulation.system['EXPERIMENT'] = {} simulation.system['EXPERIMENT']['ORDERPARAMETER'] = {} simulation.system['EXPERIMENT']['FORMFACTOR'] = {} for lipid in simulation.get_lipids(): simulation.system['EXPERIMENT']['ORDERPARAMETER'][lipid] = {} readme_path = os.path.join( NMLDB_SIMU_PATH, simulation.idx_path, 'README.yaml') with open(readme_path, 'w') as f: yaml.dump(simulation.system.readme, f, sort_keys=False, allow_unicode=True) experiments_op = load_experiments('OrderParameters') experiments_ff = load_experiments('FormFactors') # Pair each simulation with an experiment with the closest matching temperature # and composition with open('search-databank-pairs.log', 'w') as logf: print("Scanning simulation-experiment pairs among order parameter experiments.") pairs_op = find_pairs(experiments_op, simulations) logf.write("=== OP PAIRS ===\n") log_pairs(pairs_op, logf) print("Scanning simulation-experiment pairs among form factor experiments.") pairs_ff = find_pairs(experiments_ff, simulations) logf.write("=== FF PAIRS ===\n") log_pairs(pairs_ff, logf) ''' for pair in pairsFF: print('#################') print(pair[0].readme) print(pair[0].indexingPath) print("#") print(pair[1].readme) ''' print("Found order parameter data for " + str(len(pairs_op)) + " pairs") print("Found form factor data for " + str(len(pairs_ff)) + " pairs")
if __name__ == "__main__": main()