"""
:module: DatabankLib.analyze
:description: major analysis methods calculating a property from NMRLipipds
`system` object
"""
import json
import os
import re
import traceback
from logging import Logger
import subprocess
import buildh
import urllib.request
import socket
from collections import deque
from tqdm import tqdm
import numpy as np
import gc
import MDAnalysis as mda
from maicos.core.base import AnalysisCollection
from DatabankLib import (
NMLDB_SIMU_PATH,
RCODE_ERROR,
RCODE_SKIPPED,
RCODE_COMPUTED,
NMLDB_DATA_PATH,
)
from DatabankLib.core import System
from DatabankLib.settings.molecules import lipids_set
from DatabankLib.settings.engines import get_struc_top_traj_fnames
from DatabankLib.databankLibrary import GetNlipids, system2MDanalysisUniverse
from DatabankLib.jsonEncoders import CompactJSONEncoder
from DatabankLib.databankio import resolve_download_file_url
from DatabankLib.databankop import find_OP
from DatabankLib import analyze_nmrpca as nmrpca
from DatabankLib.maicos import (
FormFactorPlanar,
DielectricPlanar,
DensityPlanar,
DiporderPlanar,
)
[docs]
def computeNMRPCA( # noqa: N802 (API)
system: System, logger: Logger, recompute: bool = False
) -> int:
"""Compute eq_times.json using NMR PCA analysis.
Args:
system (System): one of systems of the Databank
recompute (bool, optional): Delete previous apl.json and recompute it if True.
Defaults to False.
Returns:
int success code (RCODE_...)
"""
print("== Doi: " + system["DOI"])
print("== System name: " + system["SYSTEM"])
# getting data from databank and preprocessing them
# Start Parser
# TODO: 2test| parser = Parser(NMLDB_SIMU_PATH, readme, eq_time_fname, testTraj)
try:
parser = nmrpca.Parser(system, "eq_times.json")
# Check trajectory
print(parser._path)
vpcode = parser.validate_path()
print("ValidatePath code: ", vpcode)
except Exception as e:
logger.error("Error initializing NMRPCA parser.")
logger.error(str(e))
logger.error(traceback.format_exc())
return RCODE_ERROR
if vpcode > 0:
logger.error("Some errors in analyze_nmrpca.py::Parser constructor.\n")
return RCODE_ERROR
elif vpcode < 0 and not recompute:
return RCODE_SKIPPED
if (
"WARNINGS" in system
and system["WARNINGS"] is not None
and "AMBIGUOUS_ATOMNAMES" in system["WARNINGS"]
):
return RCODE_SKIPPED
if (
"WARNINGS" in system
and system["WARNINGS"] is not None
and "SKIP_EQTIMES" in system["WARNINGS"]
):
return RCODE_SKIPPED
# Check if TPR is defined and non-null or ""
try:
__ = system["TPR"][0][0]
_ = __[0]
except (KeyError, TypeError):
logger.error(f"TPR is required for NMRPCA analysis ({system['ID']})!")
return RCODE_ERROR
try:
# Download files
parser.download_traj()
# Prepare trajectory
parser.prepare_traj()
# Concatenate trajectory
parser.concatenate_traj()
equilibration_times = {}
# Iterate over trajectories for different lipids
for lip, traj in parser.concatenated_trajs.items():
print(f"Main: parsing lipid {lip}")
# Creat PCA for trajectory
pca_runner = nmrpca.PCA(traj[0], traj[1], traj[2], traj[3], parser.trj_len)
# Run PCA
data = pca_runner.PCA()
print("Main: PCA: done")
# Project trajectory on PC1
pca_runner.get_proj(data)
print("Main: Projections: done")
# Calculate autocorrelations
pca_runner.get_autocorrelations()
print("Main: Autocorrelations: done")
# Estimate equilibration time
te2 = nmrpca.TimeEstimator(pca_runner.autocorrelation).calculate_time()
equilibration_times[lip] = te2 / parser.trj_len
print("Main: EQ time: done")
print(te2 / parser.trj_len)
parser.dump_data(equilibration_times)
gc.collect()
except Exception as e:
logger.error('Calculation failed for ' + system['path'])
logger.error(str(e))
logger.error(traceback.format_exc())
return RCODE_ERROR
else:
return RCODE_COMPUTED
[docs]
def computeAPL( # noqa: N802 (API)
system: System, logger: Logger, recompute: bool = False
) -> int:
"""Generate apl.json analysis file for a system.
Args:
system (dict): one of systems of the Databank
recompute (bool, optional): Delete previous apl.json and recompute it if True.
Defaults to False.
Returns:
int success code (RCODE_...)
"""
# TODO: reading software and file path for simulations
# software = system['SOFTWARE']
path = system["path"]
# this is checking if area per lipid is already calculated for the systems
outfilename = os.path.join(NMLDB_SIMU_PATH, path, "apl.json")
if os.path.isfile(outfilename):
if recompute:
os.unlink(outfilename)
else:
return RCODE_SKIPPED
print("Analyzing: ", path)
print("Will write into: ", outfilename)
try:
# calculates the total number of lipids
n_lipid = GetNlipids(system)
# makes MDAnalysis universe from the system. This also downloads the data if not
# yet locally available
u = system2MDanalysisUniverse(system)
if u is None:
print("Generation of MDAnalysis universe failed in folder", path)
return RCODE_ERROR
# this calculates the area per lipid as a function of time and stores it
# in the databank
apl = {}
for ts in tqdm(u.trajectory, desc="Scanning the trajectory"):
if u.trajectory.time >= system["TIMELEFTOUT"] * 1000:
dims = u.dimensions
apl_frame = dims[0] * dims[1] * 2 / n_lipid
apl[u.trajectory.time] = apl_frame
with open(outfilename, "w") as f:
json.dump(apl, f, cls=CompactJSONEncoder)
except Exception as e:
logger.error('Calculation failed for ' + system['path'])
logger.error(str(e))
logger.error(traceback.format_exc())
return RCODE_ERROR
else:
return RCODE_COMPUTED
[docs]
def computeTH( # noqa: N802 (API)
system: dict, logger: Logger, recompute: bool = False
) -> int:
thick_fn = os.path.join(NMLDB_SIMU_PATH, system["path"], "thickness.json")
if os.path.isfile(thick_fn) and not recompute:
# file exist. Skipping
return RCODE_SKIPPED
wat_dens_name = os.path.join(NMLDB_SIMU_PATH, system["path"], "WaterDensity.json")
lip_dens_name = os.path.join(NMLDB_SIMU_PATH, system["path"], "LipidDensity.json")
print(lip_dens_name)
try:
with open(wat_dens_name) as f:
wat_dens = json.load(f)
with open(lip_dens_name) as f:
lip_dens = json.load(f)
wd = np.array(wat_dens)
ld = np.array(lip_dens)
idx = np.argwhere(np.diff(np.sign(wd[:, 1] - ld[:, 1]))).flatten()
if len(idx) < 2:
print(
"Dehydrated sample! Standard thickness rule doesn't work."
" Will extract boxZ."
)
thickness = wd[-1, 0] - wd[0, 0]
else:
thickness = wd[idx[1], 0] - wd[idx[0], 0]
with open(thick_fn, "w") as f:
json.dump(thickness, f)
except Exception as e:
logger.error("Calculation failed for " + system["path"])
logger.error(str(e))
logger.error(traceback.format_exc())
return RCODE_ERROR
return RCODE_COMPUTED
# TODO: implement onlyLipid
[docs]
def computeOP( # noqa: N802 (API)
system: System, logger: Logger, recompute: bool = False
) -> int:
"""_summary_
Args:
system (dict): _description_
recompute (bool, optional): _description_. Defaults to False.
Returns:
int: _description_
"""
path = system["path"]
# Check if order parameters are calculated or something in the system prevents
# order parameter calculation
for key in system["COMPOSITION"]:
outfilename = os.path.join(NMLDB_SIMU_PATH, path, key + "OrderParameters.json")
if os.path.isfile(outfilename) or (
"WARNINGS" in system.keys()
and type(system["WARNINGS"]) is dict
and "AMBIGUOUS_ATOMNAMES" in system["WARNINGS"].keys()
and key in system["WARNINGS"]["AMBIGUOUS_ATOMNAMES"]
):
file_found = True
elif key in lipids_set:
file_found = False
break
if file_found and not recompute:
return RCODE_SKIPPED
# If order parameters are not calculated and system ok, then continue to
# calculating order parameters
print("Analyzing: ", path)
# Download the simulations and create MDAnalysis universe (the universe is not used
# here but this script downloads the data)
_ = system2MDanalysisUniverse(system)
# Software and time for equilibration period
software = system["SOFTWARE"]
eq_time = float(system["TIMELEFTOUT"]) * 1000
# Check if all or united atom simulation
try:
united_atom = system["UNITEDATOM_DICT"]
except KeyError:
united_atom = False
# Check relevant warnings
g3switch = (
"WARNINGS" in system
and type(system["WARNINGS"]) is dict
and "GROMACS_VERSION" in system["WARNINGS"]
and system["WARNINGS"]["GROMACS_VERSION"] == "gromacs3"
)
cur_path = os.path.join(NMLDB_SIMU_PATH, path)
try:
struc_fname, top_fname, trj_fname = get_struc_top_traj_fnames(
system, join_path=cur_path
)
except (ValueError, KeyError) as e:
logger.error("Error reading filenames from system dictionary.\n")
logger.error(str(type(e)) + " => " + str(e))
return RCODE_ERROR
try:
# Set topology file names and make Gromacs trajectories whole
if "gromacs" in software:
if top_fname is None:
raise ValueError("TPR is required for OP calculations!")
xtcwhole = os.path.join(NMLDB_SIMU_PATH, path, "whole.xtc")
if not os.path.isfile(xtcwhole):
try:
echo_proc = "System\n".encode('utf-8')
if g3switch:
cmd_args = ['trjconv', '-f', trj_fname, '-s', top_fname, '-o', xtcwhole, '-pbc', 'mol', '-b', str(eq_time)]
else:
cmd_args = ['gmx', 'trjconv', '-f', trj_fname, '-s', top_fname, '-o', xtcwhole, '-pbc', 'mol', '-b', str(eq_time)]
if united_atom and system["TRAJECTORY_SIZE"] > 15e9:
cmd_args.extend(['-skip', '3'])
subprocess.run(cmd_args, input=echo_proc, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("trjconv exited with error (see above)") from e
elif "openMM" in software or "NAMD" in software:
if not os.path.isfile(struc_fname):
pdb_url = resolve_download_file_url(system.get("DOI"), struc_fname)
_ = urllib.request.urlretrieve(pdb_url, struc_fname)
else:
print(
"Order parameter calculation for other than gromacs, "
"openMM and NAMD are yet to be implemented."
)
return RCODE_ERROR
# Calculate order parameters
# -----------------------------------
# united-atom switch -> engine switch
echo_proc = "System\n".encode('utf-8')
if united_atom:
if "gromacs" not in software:
raise ValueError("UNITED_ATOMS is supported only for GROMACS engine!")
frame0struc = os.path.join(NMLDB_SIMU_PATH, path, "frame0.gro")
if g3switch:
try:
subprocess.run(['editconf', '-f', top_fname, '-o', frame0struc], input=echo_proc, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("editconf exited with error (see above)") from e
else:
try:
if g3switch:
subprocess.run(['trjconv', '-f', xtcwhole, '-s', top_fname, '-dump', '0', '-o', frame0struc], input=echo_proc, check=True)
else:
subprocess.run(['gmx','trjconv', '-f', xtcwhole, '-s', top_fname, '-dump', '0', '-o', frame0struc], input=echo_proc, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"trjconv ({'trjconv' if g3switch else 'gmx trjconv'}) exited with error (see above)"
) from e
for key in system["UNITEDATOM_DICT"]:
# construct order parameter definition file for CH bonds from
# mapping file
mapping_dict = system.content[key].mapping_dict
def_fname = os.path.join(NMLDB_SIMU_PATH, path, key + ".def")
with open(def_fname, "w") as def_file:
previous_line = ""
regexp1_H = re.compile(r"M_[A-Z0-9]*C[0-9]*H[0-9]*_M") # noqa: N806
regexp2_H = re.compile(r"M_G[0-9]*H[0-9]*_M") # noqa: N806
regexp1_C = re.compile(r"M_[A-Z0-9]*C[0-9]*_M") # noqa: N806
regexp2_C = re.compile(r"M_G[0-9]_M") # noqa: N806
# Note that mapping_dict's keys must be ordered
# C11 H111 H112 C12 H121 H122 ...
# otherwize algorithm will fail
for mapping_key in mapping_dict:
if (regexp1_C.search(mapping_key) or
regexp2_C.search(mapping_key)):
atom_c = [mapping_key,
mapping_dict[mapping_key]["ATOMNAME"]]
atom_h = []
elif (regexp1_H.search(mapping_key) or
regexp2_H.search(mapping_key)):
atom_h = [mapping_key,
mapping_dict[mapping_key]["ATOMNAME"]]
else:
atom_c = []
atom_h = []
if atom_h:
assert atom_c
items = [atom_c[1], atom_h[1], atom_c[0], atom_h[0]]
def_line = (
items[2]
+ "&"
+ items[3]
+ " "
+ key
+ " "
+ items[0]
+ " "
+ items[1]
+ "\n"
)
if def_line != previous_line:
def_file.write(def_line)
previous_line = def_line
# Add hydrogens to trajectory and calculate order parameters with buildH
op_filepath = os.path.join(
NMLDB_SIMU_PATH, path, key + "OrderParameters.dat"
)
lipid_json_file = [
os.path.join(
NMLDB_DATA_PATH,
"lipid_json_buildH",
system["UNITEDATOM_DICT"][key] + ".json",
)
]
if not os.path.isfile(lipid_json_file[0]):
lipid_json_file = None
print(system["UNITEDATOM_DICT"][key])
buildh.launch(
coord_file=frame0struc,
def_file=def_fname,
lipid_type=system["UNITEDATOM_DICT"][key],
lipid_jsons=lipid_json_file,
traj_file=xtcwhole,
out_file=f"{op_filepath}.buildH",
ignore_CH3s=True,
)
with open(op_filepath + ".buildH") as op_file:
bh_lines = op_file.readlines()
op_lines = []
data = {}
for line in bh_lines:
if "#" in line:
continue
line2 = (
line.split()[0].replace("&", " ")
+ " "
+ line.split()[4]
+ " "
+ line.split()[5]
+ " "
+ line.split()[6]
+ "\n"
)
op_lines.append(line2)
op_name = line.split()[0].replace("&", " ")
# -- line.split()[0] + " " + line.split()[1]
op_values = [
float(line.split()[4]),
float(line.split()[5]),
float(line.split()[6]),
]
data[str(op_name)] = []
data[str(op_name)].append(op_values)
# write ascii file (TODO: DO WE NEED IT?)
with open(op_filepath, "w") as outfile:
outfile.write("Atom Average OP OP stem\n")
outfile.writelines(op_lines)
# write json
outfile2 = os.path.join(
NMLDB_SIMU_PATH, path, key + "OrderParameters.json"
)
with open(outfile2, "w") as f:
json.dump(data, f, cls=CompactJSONEncoder)
# not united-atom cases
else:
if "gromacs" in software:
gro = os.path.join(NMLDB_SIMU_PATH, path, "conf.gro")
print("\n Making gro file")
if g3switch:
try:
subprocess.run(['editconf', '-f', top_fname, '-o', gro], input=echo_proc, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("editconf exited with error (see above)") from e
else:
try:
subprocess.run(['gmx', 'trjconv', '-f', trj_fname, '-s', top_fname, '-dump', '0', '-o', gro], input=echo_proc, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("trjconv exited with error (see above)") from e
for key in system["COMPOSITION"]:
if (
"WARNINGS" in system.keys()
and system["WARNINGS"] is not None
and "AMBIGUOUS_ATOMNAMES" in system["WARNINGS"].keys()
and key in system["WARNINGS"]["AMBIGUOUS_ATOMNAMES"]
):
print(path, key)
print(
"Order parameters cannot be calculated if atom "
"names are ambiguous."
)
continue
if key in lipids_set:
print("Calculating ", key, " order parameters")
resname = system["COMPOSITION"][key]["NAME"]
outfilename = os.path.join(
NMLDB_SIMU_PATH, path, key + "OrderParameters.dat"
)
outfilename2 = os.path.join(
NMLDB_SIMU_PATH, path, key + "OrderParameters.json"
)
if os.path.isfile(outfilename2):
print("Order parameter file already found")
continue
if "gromacs" in software:
try:
op_obj = find_OP(
system.content[key].mapping_dict,
top_fname,
xtcwhole,
resname,
)
except Exception as e:
logger.warning(f"We got this exception: \n {e}")
logger.warning(
"But we will try rebuild the Universe "
"from GROM if using tpr did not work!"
)
op_obj = find_OP(
system.content[key].mapping_dict, gro, xtcwhole, resname
)
if "openMM" in software or "NAMD" in software:
op_obj = find_OP(
system.content[key].mapping_dict,
struc_fname,
trj_fname,
resname,
)
data = {}
with open(outfilename, "w") as outfile:
outfile.write("Atom Average OP OP stem\n")
for i, op in enumerate(op_obj):
(op.avg, op.std, op.stem) = op.get_avg_std_stem_OP
outfile.write(f"{op.name} {str(op.avg)} {str(op.stem)}\n")
data[str(op.name)] = []
data[str(op.name)].append(op.get_avg_std_stem_OP)
with open(outfilename2, "w") as f:
json.dump(data, f, cls=CompactJSONEncoder)
print("Order parameters calculated and saved to ", path)
except Exception as e:
print("Calculation failed for " + system["path"])
print(str(e))
print(traceback.format_exc())
return RCODE_ERROR
return RCODE_COMPUTED
[docs]
def computeMAICOS( # noqa: N802 (API)
system: System, logger: Logger, recompute: bool = False
) -> int:
logger.info("System title: " + system['SYSTEM'])
logger.info("System path: " + system['path'])
software = system['SOFTWARE']
# download trajectory and gro files
system_path = os.path.join(NMLDB_SIMU_PATH, system['path'])
doi = system.get('DOI')
skip_downloading: bool = (doi == 'localhost')
if skip_downloading:
print("NOTE: The system with 'localhost' DOI should be downloaded by the user.")
set_maicos_files = {
"LipidMassDensity.json",
"DiporderWater.json",
"FormFactor.json",
"TotalChargeDensity.json",
"WaterChargeDensity.json",
"Diporder2Water.json",
"TotalMassDensity.json",
"TotalDensity.json",
"WaterMassDensity.json",
"LipidDensity.json",
"WaterDensity.json",
"LipidChargeDensity.json",
"DielectricLipid", # Dielectric* files are treated
"DielectricWater", # slightly differently (see below)
"DielectricTotal"
}
for file in set_maicos_files.copy():
if "Dielectric" in file:
if (
os.path.isfile(os.path.join(system_path, file + '_par.json')) and
os.path.isfile(os.path.join(system_path, file + '_perp.json')) and
not recompute
):
set_maicos_files.remove(file)
else:
if os.path.isfile(os.path.join(system_path, file)) and not recompute:
set_maicos_files.remove(file)
logger.info("Files to be computed: " + "|".join(set_maicos_files))
if not set_maicos_files:
return RCODE_SKIPPED
if system['TYPEOFSYSTEM'] == 'miscellaneous':
return RCODE_SKIPPED
print('Analyzing', system_path)
try:
if system['UNITEDATOM_DICT']:
print('United atom simulation')
except KeyError:
pass
try:
if system['WARNINGS']['ORIENTATION']:
print('Skipping due to ORIENTATION warning:',
system['WARNINGS']['ORIENTATION'])
return RCODE_SKIPPED
except (KeyError, TypeError):
pass
try:
if system['WARNINGS']['PBC'] == 'hexagonal-box':
print('Skipping due to PBC warning:', system['WARNINGS']['PBC'])
return RCODE_SKIPPED
except (KeyError, TypeError):
pass
try:
if system['WARNINGS']['NOWATER']:
print('Skipping because there is not water in the trajectory.')
return RCODE_SKIPPED
except (KeyError, TypeError):
pass
try:
struc, top, trj = get_struc_top_traj_fnames(system)
trj_name = os.path.join(system_path, trj)
if struc is None:
struc_name = None
else:
struc_name = os.path.join(system_path, struc)
if top is None:
tpr_name = None
else:
top_name = os.path.join(system_path, top)
except Exception as e:
logger.error("Error getting structure/topology/trajectory filenames.")
logger.error(str(e))
return RCODE_ERROR
socket.setdefaulttimeout(15)
try:
if skip_downloading:
if not os.path.isfile(trj_name):
raise FileNotFoundError(
f"Trajectory should be downloaded [{trj_name}] by user")
else:
trj_url = resolve_download_file_url(system['DOI'], trj)
if not os.path.isfile(trj_name):
print('Downloading trajectory with the size of ',
system['TRAJECTORY_SIZE'], ' to ', system['path'])
_ = urllib.request.urlretrieve(trj_url, trj_name)
# make a function like this
# TODO TPR should not be obligatory for GROMACS
if 'gromacs' in software:
tpr_name = top_name
if skip_downloading:
if not os.path.isfile(tpr_name):
raise FileNotFoundError(
f"TPR should be downloaded [{tpr_name}] by user")
else:
tpr_url = resolve_download_file_url(doi, top)
if not os.path.isfile(tpr_name):
_ = urllib.request.urlretrieve(tpr_url, tpr_name)
if 'openMM' in software or 'NAMD' in software:
if skip_downloading:
if (not os.path.isfile(struc_name)):
raise FileNotFoundError(
f"Structure file should be downloaded [{struc_name}] by user")
else:
pdb_url = resolve_download_file_url(doi, struc)
if not os.path.isfile(struc_name):
_ = urllib.request.urlretrieve(pdb_url, struc_name)
eq_time = float(system['TIMELEFTOUT'])*1000
# FIND LAST CARBON OF SN-1 TAIL AND G3 CARBON
for molecule in system['COMPOSITION']:
if molecule in lipids_set:
mapping = system.content[molecule].mapping_dict
# TODO: rewrite via lipid dictionary!
for nm in ["M_G3_M", "M_G13_M", "M_C32_M"]:
try:
g3_atom = mapping[nm]['ATOMNAME']
continue
except (KeyError, TypeError):
pass
# TODO: rewrite via lipid dictionary
if "M_G1C4_M" in mapping.keys():
for c_idx in range(4, 30):
atom = 'M_G1C' + str(c_idx) + '_M'
try:
last_atom = mapping[atom]['ATOMNAME']
except (KeyError, TypeError):
continue
elif "M_G11C4_M" in mapping.keys():
for c_idx in range(4, 30):
atom = 'M_G11C' + str(c_idx) + '_M'
try:
last_atom = mapping[atom]['ATOMNAME']
except (KeyError, TypeError):
continue
elif "M_CA4_M" in mapping.keys():
for c_idx in range(4, 30):
atom = 'M_CA' + str(c_idx) + '_M'
try:
last_atom = mapping[atom]['ATOMNAME']
except (KeyError, TypeError):
continue
print(last_atom, g3_atom)
# Center around one lipid tail CH3 to guarantee all lipids in the same box
if 'gromacs' in system['SOFTWARE']:
ndxpath = os.path.join(system_path, 'foo.ndx')
try:
echo_input = f"a {last_atom}\nq\n".encode('utf-8')
subprocess.run(['gmx', 'make_ndx', '-f', tpr_name, '-o', ndxpath], input=echo_input, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Subprocess failed during ndx file creation: {e}") from e
try:
with open(ndxpath, 'r') as f:
last_lines = deque(f,1)
last_atom_id = int(re.split(r'\s+', last_lines[0].strip())[-1])
with open(ndxpath, 'a') as f:
f.write('[ centralAtom ]\n')
f.write(f'{last_atom_id}\n')
except Exception as e:
raise RuntimeError(f"Some error occurred while reading the foo.ndx {ndxpath}: {e}") from e
xtcwhole = os.path.join(system_path, 'whole.xtc')
xtcfoo = os.path.join(system_path, 'foo2.xtc')
xtccentered = os.path.join(system_path, 'centered.xtc')
if (not os.path.isfile(xtccentered)):
print("Make molecules whole in the trajectory")
if not os.path.isfile(xtcwhole):
try:
echo_proc = "System\n".encode('utf-8')
subprocess.run([
'gmx','trjconv',
'-f', trj_name,
'-s', tpr_name,
'-o', xtcwhole,
'-pbc', 'mol',
'-b', str(eq_time)], input=echo_proc, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"trjconv for whole failed: {e}") from e
if (not os.path.isfile(xtcfoo)):
try:
echo_input = "centralAtom\nSystem".encode('utf-8')
subprocess.run([
'gmx','trjconv',
'-center',
'-pbc', 'mol',
'-n', ndxpath,
'-f', xtcwhole,
'-s', tpr_name,
'-o', xtcfoo], input=echo_input, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"trjconv for center failed: {e}") from e
try:
os.remove(ndxpath)
os.remove(xtcwhole)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to remove temporary files: {e}") from e
# Center around the center of mass of all the g_3 carbons
try:
echo_input = f"a {g3_atom}\nq\n".encode('utf-8')
subprocess.run(['gmx','make_ndx', '-f', tpr_name, '-o', ndxpath], input=echo_input, check=True)
echo_input = f"{g3_atom}\nSystem".encode('utf-8')
subprocess.run(['gmx','trjconv', '-center', '-pbc', 'mol', '-n', ndxpath, '-f', xtcfoo, '-s', tpr_name, '-o', xtccentered], input=echo_input, check=True)
os.remove(xtcfoo)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed during centering on g3 carbons: {e}") from e
try:
os.remove(ndxpath)
except Exception as e:
raise RuntimeError(f"Some error occurred during removing the {ndxpath}:{e}")
else:
print("Centering for other than Gromacs may not work if there are"
" jumps over periodic boundary conditions in z-direction.")
if 'gromacs' in system['SOFTWARE']:
u = mda.Universe(tpr_name, xtccentered)
if 'openMM' in system['SOFTWARE'] or 'NAMD' in system['SOFTWARE']:
u = mda.Universe(struc_name, trj_name)
# -- PHILIP code starts here --
# We us a hardoced bin width
bin_width = 0.3
# introduce elements attribute (if it's empty)
# and make initial guess (just in case)
u.guess_TopologyAttrs(force_guess=["elements"])
from DatabankLib.settings import elements
elements.guess_elements(system, u)
# Adjust the group selection to be general for analysis
from DatabankLib.databankLibrary import getLipids
water = u.select_atoms(f"resname {system['COMPOSITION']['SOL']['NAME']}")
lipid = u.select_atoms(getLipids(system))
# Maybe add group for ions and compute densities for them as well?
# fixed `zmin` and `zmax` for profiles are deduced from smallest box dimension
L_min = u.dimensions[2] # noqa: N806 (PEP8)
for ts in u.trajectory:
if ts.dimensions[2] < L_min:
L_min = ts.dimensions[2]
zmin = -L_min / 2
zmax = L_min / 2
# Skip unwrap/pack for speed - trajectories are already centered and whole
base_options = {"unwrap": False, "bin_width": bin_width, "pack": False}
zlim = {"zmin": zmin, "zmax": zmax}
dens_options = {**zlim, **base_options}
spath = os.path.join(NMLDB_SIMU_PATH, system['path'])
request_analysis = dict()
if "TotalDensity.json" in set_maicos_files:
# Density profiles
dens_e_total = DensityPlanar(
u.atoms,
dens="electron",
**dens_options,
output=os.path.join(spath, "TotalDensity.json"),
)
request_analysis["TotalDensity.json"] = dens_e_total
if "WaterDensity.json" in set_maicos_files:
dens_e_water = DensityPlanar(
water,
dens="electron",
**dens_options,
output=os.path.join(spath, "WaterDensity.json"),
)
request_analysis["WaterDensity.json"] = dens_e_water
if "LipidDensity.json" in set_maicos_files:
dens_e_lipid = DensityPlanar(
lipid,
dens="electron",
**dens_options,
output=os.path.join(spath, "LipidDensity.json"),
)
request_analysis["LipidDensity.json"] = dens_e_lipid
if "TotalMassDensity.json" in set_maicos_files:
dens_m_total = DensityPlanar(
u.atoms,
dens="mass",
**dens_options,
output=os.path.join(spath, "TotalMassDensity.json"),
)
request_analysis["TotalMassDensity.json"] = dens_m_total
if "WaterMassDensity.json" in set_maicos_files:
dens_m_water = DensityPlanar(
water,
dens="mass",
**dens_options,
output=os.path.join(spath, "WaterMassDensity.json"),
)
request_analysis["WaterMassDensity.json"] = dens_m_water
if "LipidMassDensity.json" in set_maicos_files:
dens_m_lipid = DensityPlanar(
lipid,
dens="mass",
**dens_options,
output=os.path.join(spath, "LipidMassDensity.json"),
)
request_analysis["LipidMassDensity.json"] = dens_m_lipid
if "TotalChargeDensity.json" in set_maicos_files:
dens_c_total = DensityPlanar(
u.atoms,
dens="charge",
**dens_options,
output=os.path.join(spath, "TotalChargeDensity.json"),
)
request_analysis["TotalChargeDensity.json"] = dens_c_total
if "WaterChargeDensity.json" in set_maicos_files:
dens_c_water = DensityPlanar(
water,
dens="charge",
**dens_options,
output=os.path.join(spath, "WaterChargeDensity.json"),
)
request_analysis["WaterChargeDensity.json"] = dens_c_water
if "LipidChargeDensity.json" in set_maicos_files:
dens_c_lipid = DensityPlanar(
lipid,
dens="charge",
**dens_options,
output=os.path.join(spath, "LipidChargeDensity.json"),
)
request_analysis["LipidChargeDensity.json"] = dens_c_lipid
# Form factor
# Use `None` for `zmin`/`zmax` to respect changing cell size (vs fixed
# zmin/zmax for density profiles)
if "FormFactor.json" in set_maicos_files:
form_factor = FormFactorPlanar(
atomgroup=u.atoms,
**base_options,
zmin=None,
zmax=None,
output=os.path.join(spath, "FormFactor.json"),
)
request_analysis["FormFactor.json"] = form_factor
# Water Orientation
# TODO: Maybe also compute orientation for lipid heads/tails?
if "DiporderWater.json" in set_maicos_files:
cos_water = DiporderPlanar(
water,
order_parameter="cos_theta",
**dens_options,
output=os.path.join(spath, "DiporderWater.json"),
)
request_analysis["DiporderWater.json"] = cos_water
if "Diporder2Water.json" in set_maicos_files:
cos2_water = DiporderPlanar(
water,
order_parameter="cos_2_theta",
**dens_options,
output=os.path.join(spath, "Diporder2Water.json"),
)
request_analysis["Diporder2Water.json"] = cos2_water
# Dielectric profiles
if "DielectricTotal" in set_maicos_files:
diel_total = DielectricPlanar(
u.atoms, **base_options,
output_prefix=os.path.join(spath, "DielectricTotal")
)
request_analysis["DielectricTotal"] = diel_total
if "DielectricWater" in set_maicos_files:
diel_water = DielectricPlanar(
water, **base_options,
output_prefix=os.path.join(spath, "DielectricWater")
)
request_analysis["DielectricWater"] = diel_water
if "DielectricLipid" in set_maicos_files:
diel_lipid = DielectricPlanar(
lipid, **base_options,
output_prefix=os.path.join(spath, "DielectricLipid")
)
request_analysis["DielectricLipid"] = diel_lipid
logger.info("We are ready to recompute " + "|".join(
request_analysis.keys()))
if ({"DielectricLipid", "DielectricWater", "DielectricTotal"}
.intersection(request_analysis.keys())):
logger.info("Check if dielectric profiles can be calculated "
"(not possible for charged systems)")
try:
diel_total.run(stop=1)
except (ValueError, UserWarning) as e:
print(f"Dielectric profiles not available for this system: {e}")
# create stub json-s to avoid recompute tries
for dfile in ["DielectricTotal", "DielectricWater", "DielectricLipid"]:
with open(os.path.join(
system_path, dfile + '_per.json'), 'w') as f:
f.write('{}')
with open(os.path.join(
system_path, dfile + '_par.json'), 'w') as f:
f.write('{}')
logger.info(f"Created empty dielectric profile JSONs for {dfile}.")
try:
del request_analysis["DielectricTotal"]
del request_analysis["DielectricWater"]
del request_analysis["DielectricLipid"]
except KeyError:
pass
# allowed
if request_analysis:
collection = AnalysisCollection(*request_analysis.values())
collection.run()
for _, analysis in request_analysis.items():
analysis.save()
# finall catch
except Exception as e:
logger.error('Calculation failed for ' + system['path'])
logger.error(str(e))
logger.error(traceback.format_exc())
return RCODE_ERROR
else:
return RCODE_COMPUTED