"""
Wrappers for MAICoS calculations adapted to Databank needs.
- Checks if a system is suitable for maicos calculations
- Custom JSON encoder for numpy arrays
- Custom maicos analysis classes with save methods adapted to Databank needs
"""
import contextlib
import json
import os
import re
import subprocess
from collections import deque
from logging import Logger
import maicos
import MDAnalysis as mda
import numpy as np
from maicos.core import ProfilePlanarBase
from maicos.lib.weights import density_weights
from DatabankLib.core import System
from DatabankLib.jsonEncoders import CompactJSONEncoder
from DatabankLib.settings.molecules import lipids_set
[docs]
def is_system_suitable_4_maicos(system: System) -> bool:
"""
Check if the system is suitable for maicos calculations."
:param system: Databank System object (System)
:return: False if system should be skipped
"""
if system["TYPEOFSYSTEM"] == "miscellaneous":
return False
try:
if system["WARNINGS"]["ORIENTATION"]:
print("Skipping due to ORIENTATION warning:", system["WARNINGS"]["ORIENTATION"])
return False
except (KeyError, TypeError):
pass
try:
if system["WARNINGS"]["PBC"] == "hexagonal-box":
print("Skipping due to PBC warning:", system["WARNINGS"]["PBC"])
return False
except (KeyError, TypeError):
pass
try:
if system["WARNINGS"]["NOWATER"]:
print("Skipping because there is not water in the trajectory.")
return False
except (KeyError, TypeError):
pass
return True
[docs]
def first_last_carbon(system: System, logger: Logger) -> tuple[str, str]:
"""Find last carbon of sn-1 tail and g3 carbon."""
g3_atom = ""
last_atom = ""
for molecule in system["COMPOSITION"]:
if molecule in lipids_set:
mapping = system.content[molecule].mapping_dict
# TODO: rewrite via lipid dictionary!
for nm in ["M_G3_M", "M_G13_M", "M_C32_M"]:
_ga = mapping.get(nm, {}).get("ATOMNAME")
g3_atom = _ga if _ga else g3_atom
# TODO: rewrite via lipid dictionary
for c_idx in range(4, 30):
if "M_G1C4_M" in mapping:
atom = "M_G1C" + str(c_idx) + "_M"
elif "M_G11C4_M" in mapping:
atom = "M_G11C" + str(c_idx) + "_M"
elif "M_CA4_M" in mapping:
atom = "M_CA" + str(c_idx) + "_M"
_la = mapping.get(atom, {}).get("ATOMNAME")
last_atom = _la if _la else last_atom
logger.info(f"Found last atom {last_atom} and g3 atom {g3_atom} for system {system['ID']}")
return (last_atom, g3_atom)
[docs]
def traj_centering_for_maicos(
system_path: str,
trj_name: str,
tpr_name: str,
last_atom: str,
g3_atom: str,
eq_time: int = 0,
*,
recompute: bool = False,
) -> str:
"""Center trajectory around the center of mass of all methyl carbons."""
xtccentered = os.path.join(system_path, "centered.xtc")
if os.path.isfile(xtccentered) and not recompute:
return xtccentered # already done
if recompute:
with contextlib.suppress(FileNotFoundError):
os.remove(xtccentered)
# make index
# TODO refactor to MDAnalysis
ndxpath = os.path.join(system_path, "foo.ndx")
try:
echo_input = f"a {last_atom}\nq\n".encode()
subprocess.run(["gmx", "make_ndx", "-f", tpr_name, "-o", ndxpath], input=echo_input, check=True)
except subprocess.CalledProcessError as e:
msg = f"Subprocess failed during ndx file creation: {e}"
raise RuntimeError(msg) from e
try:
with open(ndxpath) as f:
last_lines = deque(f, 1)
last_atom_id = int(re.split(r"\s+", last_lines[0].strip())[-1])
with open(ndxpath, "a") as f:
f.write("[ centralAtom ]\n")
f.write(f"{last_atom_id}\n")
except Exception as e:
msg = f"Some error occurred while reading the foo.ndx {ndxpath}"
raise RuntimeError(msg) from e
# start preparing centered trajectory
xtcwhole = os.path.join(system_path, "whole.xtc")
print("Make molecules whole in the trajectory")
with contextlib.suppress(FileNotFoundError):
os.remove(xtcwhole)
try:
echo_proc = b"System\n"
subprocess.run(
[
"gmx",
"trjconv",
"-f",
trj_name,
"-s",
tpr_name,
"-o",
xtcwhole,
"-pbc",
"mol",
"-b",
str(eq_time),
],
input=echo_proc,
check=True,
)
except subprocess.CalledProcessError as e:
msg = "trjconv for whole.xtc failed"
raise RuntimeError(msg) from e
# centering irt methyl-groups
xtcfoo = os.path.join(system_path, "foo2.xtc")
with contextlib.suppress(FileNotFoundError):
os.remove(xtcfoo)
try:
echo_input = b"centralAtom\nSystem"
subprocess.run(
[
"gmx",
"trjconv",
"-center",
"-pbc",
"mol",
"-n",
ndxpath,
"-f",
xtcwhole,
"-s",
tpr_name,
"-o",
xtcfoo,
],
input=echo_input,
check=True,
)
except subprocess.CalledProcessError as e:
msg = f"trjconv for center failed: {e}"
raise RuntimeError(msg) from e
try:
os.remove(ndxpath)
os.remove(xtcwhole)
except OSError as e:
msg = f"Failed to remove temporary files: {e}"
raise RuntimeError(msg) from e
# Center around the center of mass of all the g_3 carbons
try:
echo_input = f"a {g3_atom}\nq\n".encode()
subprocess.run(["gmx", "make_ndx", "-f", tpr_name, "-o", ndxpath], input=echo_input, check=True)
echo_input = f"{g3_atom}\nSystem".encode()
subprocess.run(
[
"gmx",
"trjconv",
"-center",
"-pbc",
"mol",
"-n",
ndxpath,
"-f",
xtcfoo,
"-s",
tpr_name,
"-o",
xtccentered,
],
input=echo_input,
check=True,
)
except subprocess.CalledProcessError as e:
msg = "Failed during centering on g3 carbons."
raise RuntimeError(msg) from e
try:
os.remove(xtcfoo)
os.remove(ndxpath)
except OSError as e:
msg = f"A error occurred during removing temporary files {ndxpath} & {xtcfoo}."
raise RuntimeError(msg) from e
return xtccentered
[docs]
class NumpyArrayEncoder(CompactJSONEncoder):
"""Encoder for 2xN numpy arrays to be used with json.dump."""
[docs]
def encode(self, o) -> str: # noqa: ANN001 (o: Any)
"""Encode numpy arrays as lists."""
if isinstance(o, np.ndarray):
return CompactJSONEncoder.encode(self, o.tolist())
return CompactJSONEncoder.encode(self, o)
[docs]
class DensityPlanar(maicos.DensityPlanar):
"""Density profiler for planar system."""
[docs]
def save(self) -> None:
"""Save performing unit conversion from Å to nm and e/Å^3 to e/nm^3"""
outdata = np.vstack(
[
self.results.bin_pos / 10,
self.results.profile * 1e3,
self.results.dprofile * 1e3,
],
).T
with open(self.output, "w") as f:
json.dump(outdata, f, cls=NumpyArrayEncoder)
[docs]
class DielectricPlanar(maicos.DielectricPlanar):
"""Dielectric profile for planar system."""
[docs]
def save(self) -> None:
"""Save performing unit conversion from Å to nm for the distance."""
outdata_perp = np.vstack(
[
self.results.bin_pos / 10, # Convert from Å to nm
self.results.eps_perp,
self.results.deps_perp,
],
).T
with open(f"{self.output_prefix}_perp.json", "w") as f:
json.dump(outdata_perp, f, cls=NumpyArrayEncoder)
outdata_par = np.vstack(
[
self.results.bin_pos / 10, # Convert from Å to nm
self.results.eps_par,
self.results.deps_par,
],
).T
with open(f"{self.output_prefix}_par.json", "w") as f:
json.dump(outdata_par, f, cls=NumpyArrayEncoder)
[docs]
class DiporderPlanar(maicos.DiporderPlanar):
"""Dipole order parameter profile for planar system."""
[docs]
def save(self) -> None:
"""Save performing unit conversion from Å to nm for the distance."""
outdata = np.vstack(
[
self.results.bin_pos / 10, # Convert from Å to nm
self.results.profile,
self.results.dprofile,
],
).T
with open(self.output, "w") as f:
json.dump(outdata, f, cls=NumpyArrayEncoder)