#!/usr/bin/env python3
r"""
Add new simulation into the NMRlipids Databank.
The script adds a simulation into the Databank based on ``info.yaml`` file.
**Usage:**
nml_add_simulation [-h] -f FILE [-d] [-n] [-w WORK_DIR] [--dry-run]
-h, --help Show this help message and exit
-f FILE, --file=FILE Input config file in yaml format.
-d, --debug Enable debug logging output
-n, --no-cache Always redownload repository files
-w WORK_DIR, --work-dir=WORK_DIR Set custom temporary working directory \
[default: system temporary directory]
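**Example** (illustrative path):
nml_add_simulation -f ./info.yaml --dry-run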
**Returns** error codes:
- 0 - success
- 1 - input YAML parsing errors
- 2 - filesystem writing errors
- 3 - network access errors
"""
import argparse
import datetime
import importlib.util
import logging
import os
import pprint
import shutil
import subprocess
import sys
import tempfile
from copy import deepcopy
from urllib.error import HTTPError, URLError
import numpy as np
import pandas as pd
import yaml
from MDAnalysis import Universe
# import databank dictionaries
import DatabankLib
from DatabankLib.core import System, initialize_databank
# helpers
from DatabankLib.databankio import (
calc_file_sha1_hash,
create_databank_directories,
download_resource_from_uri,
resolve_download_file_url,
)
from DatabankLib.databankLibrary import lipids_set, molecules_set, parse_valid_config_settings
from DatabankLib.SchemaValidation.ValidateYAML import validate_info_dict
from DatabankLib.settings.engines import get_struc_top_traj_fnames, software_dict
from DatabankLib.settings.molecules import Lipid, NonLipid
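# Widen the pandas display limits so DataFrame summaries (e.g. the table of
# downloaded files logged below) are not truncated when printed.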
pd.set_option("display.max_rows", 500)
pd.set_option("display.max_columns", 500)
pd.set_option("display.width", 1000)
pd.set_option("display.max_colwidth", 1000)
def add_simulation():
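    # Locate the info-file template bundled with the installed DatabankLib
    # package; the path is advertised in the CLI help text below.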
info_template_path = os.path.join(
importlib.util.find_spec("DatabankLib").submodule_search_locations[0],
"SchemaValidation",
"Schema",
"info_template.yaml",
)
# parse input yaml file
parser = argparse.ArgumentParser(
prog="Add Simulation Script",
description=f"""
Add new simulation into the NMRlipids Databank. The script adds a
simulation into the Databank based on ``info.yaml`` file. You can
get template info-file from here:
{info_template_path}
Returns error codes:
- 0 - success
- 1 - input YAML parsing errors
    - 2 - filesystem writing errors
    - 3 - network access errors
""",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
    parser.add_argument(
        "-f",
        "--file",
        required=True,
        help="Input config file in yaml format.",
    )
parser.add_argument(
"-d",
"--debug",
help="enable debug logging output",
action="store_true",
)
parser.add_argument(
"-n",
"--no-cache",
help="always redownload repository files",
action="store_true",
)
parser.add_argument(
"-w",
"--work-dir",
help="set custom temporary working directory [not set = /tmp]",
default=tempfile.gettempdir(),
)
parser.add_argument(
"--dry-run",
help="perform a dry-run download of the files with 50MB limit",
action="store_true",
)
args = parser.parse_args()
# configure logging
logging_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(
format="%(asctime)s [%(levelname)s]: %(message)s",
datefmt="%I:%M:%S %p",
level=logging_level,
)
logger = logging.getLogger()
input_path = os.path.normpath(args.file)
    # load the input yaml file into a dictionary
    info_yaml = {}
    # open the input file for reading and parse it
    try:
        with open(input_path) as yaml_file:
            info_yaml = yaml.load(
                yaml_file,
                Loader=yaml.FullLoader,  # noqa: S506
            )
    except yaml.YAMLError:
        logger.exception(f"Cannot parse input YAML file '{input_path}', aborting..")
        sys.exit(1)
errors = validate_info_dict(info_yaml)
    if errors:
        for error in errors:
            logger.error(error)
        sys.exit(1)
# Show the input read
logger.debug(f"{os.linesep} Input read from {input_path} file:")
pp = pprint.PrettyPrinter(width=41, compact=True)
if logger.isEnabledFor(logging.DEBUG):
        pp.pprint(info_yaml)
# validate yaml entries and return updated sim dict
try:
sim_dict, files = parse_valid_config_settings(info_yaml)
except KeyError:
logger.exception("Missing entry key in yaml config, aborting..")
sys.exit(1)
    except Exception as e:
        logger.exception(
            f"an '{type(e).__name__}' occurred while performing the validity check of '{input_path}'; the script has been aborted",
        )
        sys.exit(1)
else:
logger.info(
"all entries in simulation are understood and will be further processed",
)
logger.debug("valid sim entry keys:")
pp = pprint.PrettyPrinter(width=41, compact=True)
if logger.isEnabledFor(logging.DEBUG):
pp.pprint(sim_dict)
try:
sim = System(sim_dict)
# mapping files are registered here!
    except Exception as e:
        logger.exception(
            f"an '{type(e).__name__}' occurred while converting dict to System for '{input_path}'; the script has been aborted",
        )
        sys.exit(1)
else:
logger.info(f"System object is successfully created from '{input_path}' file")
# Create temporary directory where to download files and analyze them
dir_wrk = args.work_dir
try:
if args.no_cache:
dir_tmp = tempfile.mkdtemp(prefix="tmp_6-", dir=dir_wrk)
else:
dir_tmp = os.path.join(dir_wrk, f"{sim['DOI'].split('/')[-1]}_download")
os.makedirs(dir_tmp, exist_ok=True)
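            # Without --no-cache the download directory is derived from the DOI
            # suffix, so a re-run can reuse previously downloaded files.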
    except OSError:
        logger.exception("Couldn't create temporary working directory under '%s'.", dir_wrk)
        sys.exit(2)
logger.info(f"The data will be processed in directory path '{dir_tmp}'")
# Check link status and download files
try:
download_links = []
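        # Two-phase download: first resolve and validate every URL, then fetch
        # the files, so a broken link aborts the run before any large download.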
for fi in files:
logger.info(f"Validating URL to file: {fi}..")
_x = resolve_download_file_url(sim["DOI"], fi, validate_uri=True)
download_links.append(_x)
logger.info(f"Now downloading {len(files)} files ...")
for url, fi in zip(download_links, files, strict=False):
download_resource_from_uri(
url,
os.path.join(dir_tmp, fi),
override_if_exists=args.no_cache,
dry_run_mode=args.dry_run,
)
logger.info(f"Download of {len(files)} files was successful")
    except HTTPError as e:
        if e.code == 404:
            logger.exception(
                f"resource not found on server '{e.url}' (404). Wrong DOI link or file name?",
            )
        else:
            logger.exception(f"HTTPError {e.code} while trying to download the file '{e.url}'")
sys.exit(3)
    except URLError as e:
        logger.exception(
            f"couldn't resolve network address: {e.reason}. Please check your internet connection.",
        )
sys.exit(3)
    except Exception as e:
        logger.exception(
            f"'{type(e).__name__}' while attempting to download resources, aborting",
        )
sys.exit(3)
# -- Calculate hash of downloaded files
sim_hashes = deepcopy(sim)
software_sim = software_dict[sim["SOFTWARE"].upper()]
    # list containing the sha1 sums of all required files
    sha1_list_required = []
# Make empty dataframe with the desired columns
df_files = pd.DataFrame(
columns=["NAME", "TYPE", "REQUIRED", "SIZE_MB", "HASH"],
dtype=object,
)
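    # Walk over every key of the simulation entry; for each file-bearing key,
    # hash the provided files and record them both in the summary table and in
    # sim_hashes (as [filename, sha1] pairs).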
for key_sim, value_sim in sim_hashes.items():
# double-checking keys
try:
entry_type = software_sim[key_sim]["TYPE"]
except KeyError:
if key_sim in ["SOFTWARE", "ID"]:
continue
else:
# That shouldn't happen! Unexpected YAML-keys were checked by
# parse_valid_config_settings before
raise
if "file" in entry_type:
files_list = []
is_required = software_dict[sim_hashes["SOFTWARE"].upper()][key_sim]["REQUIRED"]
if not is_required and value_sim is None:
continue # skip not required NoneType (empty) file entries
for file_provided in value_sim:
file_name = os.path.join(dir_tmp, file_provided[0])
logger.info(f"calculating sha1 hash of '{file_provided[0]}'...")
file_hash = calc_file_sha1_hash(file_name)
file_size_mb = f"{(os.path.getsize(file_name) / 1024 / 1024):.2f}"
df_files = pd.concat(
[
df_files,
pd.DataFrame(
[
{
"NAME": file_provided[0],
"TYPE": key_sim,
"REQUIRED": is_required,
"SIZE_MB": file_size_mb,
"HASH": file_hash,
},
],
),
],
ignore_index=True,
)
files_list.append([file_provided[0], file_hash])
                # Collect the hashes of the required files for the master hash
                if is_required:
                    sha1_list_required.append(file_hash)
sim_hashes[key_sim] = files_list # TODO Problematic
logger.info(f"Summary of downloaded files: {os.linesep}")
logger.info(df_files.to_string())
    # The hashes of the required files are collected so that a master hash can be
    # computed from them. That hash must stay invariant because it is used as the
    # unique identifier of a simulation, so the hashes of the required files are
    # ordered before it is calculated (which also means the required files cannot
    # change).
    # Calculate the number of lipid molecules in each leaflet. This is done by
    # checking on which side of the membrane's centre of mass the centre of mass
    # of each lipid molecule lies. If a lipid molecule is split so that the
    # headgroup and tails are separate residues, the centre of mass of the
    # headgroup is used in the calculation.
####################################################################################
logger.info(
"Calculating the numbers of lipid molecules in each leaflet based on the "
"center of mass of the membrane and lipids.",
)
    logger.info(
        "If a lipid molecule is split into multiple residues, the centre of mass of the headgroup is used.",
    )
try:
struc, top, traj = get_struc_top_traj_fnames(sim, join_path=dir_tmp)
    except (ValueError, KeyError):
        logger.exception("Some of the fields required for forming the Universe were not found.")
        sys.exit(1)
    except Exception:
        logger.exception("Unknown error during `get_struc_top_traj_fnames`")
        sys.exit(4)
leaflet1 = 0 # total number of lipids in upper leaflet
leaflet2 = 0 # total number of lipids in lower leaflet
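    # Structure-resolution cascade: (1) try to build a Universe from topology +
    # trajectory and dump frame 0 into frame0.gro; (2) if that fails, retry from
    # the structure file + trajectory; (3) for GROMACS only, fall back to
    # regenerating frame0.gro with `gmx trjconv`.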
# try to generate zero-frame structure from top + trajectory
fail_from_top = False
gro = os.path.join(dir_tmp, "frame0.gro") # structure regenerated from top
try:
logger.info(f"MDAnalysis tries to use {top} and {traj}")
u = Universe(top, traj)
u.atoms.write(gro, frames=u.trajectory[[0]])
except Exception as e:
logger.warning("%s: %s", e.__class__.__name__, e)
fail_from_top = True
# if previous fails then try the same from struc + trajectory
if fail_from_top and struc is not None:
try:
logger.info(f"MDAnalysis tries to use {struc} and {traj}")
u = Universe(struc, traj)
u.atoms.write(gro, frames=u.trajectory[[0]])
except Exception as _:
logger.exception("Cannot initialize MDAnalysis using given structure file!")
sys.exit(2)
# if there is no struc and MDAnalysis doesn't start from TOP, then
# GROMACS can try making struc from top!
    if fail_from_top and struc is None and sim["SOFTWARE"].upper() == "GROMACS":
        logger.info(
            "Now generating frame0.gro with GROMACS because MDAnalysis cannot read this tpr version ...",
        )
        if "WARNINGS" in sim and sim["WARNINGS"] is not None and sim["WARNINGS"].get("GROMACS_VERSION") == "gromacs3":
            command = ["trjconv", "-s", top, "-f", traj, "-dump", "0", "-o", gro]
        else:
            command = ["gmx", "trjconv", "-s", top, "-f", traj, "-dump", "0", "-o", gro]
logger.debug(f"executing 'echo System | {' '.join(command)}'")
try:
subprocess.run(command, input="System\n", text=True, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
FAIL_MSG = f"Command 'echo System | {' '.join(command)}' failed with error: {e.stderr}"
raise RuntimeError(FAIL_MSG) from e
try:
u = Universe(gro, traj)
# write first frame into gro file
u.atoms.write(gro, frames=u.trajectory[[0]])
except Exception as e:
logger.warning("%s: %s", e.__class__.__name__, e)
struc = gro
# if there is a topology and MDAnalysis reads it, we can use zero-frame
# extraction as a structure!
if not fail_from_top and struc is None:
struc = gro
# At last, we create universe from just a structure!
logger.info(f"Making Universe from {struc}..")
u0 = Universe(struc)
lipids = []
# select lipids
for key_mol in lipids_set.names:
logger.info(f"Calculating number of '{key_mol}' lipids")
selection = ""
if key_mol in sim["COMPOSITION"]:
lip = Lipid(key_mol)
m_file = sim["COMPOSITION"][key_mol]["MAPPING"]
lip.register_mapping(m_file)
for key in lip.mapping_dict:
if "RESIDUE" in lip.mapping_dict[key]:
selection = (
selection
+ "(resname "
+ lip.mapping_dict[key]["RESIDUE"]
+ " and name "
+ lip.mapping_dict[key]["ATOMNAME"]
+ ") or "
)
else:
selection = "resname " + sim["COMPOSITION"][key_mol]["NAME"]
break
selection = selection.removesuffix(" or ")
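            # For example (hypothetical residue/atom names), the assembled
            # selection may look like:
            #   "(resname POPC and name P) or (resname POPC and name N)"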
            molecules = u0.select_atoms(selection)
            if molecules.n_residues > 0:
                lipids.append(molecules)
if not lipids:
raise RuntimeError("No lipids were found in the composition!")
    # join all the selected lipids together to make a selection of the entire
    # membrane and calculate the z component of the centre of mass of
    # the membrane (lipids is guaranteed non-empty by the check above)
    membrane = lipids[0]
    for lipid_group in lipids[1:]:
        membrane = membrane + lipid_group
    R_membrane_z = membrane.center_of_mass()[2]
logger.info(f"Center of the mass of the membrane: {R_membrane_z!s}")
# ---- number of each lipid per leaflet
# TODO: remove code duplication block!
for key_mol in lipids_set.names:
leaflet1 = 0
leaflet2 = 0
selection: str = ""
if key_mol in sim["COMPOSITION"]:
lip = Lipid(key_mol)
m_file = sim["COMPOSITION"][key_mol]["MAPPING"]
lip.register_mapping(m_file)
for key in lip.mapping_dict:
if "RESIDUE" in lip.mapping_dict[key]:
selection = (
selection
+ "resname "
+ lip.mapping_dict[key]["RESIDUE"]
+ " and name "
+ lip.mapping_dict[key]["ATOMNAME"]
+ " or "
)
break
else:
selection = "resname " + sim["COMPOSITION"][key_mol]["NAME"]
break
# if lipid was found then selection is not empty
if selection != "":
selection = selection.removesuffix(" or ")
logger.debug(f"Selection: `{selection}`")
molecules = u0.select_atoms(selection)
logger.debug(
"Resnames: %s | ResIDs: %s",
", ".join(molecules.residues.resnames),
", ".join(map(str, molecules.residues.resids)),
)
if molecules.n_residues > 0:
for mol in molecules.residues:
R = mol.atoms.center_of_mass()
if R[2] - R_membrane_z > 0:
leaflet1 = leaflet1 + 1
elif R[2] - R_membrane_z < 0:
leaflet2 = leaflet2 + 1
try:
sim["COMPOSITION"][key_mol]["COUNT"] = [leaflet1, leaflet2]
except KeyError:
continue
else:
logger.info(f"Number of '{key_mol}' in upper leaflet: {leaflet1!s}")
logger.info(f"Number of '{key_mol}' in lower leaflet: {leaflet2!s}")
# ----- numbers of other molecules
for key_mol in molecules_set.names:
try:
mol_name = sim["COMPOSITION"][key_mol]["NAME"]
except KeyError:
continue
else:
mol_number = u0.select_atoms("resname " + mol_name).n_residues
sim["COMPOSITION"][key_mol]["COUNT"] = mol_number
logger.info(
f"Number of '{key_mol}': {sim['COMPOSITION'][key_mol]['COUNT']!s}",
)
# Anne: Read trajectory size and length
sim["TRAJECTORY_SIZE"] = os.path.getsize(traj)
n_frames = len(u.trajectory)
timestep = u.trajectory.dt
logger.info(f"Number of frames: {n_frames}")
logger.info(f"Timestep: {timestep}")
trj_length = n_frames * timestep
sim["TRJLENGTH"] = trj_length
# Read temperature from tpr
if sim["SOFTWARE"].upper() == "GROMACS":
file1 = os.path.join(dir_tmp, "tpr.txt")
logger.info("Exporting information with gmx dump")
if (
"WARNINGS" in sim
and sim["WARNINGS"] is not None
and "GROMACS_VERSION" in sim["WARNINGS"]
and sim["WARNINGS"]["GROMACS_VERSION"] == "gromacs3"
):
command = ["gmxdump", "-s", top]
TemperatureKey = "ref_t"
else:
command = ["gmx", "dump", "-s", top]
TemperatureKey = "ref-t"
try:
result = subprocess.run(command, input="System\n", text=True, check=True, capture_output=True)
with open(file1, "w") as f:
f.write(result.stdout)
except subprocess.CalledProcessError as e:
FAIL_MSG = f"Command 'echo System | {' '.join(command)}' failed with error: {e.stderr}"
raise RuntimeError(FAIL_MSG) from e
with open(file1) as tpr_info:
for line in tpr_info:
if TemperatureKey in line:
sim["TEMPERATURE"] = float(line.split()[1])
logger.info("Parameters read from input files:")
logger.info(f"TEMPERATURE: {sim['TEMPERATURE']!s}")
logger.info(f"LENGTH OF THE TRAJECTORY: {sim['TRJLENGTH']!s}")
# Check that the number of atoms between data and README.yaml match
natoms_trj = u.atoms.n_atoms
number_of_atoms = 0
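    # Expected atom count = sum over molecules of (COUNT x atoms in the mapping
    # file); for united-atom trajectories, hydrogens (mapping keys containing
    # "H") are excluded for every molecule except water.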
for key_mol in sim["COMPOSITION"]:
mol = Lipid(key_mol) if key_mol in lipids_set else NonLipid(key_mol)
mol.register_mapping(sim["COMPOSITION"][key_mol]["MAPPING"])
if sim.get("UNITEDATOM_DICT") and "SOL" not in key_mol:
mapping_file_length = 0
for key in mol.mapping_dict:
if "H" in key:
continue
else:
mapping_file_length += 1
else:
mapping_file_length = len(mol.mapping_dict)
number_of_atoms += np.sum(sim["COMPOSITION"][key_mol]["COUNT"]) * mapping_file_length
    if number_of_atoms != natoms_trj:
        stop = input(
            f"Number of atoms in trajectory {natoms_trj} and README.yaml "
            f"{number_of_atoms} do not match. Check the mapping files and molecule"
            f" names. {os.linesep} If you know what you are doing, you can still "
            "continue running the script. Do you want to continue (y/n)? ",
        )
        if stop == "n":
            sys.exit("Interrupted because the atom numbers did not match")
        if stop == "y":
            logger.warning(
                "Proceeded even though the atom numbers did not match. CHECK RESULTS MANUALLY!",
            )
sim["NUMBER_OF_ATOMS"] = natoms_trj
logger.info(f"Number of atoms in the system: {sim['NUMBER_OF_ATOMS']!s}")
# ---- DATE OF RUNNING ----
today = datetime.datetime.now().date().strftime("%d/%m/%Y") # noqa: DTZ005
sim["DATEOFRUNNING"] = today
logger.info(f"Date of adding to the databank: {sim['DATEOFRUNNING']}")
    # The type of system is hard-coded for now because only lipid bilayers are
    # being added. When other systems are supported, this will be given by the user.
if "TYPEOFSYSTEM" not in list(sim.keys()):
sim["TYPEOFSYSTEM"] = "lipid bilayer"
    # Determine the ID to insert. New systems get temporary negative IDs: -1, -2, -3, ...
ss = initialize_databank()
id_list = [s["ID"] for s in ss] + [0]
sim["ID"] = min(id_list) - 1
logger.info("Inserting ID: %s", str(sim["ID"]))
del ss
# dictionary saved in yaml format
outfile_dict = os.path.join(dir_tmp, "README.yaml")
with open(outfile_dict, "w") as f:
yaml.dump(sim.readme, f, sort_keys=False, allow_unicode=True)
logger.info(f"Databank README was saved to '{outfile_dict}'")
# Try to create final directory
try:
directory_path = create_databank_directories(
sim,
sim_hashes,
DatabankLib.NMLDB_SIMU_PATH,
dry_run_mode=args.dry_run,
)
except NotImplementedError:
logger.exception("[deprecated] Special error during directory creation (not implemented)")
sys.exit(4)
except OSError as e:
logger.exception("couldn't create output directory: %s", e.args[1])
sys.exit(2)
logger.info("Databank entry will be registered into '%s'", directory_path)
# copy previously downloaded files
if not args.dry_run:
logger.info("Copying files to the output directory [try hardlink for the traj.]...")
try:
os.link(
traj,
os.path.join(directory_path, os.path.basename(traj)),
)
except OSError:
logger.warning(
f"Could not hardlink trajectory file '{traj}' to the output directory. Copying instead.",
)
shutil.copyfile(traj, os.path.join(directory_path, os.path.basename(traj)))
shutil.copyfile(
top,
os.path.join(directory_path, os.path.basename(top)),
)
shutil.copyfile(
os.path.join(dir_tmp, "README.yaml"),
os.path.join(directory_path, "README.yaml"),
)
logger.info("Script completed successfully!")
if __name__ == "__main__":
add_simulation()