"""
Library contains all API functions and many functions used in building and
analyzing the NMRlipids databank
"""
import copy
import hashlib
import json
import logging
import math
import os
import sys
import urllib.request

import MDAnalysis as mda
import numpy as np
import yaml
from tqdm import tqdm

from DatabankLib import NMLDB_SIMU_PATH, NMLDB_ROOT_PATH
from DatabankLib.databankio import resolve_download_file_url
from DatabankLib.settings.engines import get_struc_top_traj_fnames, software_dict
from DatabankLib.settings.molecules import (
    lipids_dict, molecules_dict, molecule_ff_dict)
logger = logging.getLogger(__name__)
def CalcAreaPerMolecule(system):
"""
Calculates the average area per lipid for a simulation defined by ``system``.
It uses the ``apl.json`` file, which stores the area per lipid as a function of
time computed by ``calcAPL.py``.
:param system: NMRlipids databank dictionary defining a simulation.
:return: area per lipid (Å^2)
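
Example (a minimal sketch; assumes ``system`` is one simulation dictionary of the
databank, e.g. parsed from its ``README.yaml`` file)::

    apl = CalcAreaPerMolecule(system)
    if apl is not None:
        print(f"Average area per lipid: {apl:.2f} Å^2")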
"""
APLpath = os.path.join(NMLDB_SIMU_PATH, system['path'], 'apl.json')
try:
with open(APLpath) as f:
APLdata = json.load(f)
APL = sum(APLdata.values()) / len(APLdata)
return APL
except Exception:
print('apl.json not found at ' + APLpath)
def GetThickness(system):
"""
Gets the membrane thickness for a simulation defined by ``system`` from the
``thickness.json`` file, where the thickness computed by ``calc_thickness.py``
is stored.
:param system: NMRlipids databank dictionary defining a simulation.
:return: membrane thickness (nm) or None
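
Example (a minimal sketch; assumes ``system`` is a simulation dictionary of the databank)::

    thickness = GetThickness(system)
    if thickness is not None:
        print(f"Membrane thickness: {float(thickness):.2f} nm")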
"""
ThicknessPath = os.path.join(NMLDB_SIMU_PATH, system['path'], 'thickness.json')
try:
with open(ThicknessPath) as f:
thickness = json.load(f)
return thickness
except Exception:
return None
def ShowEquilibrationTimes(system: dict):
"""
Prints relative equilibration time for each lipid within a simulation defined
by ``system``. Relative equilibration times are calculated with
``NMRPCA_timerelax.py`` and stored in ``eq_times.json`` files.
:param system: NMRlipids databank dictionary defining a simulation.
"""
EqTimesPath = os.path.join(NMLDB_SIMU_PATH, system['path'], 'eq_times.json')
try:
with open(EqTimesPath) as f:
EqTimeDict = json.load(f)
except Exception:
raise FileNotFoundError(f'eq_times.json not found for {system["ID"]}')
for i in EqTimeDict:
print(i+':', EqTimeDict[i])
def GetNlipids(system):
"""
Returns the total number of lipids in a simulation defined by ``system``.
:param system: NMRlipids databank dictionary defining a simulation.
:return: the total number of lipids in the ``system``.
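
Example (a minimal sketch; assumes ``system`` is a simulation dictionary of the databank)::

    n_lipids = GetNlipids(system)
    print(f"The simulation contains {n_lipids} lipids in total.")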
"""
Nlipid = 0
for molecule in system['COMPOSITION']:
if molecule in lipids_dict:
Nlipid += np.sum(system['COMPOSITION'][molecule]['COUNT'])
return Nlipid
def getLipids(system, molecules=lipids_dict.keys()):
"""
Returns a string in MDAnalysis selection notation that can be used to select all
lipids from the ``system``.
:param system: NMRlipids databank dictionary defining a simulation.
:param molecules: molecule names to select; defaults to all lipids known to the databank.
:return: a string in MDAnalysis selection notation that selects all lipids in the ``system``.
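
Example (a minimal sketch showing how the selection string can be passed to
MDAnalysis; ``u`` is assumed to be a universe built with
``system2MDanalysisUniverse``)::

    selection = getLipids(system)   # e.g. 'resname CHL1 or resname POPC'
    lipid_atoms = u.select_atoms(selection)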
"""
resSet = set()
for key in system['COMPOSITION'].keys():
if key in molecules:
m_file = system['COMPOSITION'][key]['MAPPING']
mapping_dict = loadMappingFile(m_file)
try:
for atom in mapping_dict:
resSet.add(mapping_dict[atom]['RESIDUE'])
except (KeyError, TypeError):
resSet.add(system['COMPOSITION'][key]['NAME'])
lipids = 'resname ' + ' or resname '.join(sorted(list(resSet)))
return lipids
def simulation2universal_atomnames(system, molecule, atom):
"""
Gets the force-field-specific atom name corresponding to the universal atom name
``atom`` of ``molecule`` in the ``system``.
:param system: NMRlipids databank dictionary defining a simulation.
:param molecule: universal molecule name
:param atom: universal atom name
:return: force-field-specific atom name or None
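
Example (a minimal sketch; 'POPC' and 'M_G3P2_M' are illustrative universal names)::

    ff_name = simulation2universal_atomnames(system, 'POPC', 'M_G3P2_M')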
"""
try:
mapping = loadMappingFile(system['COMPOSITION'][molecule]['MAPPING'])
except Exception:
sys.stderr.write('Mapping file was not found!\n')
return None
try:
m_atom1 = mapping[atom]["ATOMNAME"]
except (KeyError, TypeError):
sys.stderr.write(
f"{atom} was not found in {system['COMPOSITION'][molecule]['MAPPING']}!\n")
return None
return m_atom1
def loadMappingFile(mapping_file):
"""
Loads a mapping file into a dictionary. The file name is resolved relative to
``Scripts/DatabankLib/mapping_files``.
:param mapping_file: name of the mapping file
:return: mapping dictionary
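
Example (a minimal sketch; 'POPC' is an illustrative universal molecule name)::

    mapping = loadMappingFile(system['COMPOSITION']['POPC']['MAPPING'])
    for universal_name, props in mapping.items():
        print(universal_name, props['ATOMNAME'])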
"""
mapping_file_path = os.path.join(
NMLDB_ROOT_PATH, 'Scripts', 'DatabankLib', 'mapping_files', mapping_file)
mapping_dict = {}
with open(mapping_file_path, "r") as yaml_file:
mapping_dict = yaml.load(yaml_file, Loader=yaml.FullLoader)
return mapping_dict
def getAtoms(system, lipid):
"""
Returns the simulation-specific atom names of a ``lipid`` as a space-separated string.
:param system: system dictionary
:param lipid: universal lipid name
:return: string of simulation-specific atom names
"""
atoms = ""
path_to_mapping_file = system['COMPOSITION'][lipid]['MAPPING']
mapping_dict = loadMappingFile(path_to_mapping_file)
for key in mapping_dict:
atoms = atoms + ' ' + mapping_dict[key]['ATOMNAME']
return atoms
def getUniversalAtomName(system: dict, atomName: str, lipid: str):
"""
Returns the universal atom name corresponding to the simulation-specific ``atomName``
of a ``lipid`` in a simulation defined by the ``system``.
:param system: system dictionary
:param atomName: simulation-specific atom name
:param lipid: universal lipid name
:return: universal atom name (string) or None
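
Example (a minimal sketch; 'P' and 'POPC' are illustrative simulation-specific and
universal names)::

    universal_name = getUniversalAtomName(system, 'P', 'POPC')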
"""
try:
mappingFile = system['COMPOSITION'][lipid]['MAPPING']
except (KeyError, TypeError):
sys.stderr.write('Mapping file was not found!\n')
return None
mappingDict = loadMappingFile(mappingFile)
for universalName in mappingDict:
simName = mappingDict[universalName]['ATOMNAME']
if simName == atomName:
return universalName
sys.stderr.write('Atom was not found!\n')
return None
def calc_angle(atoms, com):
"""
:meta private:
Calculates the angle between the vector from ``atoms[0]`` to ``atoms[1]`` and the
z-axis, in degrees. No PBC check!
The center-of-mass z-coordinate ``com`` is used to invert vectors of the bottom
leaflet so that both leaflets follow the same nomenclature.
"""
vec = atoms[1].position - atoms[0].position
d = math.sqrt(np.square(vec).sum())
cos = vec[2] / d
# values for the bottom leaflet are inverted so that
# they have the same nomenclature as the top leaflet
cos *= math.copysign(1.0, atoms[0].position[2] - com)
try:
angle = math.degrees(math.acos(cos))
except ValueError:
if abs(cos) >= 1.0:
print("Cosine is too large = {} --> truncating it to +/-1.0".format(cos))
cos = math.copysign(1.0, cos)
angle = math.degrees(math.acos(cos))
return angle
def calc_z_dim(gro):
"""
:meta private:
Returns the simulation box dimension in the z-direction from a coordinate file.
:param gro: coordinate file in ``gro``, ``pdb`` or a corresponding format.
:return: size of the box in the z-direction (Å).
"""
u = mda.Universe(gro)
z = u.dimensions[2]
return z
def system2MDanalysisUniverse(system):
"""
Takes the ``system`` dictionary as an input, downloads the required files to
the NMRlipids databank directory and returns an MDAnalysis universe corresponding
to the ``system``.
:param system: NMRlipids databank dictionary describing the simulation.
:return: MDAnalysis universe
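
Example (a minimal sketch; note that missing trajectory and topology files are
downloaded first, which may take a long time)::

    u = system2MDanalysisUniverse(system)
    print(f"{len(u.atoms)} atoms, {len(u.trajectory)} frames")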
"""
system_path = os.path.join(NMLDB_SIMU_PATH, system['path'])
doi = system.get('DOI')
skip_downloading: bool = (doi == 'localhost')
if skip_downloading:
print("NOTE: The system with 'localhost' DOI should be downloaded by the user.")
try:
struc, top, trj = get_struc_top_traj_fnames(system)
trj_name = os.path.join(system_path, trj)
if struc is None:
struc_name = None
else:
struc_name = os.path.join(system_path, struc)
if top is None:
top_name = None
else:
top_name = os.path.join(system_path, top)
except Exception as e:
logger.error("Error getting structure/topology/trajectory filenames.")
logger.error(str(e))
raise
# downloading trajectory (obligatory)
if (skip_downloading):
if (not os.path.isfile(trj_name)):
raise FileNotFoundError(
f"Trajectory should be downloaded [{trj_name}] by user")
else:
trj_url = resolve_download_file_url(doi, trj)
if (not os.path.isfile(trj_name)):
print(f"Downloading trajectory of size {system['TRAJECTORY_SIZE']} "
f"to {system['path']}")
_ = urllib.request.urlretrieve(trj_url, trj_name)
# downloading topology (if exists)
if top is not None:
if skip_downloading:
if (not os.path.isfile(top_name)):
raise FileNotFoundError(f"TPR should be downloaded [{top_name}]")
else:
top_url = resolve_download_file_url(doi, top)
if (not os.path.isfile(top_name)):
_ = urllib.request.urlretrieve(top_url, top_name)
# downloading structure (if exists)
if struc is not None:
if skip_downloading:
if (not os.path.isfile(struc_name)):
raise FileNotFoundError(f"GRO should be downloaded [{struc_name}]")
else:
struc_url = resolve_download_file_url(doi, struc)
if (not os.path.isfile(struc_name)):
_ = urllib.request.urlretrieve(struc_url, struc_name)
made_from_top = False
try:
u = mda.Universe(top_name, trj_name)
made_from_top = True
except Exception as e:
logger.warning(f"Couldn't make Universe from {top_name} and {trj_name}.")
logger.warning(str(e))
if not made_from_top and struc is not None:
made_from_struc = False
try:
u = mda.Universe(struc_name, trj_name)
made_from_struc = True
except Exception as e:
logger.warning(f"Couldn't make Universe from {struc_name} and {trj_name}.")
logger.warning(str(e))
if not made_from_struc:
if system["SOFTWARE"].upper() == "GROMACS":
# rewrite struc_fname!
struc_fname = os.path.join(system_path, 'conf.gro')
print("Generating conf.gro because MDAnalysis cannot "
"(probably!) read tpr version")
if (
'WARNINGS' in system and
'GROMACS_VERSION' in system['WARNINGS'] and
system['WARNINGS']['GROMACS_VERSION'] == 'gromacs3'
):
os.system(f'echo System | editconf -f {top_name} -o {struc_fname}')
else:
os.system(f"echo System | gmx trjconv "
f"-s {top_name} -f {trj_name} -dump 0 -o {struc_fname}")
# the last try!
u = mda.Universe(struc_fname, trj_name)
else:
raise RuntimeError("There is no way to build up your system!")
return u
def read_trj_PN_angles(molname: str, atom1: str, atom2: str, MDAuniverse: mda.Universe):
"""
Calculates the P-N vector angles with respect to membrane normal from the
simulation defined by the MDAnalysis universe.
:param molname: residue name of the molecule for which the P-N vector angle will
be calculated
:param atom1: name of the P atom in the simulation
:param atom2: name of the N atom in the simulation
:param MDAuniverse: MDAnalysis universe of the simulation to be analyzed
:return: tuple (angles of all molecules as a function of time,
time averages for each molecule,
the average angle over time and molecules,
the error of the mean calculated over molecules)
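
Example (a minimal sketch combining this function with other API calls of this
module; 'POPC', 'M_G3P2_M' and 'M_G3N6_M' are illustrative universal names)::

    p_name = simulation2universal_atomnames(system, 'POPC', 'M_G3P2_M')
    n_name = simulation2universal_atomnames(system, 'POPC', 'M_G3N6_M')
    resname = system['COMPOSITION']['POPC']['NAME']
    u = system2MDanalysisUniverse(system)
    angles, res_means, mean_angle, mean_error = read_trj_PN_angles(
        resname, p_name, n_name, u)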
"""
mol = MDAuniverse
selection = mol.select_atoms(
"resname " + molname + " and (name " + atom1 + ")",
"resname " + molname + " and (name " + atom2 + ")",
).atoms.split("residue")
com = mol.select_atoms(
"resname " + molname + " and (name " + atom1 + " or name " + atom2 + ")"
).center_of_mass()
Nres = len(selection)
Nframes = len(mol.trajectory)
angles = np.zeros((Nres, Nframes))
resAverageAngles = [0] * Nres
resSTDerror = [0] * Nres
j = 0
for frame in mol.trajectory:
for i in range(0, Nres):
residue = selection[i]
angles[i, j] = calc_angle(residue, com[2])
j = j + 1
for i in range(0, Nres):
resAverageAngles[i] = sum(angles[i, :]) / Nframes
resSTDerror[i] = np.std(angles[i, :])
totalAverage = sum(resAverageAngles) / Nres
totalSTDerror = np.std(resAverageAngles) / np.sqrt(Nres)
return angles, resAverageAngles, totalAverage, totalSTDerror
# -------------------------------------- SEPARATED PART (??) ----------------------
def calc_file_sha1_hash(fi: str, step: int = 4096) -> str:
"""
:meta private:
Calculates sha1 hash of given file using hashlib
Args:
fi (str): path to file
step (int, optional): file read bytes step. Defaults to 4096.
Returns:
str: sha1 filehash of 40 char length
"""
sha1_hash = hashlib.sha1()
with open(fi, "rb") as f:
with tqdm(total=math.ceil(os.path.getsize(fi) / step)) as pbar:
# Read and update the hash in blocks of `step` bytes (4096 by default)
for byte_block in iter(lambda: f.read(step), b""):
sha1_hash.update(byte_block)
pbar.update(1)
return sha1_hash.hexdigest()
def create_databank_directories(sim, sim_hashes, out) -> str:
"""
:meta private:
create nested output directory structure to save results
Args:
sim (_type_): Processed simulation entries
sim_hashes (_type_): file hashes needed for directory structure
out (str): output base path
Raises:
NotImplementedError: unsupported simulation software
OSError: Error while creating the output directory
Returns:
str: output directory
"""
# resolve output dir naming
if sim["SOFTWARE"] == "gromacs":
head_dir = sim_hashes.get("TPR")[0][1][0:3]
sub_dir1 = sim_hashes.get("TPR")[0][1][3:6]
sub_dir2 = sim_hashes.get("TPR")[0][1]
sub_dir3 = sim_hashes.get("TRJ")[0][1]
elif sim["SOFTWARE"] == "openMM" or sim["SOFTWARE"] == "NAMD":
head_dir = sim_hashes.get("TRJ")[0][1][0:3]
sub_dir1 = sim_hashes.get("TRJ")[0][1][3:6]
sub_dir2 = sim_hashes.get("TRJ")[0][1]
sub_dir3 = sim_hashes.get("TRJ")[0][1]
else:
raise NotImplementedError(f"sim software '{sim['SOFTWARE']}' not supported")
directory_path = os.path.join(out, head_dir, sub_dir1, sub_dir2, sub_dir3)
logger.debug(f"output_dir = {directory_path}")
# destination directory is not empty
if os.path.exists(directory_path) and len(os.listdir(directory_path)) != 0:
logger.warning(
f"output directory '{directory_path}' is not empty. Data may be overwritten."
)
# create directories
os.makedirs(directory_path, exist_ok=True)
return directory_path
class YamlBadConfigException(Exception):
"""
:meta private:
Custom Exception class for parsing the yaml configuration
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
def parse_valid_config_settings(info_yaml: dict) -> tuple[dict, list[str]]:
"""
:meta private:
Parses, validates and updates dict entries from yaml configuration file.
Args:
info_yaml (dict): info.yaml of database to add
Raises:
KeyError: Missing required key in info.yaml
YamlBadConfigException: Incorrect or incompatible configuration
Returns:
dict: updated sim dict
list[str]: list of filenames to download
"""
sim = copy.deepcopy(info_yaml)  # deep copy so the caller's dict is not modified
# STEP 1 - check supported simulation software
if "SOFTWARE" not in sim:
raise KeyError("'SOFTWARE' Parameter missing in yaml")
if sim["SOFTWARE"].upper() in software_dict.keys():
logger.info(f"Simulation uses supported software '{sim['SOFTWARE'].upper()}'")
else:
raise YamlBadConfigException(
f"Simulation uses unsupported software '{sim['SOFTWARE'].upper()}'"
)
software_sim = software_dict[
sim["SOFTWARE"].upper()
] # related to dicts in this file
# STEP 2 - check required keys defined by sim software used
software_required_keys = [k for k, v in software_sim.items() if v["REQUIRED"]]
# are ALL required keys present in the sim dict and defined (not None)?
if not all(
(k in list(sim.keys())) and (sim[k] is not None) for k in software_required_keys
):
missing_keys = [k for k in software_required_keys if k not in sim or sim[k] is None]
raise YamlBadConfigException(
f"Required '{sim['SOFTWARE'].upper()}' sim keys missing or "
f"not defined in conf file: {', '.join(missing_keys)}"
)
logger.debug(
f"all {len(software_required_keys)} required"
f" '{sim['SOFTWARE'].upper()}' sim keys are present"
)
# STEP 3 - check working directory
if "DIR_WRK" not in sim:
raise KeyError("'DIR_WRK' Parameter missing in yaml")
# STEP 4 - Check that all entry keys provided for each simulation are valid
files_tbd = []
# loop config entries
for key_sim, value_sim in sim.items():
logger.debug(f"processing entry: sim['{key_sim}'] = {str(value_sim)}")
if key_sim.upper() in "SOFTWARE": # skip 'SOFTWARE' entry
continue
# STEP 4.1.
# Anne: check if key is in molecules_dict, molecule_numbers_dict or
# molecule_ff_dict too
if (
(key_sim.upper() not in software_sim.keys())
and (key_sim.upper() not in molecules_dict.keys())
and (key_sim.upper() not in lipids_dict.keys())
and (key_sim.upper() not in molecule_ff_dict.keys())
):
logger.error(
f"key_sim '{key_sim}' in {sim['SOFTWARE'].lower()}_dict' "
f": {key_sim.upper() in software_sim.keys()}"
)
logger.error(
f"key_sim '{key_sim}' in molecules_dict "
f": {key_sim.upper() in molecules_dict.keys()}"
)
logger.error(
f"key_sim '{key_sim}' in lipids_dict "
f": {key_sim.upper() in lipids_dict.keys()}"
)
logger.error(
f"key_sim '{key_sim}' in molecule_ff_dict "
f": {key_sim.upper() in molecule_ff_dict.keys()}"
)
raise YamlBadConfigException(
f"'{key_sim}' not supported: Not found in "
f"'{sim['SOFTWARE'].lower()}_dict', 'molecules_dict',"
f" 'lipids_dict' and 'molecule_ff_dict'"
)
elif (
key_sim.upper() not in software_sim.keys()
):  # hotfix for unknown yaml keys. TODO improve check 4.1?
logger.warning(
f"ignoring yaml entry '{key_sim}', not found "
f"in '{sim['SOFTWARE'].lower()}_dict'"
)
continue
# STEP 4.2.
# entries with files information to contain file names in arrays
if "TYPE" in software_sim[key_sim]:
if "file" in software_sim[key_sim]["TYPE"]: # entry_type
logger.debug(
f"-> found '{key_sim}:{software_sim[key_sim]}' of 'TYPE' file"
) # DEBUG
if value_sim is None:
logger.debug(f"entry '{key_sim}' has NoneType value, skipping")
# already a list -> ok
elif isinstance(value_sim, list):
logger.debug(f"value_sim '{value_sim}' is already a list, skipping")
files_tbd.extend(value_sim)
else:
value_sim_splitted = value_sim.split(";")
if len(value_sim_splitted) == 0:
raise YamlBadConfigException(
f"found no file to download for "
f"entry '{key_sim}:{software_sim[key_sim]}'"
)
# in case there are multiple files for one entry
elif len(value_sim_splitted) > 1:
files_list = []
for file_provided in value_sim.split(";"):
files_list.append([file_provided.strip()])
sim[
key_sim
] = files_list # replace ; separated string with list
else:
# print(f"value_sim_splitted = {value_sim_splitted}")
sim[key_sim] = [
[f.strip()] for f in value_sim_splitted
] # IMPORTANT: Needs to be list of lists for now
files_tbd.extend(f[0] for f in sim[key_sim])
# print(f"sim[{key_sim}] = {sim[key_sim]}")
# STEP 4.3.
# Batuhan: In conf file only one psf/tpr/pdb file allowed each
# (can coexist), multiple TRJ files are ok
# TODO true for all sim software?
# TODO add dict entry param "unique" instead?
if key_sim.upper() in ["PSF", "TPR", "PDB"] and len(sim[key_sim]) > 1:
raise YamlBadConfigException(
f"only one '{key_sim}' entry file allowed,"
f" but got {len(sim[key_sim])}: {sim[key_sim]}"
)
else:
logger.warning(
f"skipping key '{key_sim}': Not defined in software_sim library"
)
logger.info(
f"found {len(files_tbd)} ressources to download: {', '.join(files_tbd)}"
)
return sim, files_tbd
def calcArea(system):
"""
Returns the membrane area calculated from the area per lipid stored in the databank.
:param system: a system dictionary
:return: area of the membrane (one leaflet, Å^2)
"""
APL = CalcAreaPerMolecule(system)
Nlipid = 0
for molecule in system['COMPOSITION']:
if molecule in lipids_dict:
Nlipid += np.sum(system['COMPOSITION'][molecule]['COUNT'])
print(Nlipid, APL)
return Nlipid*APL/2
def averageOrderParameters(system):
"""
Returns the average order parameters of the *sn*-1 and *sn*-2 acyl chains based on
universal atom names. Names starting with M_G1C are assigned to *sn*-1 and names
starting with M_G2C to *sn*-2.
:param system: a system dictionary
:return: averages of the *sn*-1 and *sn*-2 order parameters
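
Example (a minimal sketch; assumes that the per-lipid ``OrderParameters.json``
files have already been computed for the lipids of the ``system``)::

    sn1_avg, sn2_avg = averageOrderParameters(system)
    print(f"average S_CD: sn-1 {sn1_avg:.3f}, sn-2 {sn2_avg:.3f}")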
"""
path = os.path.join(NMLDB_SIMU_PATH, system['path'])
sn1sum = 0
sn1count = 0
sn2sum = 0
sn2count = 0
for lipid in system['COMPOSITION']:
if lipid in lipids_dict and 'CHOL' not in lipid:
OPpathSIM = os.path.join(path, lipid + 'OrderParameters.json')
with open(OPpathSIM) as json_file:
OPsim = json.load(json_file)
for key in OPsim:
if 'M_G1C' in key:
sn1sum += float(OPsim[key][0][0])
sn1count += 1
elif 'M_G2C' in key:
sn2sum += float(OPsim[key][0][0])
sn2count += 1
return sn1sum/sn1count, sn2sum/sn2count
def calcLipidFraction(system, lipid):
"""
Returns the number fraction of ``lipid`` with respect to the total number of lipids.
:param system: a system dictionary
:param lipid: universal molecule name of the lipid
:return: number fraction of ``lipid`` with respect to the total number of lipids
"""
NlipidTOT = 0
for molecule in system['COMPOSITION']:
if molecule in lipids_dict:
NlipidTOT += np.sum(system['COMPOSITION'][molecule]['COUNT'])
Nlipid = 0
for molecule in system['COMPOSITION']:
if lipid in molecule:
Nlipid += np.sum(system['COMPOSITION'][molecule]['COUNT'])
return Nlipid/NlipidTOT
def getHydrationLevel(system):
"""
Returns hydration level of the system, i.e., number of water molecules divided
by number of lipid molecules.
:param system: a system dictionary
:return: number of water molecules divided by number of lipid molecules
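
Example (a minimal sketch; assumes ``system`` is a simulation dictionary of the databank)::

    print(f"{getHydrationLevel(system):.1f} water molecules per lipid")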
"""
Nlipid = 0
for molecule in system['COMPOSITION']:
if molecule in lipids_dict:
Nlipid += np.sum(system['COMPOSITION'][molecule]['COUNT'])
Nwater = system['COMPOSITION']['SOL']['COUNT']
return Nwater/Nlipid