"""
API functions for analyzing the FAIRMD Lipids Databank.
Functionality are organized into few groups:
1. Class :class:`UniverseConstructor`, which help one to create MDAnalysis.Universe from Databank's
:class:`System <fairmd.lipids.core.System>`
2. Functions that extract computed properties:
- :func:`get_OP`
- :func:`get_thickness`
- :func:`get_eqtimes`
- :func:`get_FF`
3. Functions that extract post-processed properties:
- :func:`get_mean_ApL`
- :func:`get_total_area`
4. Auxiliary functions for better interface with *MDAnalysis*
- :func:`mda_gen_selection_mols`
- :func:`mda_read_trj_tilt_angles`
"""
import json
import logging
import math
import os
import sys
import warnings
from collections.abc import Container
import MDAnalysis as mda
import numpy as np
from fairmd.lipids import FMDL_SIMU_PATH
from fairmd.lipids.core import System
from fairmd.lipids.databankio import download_resource_from_uri, resolve_file_url
from fairmd.lipids.molecules import Molecule, lipids_set
from fairmd.lipids.SchemaValidation.engines import get_struc_top_traj_fnames
logger = logging.getLogger(__name__)
[docs]
def get_thickness(system: System) -> float:
"""
Get thickness for a simulation defined with ``system`` from the ``thickness.json``.
:param system: Simulation object.
:return: membrane thickess (nm) or raise exception
"""
thickness_path = os.path.join(FMDL_SIMU_PATH, system["path"], "thickness.json")
try:
with open(thickness_path) as f:
thickness = json.load(f)
thickness_v = float(thickness)
except FileNotFoundError:
print("No thickness information for system#{}.".format(system["ID"]), file=sys.stderr)
raise
except ValueError:
print("Thickness information for system#{} is invalid.".format(system["ID"]), file=sys.stderr)
raise
else:
return thickness_v
[docs]
def get_eqtimes(system: System) -> dict:
"""
Return relative equilibration time for each lipid of ``system``.
:param system: Simulation object.
:return: dictionary of relative equilibration times for each lipid
"""
eq_times_path = os.path.join(FMDL_SIMU_PATH, system["path"], "eq_times.json")
try:
with open(eq_times_path) as f:
eq_time_dict = json.load(f)
except FileNotFoundError as e:
msg = "No equilibration time information for system#{}.".format(system["ID"])
raise FileNotFoundError(msg) from e
except json.JSONDecodeError as e:
msg = "Equilibration times information for system#{} is invalid.".format(system["ID"])
raise ValueError(msg) from e
return eq_time_dict
[docs]
def get_OP(system: System) -> dict: # noqa: N802 (API name)
"""
Return a dictionary with the order parameter data for each lipid in ``system``.
:param system: Simulation object.
:return: dictionary contaning, for each lipid, the order parameter data:
average OP, standard deviation, and standard error of mean. Contains
None if ``LipidNameOrderParameters.json`` missing.
"""
sim_op_data = {} # order parameter data for each type of lipid
for mol in system["COMPOSITION"]:
if mol not in lipids_set:
continue
fname = os.path.join(
FMDL_SIMU_PATH,
system["path"],
mol + "OrderParameters.json",
)
# it always returns dictionary but values can be empty
if not os.path.isfile(fname):
warnings.warn(f"{fname} not found for {system['ID']}", stacklevel=2)
sim_op_data[mol] = None
continue
op_data = {}
try:
with open(fname) as json_file:
op_data = json.load(json_file)
except json.JSONDecodeError as e:
msg = f"Order parameter data in {fname} is invalid for {system['ID']}"
raise ValueError(msg) from e
sim_op_data[mol] = op_data
return sim_op_data
[docs]
def get_FF(system: System) -> np.ndarray: # noqa: N802 (API name)
"""
Get numpy table of FormFactor curve.
:param system: Simulation object
:return: (q,FF,err) numpy table
"""
fn = os.path.join(
FMDL_SIMU_PATH,
system["path"],
"FormFactor.json",
)
try:
with open(fn) as json_file:
sim_ff_data = json.load(json_file)
except FileNotFoundError as e:
msg = "The form-factor data is missing for system#{}.".format(system["ID"])
raise FileNotFoundError(msg) from e
except json.JSONDecodeError as e:
msg = "The form-factor data for system#{} is invalid.".format(system["ID"])
raise ValueError(msg) from e
return np.array(sim_ff_data)
[docs]
def get_mean_ApL(system: System) -> float: # noqa: N802 (API name)
"""
Calculate average area per lipid for a system.
:param system: Simulation object.
:return: area per lipid (Å^2)
"""
path = os.path.join(FMDL_SIMU_PATH, system["path"], "apl.json")
try:
with open(path) as f:
data = json.load(f)
except FileNotFoundError as e:
msg = "Area per lipid data is absent for system #{}".format(system["ID"])
raise FileNotFoundError(msg) from e
except json.JSONDecodeError as e:
msg = "Area per lipid data for system #{} in {} is invalid.".format(system["ID"], path)
raise ValueError(msg) from e
vals = np.array(list(data.values()))
return vals.mean()
[docs]
def get_total_area(system: System) -> float:
"""
Return area of the membrane in the simulation box.
:param system: Simulation object.
:return: area of the system (Å^2)
"""
apl = get_mean_ApL(system)
return system.n_lipids * apl / 2
[docs]
class UniverseConstructError(Exception):
"""Specific error for UniverseConstructor"""
[docs]
class UniverseConstructor:
"""
Class operating with downloading and constructing Universe for the :class:`System <fairmd.lipids.core.System>`.
To use this class, one instantinate it with a particular system, and then download.
.. code-block:: python
s = systems.loc(120)
uc = UniverseConstructor(s)
uc.download_mddata()
After this, the pointer :attr:`uc.paths <paths>` will show which files are available to work with.
Finally, you can run :meth:`uc.build_universe() <build_universe>` to get the ``MDAnalysis.Universe`` object.
"""
def __init__(self, s: System) -> None:
"""
Create an empty instance. No auto-download or auto-universe.
:param s: Simulation object.
"""
self._s = s
self._paths = {
"struc": None,
"top": None,
"traj": None,
"energy": None,
}
@property
def system(self) -> System:
"""Link to simulation object."""
return self._s
@property
def paths(self) -> dict[str, str | None]:
"""Return dicts of absolute paths of downloaded files.
Allowed fields are: *struc*, *traj*, *top*, *energy*. If they are not ``None``, then
the corresponding file is downloaded and the full path is the value.
"""
return self._paths
[docs]
def download_mddata(self, *, skip_traj: bool = False) -> None:
"""
Download all the files. Previously downloaded are skipped.
:param skip_traj: Download only TOP&struc for further constructing single-frame universe
"""
gpath = os.path.join(FMDL_SIMU_PATH, self._s["path"])
struc, top, trj = get_struc_top_traj_fnames(self._s)
def _resolve_dwnld(fname: str) -> str:
fpath = os.path.join(gpath, fname)
if self._s["DOI"] == "localhost":
if not os.path.isfile(fpath):
msg = f"File {fpath} must be predownloaded for {self._s}"
raise FileNotFoundError(msg)
return fpath
if os.path.isfile(fpath):
# do not download if exists
return fpath
url = resolve_file_url(self._s["DOI"], fname)
_ = download_resource_from_uri(url, fpath)
return fpath
if struc is not None:
self._paths["struc"] = _resolve_dwnld(struc)
if top is not None:
self._paths["top"] = _resolve_dwnld(top)
if trj is not None and not skip_traj:
self._paths["traj"] = _resolve_dwnld(trj)
[docs]
def clear_mddata(self) -> None:
"""Clear downloaded MD data. For DOI=localhost, do nothing."""
if self._s["DOI"] == "localhost":
for k in self._paths:
self._paths[k] = None
return
for k, v in self._paths.items():
if v is None:
continue
print(f"Clearing {k}-file..", end="", flush=True)
os.remove(v)
print("OK")
self._paths[k] = None
[docs]
def build_universe(self) -> mda.Universe:
"""Build MDAnalysis Universe.
Replaces outdated `system2MDanalysisUniverse`.
"""
if not any(self._paths.values()):
msg = "You **MUST** run `download_mddata` before `build_universe`"
raise UniverseConstructError(msg)
if self._paths["top"] is None:
u = (
mda.Universe(self._paths["struc"])
if self._paths["traj"] is None
else mda.Universe(self._paths["struc"], self._paths["traj"])
)
else:
try:
if self._paths["traj"] is None:
u = (
mda.Universe(self._paths["top"])
if self._paths["struc"] is None
else mda.Universe(self._paths["top"], self._paths["struc"])
)
else:
u = mda.Universe(self._paths["top"], self._paths["traj"])
except OSError as e: # exception for corrupted topology file
print(
f"We got exception.. == \n{e}\n == ..and assume that TOPOLOGY is file is corrupted", file=sys.stderr
)
if self._paths["struc"] is None:
msg = "TOPOLOGY is corrupted, and no STRUCTURE is given"
raise UniverseConstructError(msg) from e
u = (
mda.Universe(self._paths["struc"])
if self._paths["traj"] is None
else mda.Universe(self._paths["struc"], self._paths["traj"])
)
# other exceptions are raised as is
return u
[docs]
def mda_gen_selection_mols(system: System, molecules: Container[Molecule] | None = None) -> str:
"""
Return a MDAnalysis selection string covering all the molecules (default None means "lipids").
:param system: FAIRMD Lipids dictionary defining a simulation.
:param molecules: container of molecule objects to be included in the selection.
:return: a string using MDAnalysis notation that can used to select all lipids from
the ``system``.
"""
res_set = set()
molecules = system.lipids.values() if molecules is None else molecules
for key, mol in system.content.items():
if mol in molecules:
try:
for atom in mol.mapping_dict:
res_set.add(mol.mapping_dict[atom]["RESIDUE"])
except (KeyError, TypeError):
res_set.add(system["COMPOSITION"][key]["NAME"])
sorted_res = sorted(res_set)
return "resname " + " or resname ".join(sorted_res)
[docs]
def mda_read_trj_tilt_angles(
resname: str,
a_name: str,
b_name: str,
universe: mda.Universe,
):
"""
Calculate the AB vector angles with respect to membrane normal from the simulation.
:param resname: residue name of the molecule for which the P-N vector angle will be calculated
:param a_name: name of the A atom in the simulation
:param b_name: name of the B atom in the simulation
:param universe: MDAnalysis universe of the simulation to be analyzed
:return: tuple (angles of all molecules as a function of time,
time averages for each molecule,
the average angle over time and molecules,
the error of the mean calculated over molecules)
"""
# Auxiliary internal function
def _calc_angle(atoms, com) -> float:
"""
:meta private:
Calculate the angle between the vector and z-axis in degrees.
No PBC check! Calculates the center of mass of the selected atoms to invert bottom leaflet vector
"""
vec = atoms[1].position - atoms[0].position
d = math.sqrt(np.square(vec).sum())
cos = vec[2] / d
# values for the bottom leaflet are inverted so that
# they have the same nomenclature as the top leaflet
cos *= math.copysign(1.0, atoms[0].position[2] - com)
try:
angle = math.degrees(math.acos(cos))
except ValueError:
if abs(cos) >= 1.0:
print(f"Cosine is too large = {cos} --> truncating it to +/-1.0")
cos = math.copysign(1.0, cos)
angle = math.degrees(math.acos(cos))
return angle
selection = universe.select_atoms(
"resname " + resname + " and (name " + a_name + ")",
"resname " + resname + " and (name " + b_name + ")",
).atoms.split("residue")
com = universe.select_atoms(
"resname " + resname + " and (name " + a_name + " or name " + b_name + ")",
).center_of_mass()
n_res = len(selection)
n_frames = len(universe.trajectory)
angles = np.zeros((n_res, n_frames))
res_aver_angles = [0] * n_res
res_std_error = [0] * n_res
j = 0
for _ in universe.trajectory:
for i in range(n_res):
residue = selection[i]
angles[i, j] = _calc_angle(residue, com[2])
j = j + 1
for i in range(n_res):
res_aver_angles[i] = sum(angles[i, :]) / n_frames
res_std_error[i] = np.std(angles[i, :])
total_average = sum(res_aver_angles) / n_res
total_std_error = np.std(res_aver_angles) / np.sqrt(n_res)
return angles, res_aver_angles, total_average, total_std_error
# -------------------------------------- SEPARATED PART (??) ----------------------