Source code for Scripts.DatabankLib.analyze

"""
:module: DatabankLib.analyze
:description: major analysis methods calculating a property from NMRLipipds
              `system` object
"""

import json
import os
import sys
import re
import traceback
from logging import Logger
import buildh
import urllib.request
import socket

from tqdm import tqdm
import numpy as np
import gc

from DatabankLib import (
    NMLDB_SIMU_PATH,
    RCODE_ERROR,
    RCODE_SKIPPED,
    RCODE_COMPUTED,
    NMLDB_DATA_PATH,
)
from DatabankLib.core import System
from DatabankLib.settings.molecules import lipids_set
from DatabankLib.settings.engines import get_struc_top_traj_fnames
from DatabankLib.databankLibrary import GetNlipids, system2MDanalysisUniverse
from DatabankLib.jsonEncoders import CompactJSONEncoder
from DatabankLib.databankio import resolve_download_file_url
from DatabankLib.databankop import find_OP
from DatabankLib.form_factor import FormFactor
from DatabankLib import analyze_nmrpca as nmrpca


[docs] def computeNMRPCA( # noqa: N802 (API) system: System, logger: Logger, recompute: bool = False ) -> int: """Compute eq_times.json using NMR PCA analysis. Args: system (System): one of systems of the Databank recompute (bool, optional): Delete previous apl.json and recompute it if True. Defaults to False. Returns: int success code (RCODE_...) """ print("== Doi: " + system["DOI"]) print("== System name: " + system["SYSTEM"]) # getting data from databank and preprocessing them # Start Parser # TODO: 2test| parser = Parser(NMLDB_SIMU_PATH, readme, eq_time_fname, testTraj) parser = nmrpca.Parser(system, "eq_times.json") # Check trajectory print(parser._path) vpcode = parser.validate_path() print("ValidatePath code: ", vpcode) if vpcode > 0: sys.stderr.write("Some errors in analyze_nmrpca.py::Parser constructor.\n") return RCODE_ERROR elif vpcode < 0 and not recompute: return RCODE_SKIPPED if ( "WARNINGS" in system and system["WARNINGS"] is not None and "AMBIGUOUS_ATOMNAMES" in system["WARNINGS"] ): return RCODE_SKIPPED if ( "WARNINGS" in system and system["WARNINGS"] is not None and "SKIP_EQTIMES" in system["WARNINGS"] ): return RCODE_SKIPPED # Check if TPR is defined and non-null or "" try: __ = system["TPR"][0][0] _ = __[0] except (KeyError, TypeError): sys.stderr.write("TPR is required for NMRPCA analysis!") return RCODE_ERROR # Download files parser.download_traj() # Prepare trajectory parser.prepare_traj() # Concatenate trajectory parser.concatenate_traj() equilibration_times = {} # Iterate over trajectories for different lipids for lip, traj in parser.concatenated_trajs.items(): print(f"Main: parsing lipid {lip}") # Creat PCA for trajectory pca_runner = nmrpca.PCA(traj[0], traj[1], traj[2], traj[3], parser.trj_len) # Run PCA data = pca_runner.PCA() print("Main: PCA: done") # Project trajectory on PC1 pca_runner.get_proj(data) print("Main: Projections: done") # Calculate autocorrelations pca_runner.get_autocorrelations() print("Main: Autocorrelations: done") # Estimate equilibration time te2 = nmrpca.TimeEstimator(pca_runner.autocorrelation).calculate_time() equilibration_times[lip] = te2 / parser.trj_len print("Main: EQ time: done") print(te2 / parser.trj_len) parser.dump_data(equilibration_times) gc.collect() return RCODE_COMPUTED
[docs] def computeAPL( # noqa: N802 (API) system: System, logger: Logger, recompute: bool = False ) -> int: """Generate apl.json analysis file for a system. Args: system (dict): one of systems of the Databank recompute (bool, optional): Delete previous apl.json and recompute it if True. Defaults to False. Returns: int success code (RCODE_...) """ # TODO: reading software and file path for simulations # software = system['SOFTWARE'] path = system["path"] # this is checking if area per lipid is already calculated for the systems outfilename = os.path.join(NMLDB_SIMU_PATH, path, "apl.json") if os.path.isfile(outfilename): if recompute: os.unlink(outfilename) else: return RCODE_SKIPPED print("Analyzing: ", path) print("Will write into: ", outfilename) # calculates the total number of lipids n_lipid = GetNlipids(system) # makes MDAnalysis universe from the system. This also downloads the data if not # yet locally available u = system2MDanalysisUniverse(system) if u is None: print("Generation of MDAnalysis universe failed in folder", path) return RCODE_ERROR # this calculates the area per lipid as a function of time and stores it # in the databank apl = {} for ts in tqdm(u.trajectory, desc="Scanning the trajectory"): if u.trajectory.time >= system["TIMELEFTOUT"] * 1000: dims = u.dimensions apl_frame = dims[0] * dims[1] * 2 / n_lipid apl[u.trajectory.time] = apl_frame with open(outfilename, "w") as f: json.dump(apl, f, cls=CompactJSONEncoder) return RCODE_COMPUTED
[docs] def computeTH( # noqa: N802 (API) system: dict, logger: Logger, recompute: bool = False ) -> int: thick_fn = os.path.join(NMLDB_SIMU_PATH, system["path"], "thickness.json") if os.path.isfile(thick_fn) and not recompute: # file exist. Skipping return RCODE_SKIPPED wat_dens_name = os.path.join(NMLDB_SIMU_PATH, system["path"], "WaterDensity.json") lip_dens_name = os.path.join(NMLDB_SIMU_PATH, system["path"], "LipidDensity.json") print(lip_dens_name) try: with open(wat_dens_name) as f: wat_dens = json.load(f) with open(lip_dens_name) as f: lip_dens = json.load(f) wd = np.array(wat_dens) ld = np.array(lip_dens) idx = np.argwhere(np.diff(np.sign(wd[:, 1] - ld[:, 1]))).flatten() if len(idx) < 2: print( "Dehydrated sample! Standard thickness rule doesn't work." " Will extract boxZ." ) thickness = wd[-1, 0] - wd[0, 0] else: thickness = wd[idx[1], 0] - wd[idx[0], 0] with open(thick_fn, "w") as f: json.dump(thickness, f) except Exception as e: print("Calculation failed for " + system["path"]) print(str(e)) print(traceback.format_exc()) return RCODE_ERROR return RCODE_COMPUTED
# TODO: implement onlyLipid
[docs] def computeOP( # noqa: N802 (API) system: System, logger: Logger, recompute: bool = False ) -> int: """_summary_ Args: system (dict): _description_ recompute (bool, optional): _description_. Defaults to False. Returns: int: _description_ """ path = system["path"] # Check if order parameters are calculated or something in the system prevents # order parameter calculation for key in system["COMPOSITION"]: outfilename = os.path.join(NMLDB_SIMU_PATH, path, key + "OrderParameters.json") # print(outfilename) if os.path.isfile(outfilename) or ( "WARNINGS" in system.keys() and type(system["WARNINGS"]) is dict and "AMBIGUOUS_ATOMNAMES" in system["WARNINGS"].keys() and key in system["WARNINGS"]["AMBIGUOUS_ATOMNAMES"] ): file_found = True elif key in lipids_set: file_found = False break if file_found and not recompute: return RCODE_SKIPPED # If order parameters are not calculated and system ok, then continue to # calculating order parameters print("Analyzing: ", path) # Download the simulations and create MDAnalysis universe (the universe is not used # here but this script downloads the data) _ = system2MDanalysisUniverse(system) # Software and time for equilibration period software = system["SOFTWARE"] eq_time = float(system["TIMELEFTOUT"]) * 1000 # Check if all or united atom simulation try: united_atom = system["UNITEDATOM_DICT"] except KeyError: united_atom = False # Check relevant warnings g3switch = ( "WARNINGS" in system and type(system["WARNINGS"]) is dict and "GROMACS_VERSION" in system["WARNINGS"] and system["WARNINGS"]["GROMACS_VERSION"] == "gromacs3" ) trconv_command = "trjconv" if g3switch else "gmx trjconv" cur_path = os.path.join(NMLDB_SIMU_PATH, path) try: struc_fname, top_fname, trj_fname = get_struc_top_traj_fnames( system, join_path=cur_path ) except (ValueError, KeyError) as e: sys.stderr.write("Error reading filenames from system dictionary.\n") sys.stderr.write(str(type(e)) + " => " + str(e)) return RCODE_ERROR try: # Set topology file names and make Gromacs trajectories whole if "gromacs" in software: if top_fname is None: raise ValueError("TPR is required for OP calculations!") xtcwhole = os.path.join(NMLDB_SIMU_PATH, path, "whole.xtc") if not os.path.isfile(xtcwhole): exec_str = ( f"echo System | {trconv_command} -f {trj_fname} " f"-s {top_fname} -o {xtcwhole} -pbc mol -b {str(eq_time)}" ) print("Make molecules whole in the trajectory") if united_atom and system["TRAJECTORY_SIZE"] > 15e9: print( "United atom trajectry larger than 15 Gb. " "Using only every third frame to reduce memory usage." ) exec_str += " -skip 3" rcode = os.system(exec_str) if rcode != 0: raise RuntimeError("trjconv exited with error (see above)") elif "openMM" in software or "NAMD" in software: if not os.path.isfile(struc_fname): pdb_url = resolve_download_file_url(system.get("DOI"), struc_fname) _ = urllib.request.urlretrieve(pdb_url, struc_fname) else: print( "Order parameter calculation for other than gromacs, " "openMM and NAMD are yet to be implemented." ) return RCODE_ERROR # Calculate order parameters # ----------------------------------- # united-atom switch -> engine switch if united_atom: if "gromacs" not in software: raise ValueError("UNITED_ATOMS is supported only for GROMACS engine!") frame0struc = os.path.join(NMLDB_SIMU_PATH, path, "frame0.gro") if g3switch: rcode = os.system( f"echo System | editconf -f {top_fname} -o {frame0struc}" ) if rcode != 0: raise RuntimeError("editconf exited with error (see above)") else: rcode = os.system( f"echo System | {trconv_command} -f {xtcwhole}" f" -s {top_fname} -dump 0 -o {frame0struc}" ) if rcode != 0: raise RuntimeError( f"trjconv ({trconv_command}) exited with error (see above)" ) for key in system["UNITEDATOM_DICT"]: # construct order parameter definition file for CH bonds from # mapping file mapping_dict = system.content[key].mapping_dict def_fname = os.path.join(NMLDB_SIMU_PATH, path, key + ".def") def_file = open(def_fname, "w") previous_line = "" regexp1_H = re.compile(r"M_[A-Z0-9]*C[0-9]*H[0-9]*_M") # noqa: N806 regexp2_H = re.compile(r"M_G[0-9]*H[0-9]*_M") # noqa: N806 regexp1_C = re.compile(r"M_[A-Z0-9]*C[0-9]*_M") # noqa: N806 regexp2_C = re.compile(r"M_G[0-9]_M") # noqa: N806 # Note that mapping_dict's keys must be ordered # C11 H111 H112 C12 H121 H122 ... # otherwize algorithm will fail for mapping_key in mapping_dict: if regexp1_C.search(mapping_key) or regexp2_C.search(mapping_key): atom_c = [mapping_key, mapping_dict[mapping_key]["ATOMNAME"]] atom_h = [] elif regexp1_H.search(mapping_key) or regexp2_H.search(mapping_key): atom_h = [mapping_key, mapping_dict[mapping_key]["ATOMNAME"]] else: atom_c = [] atom_h = [] if atom_h: assert atom_c items = [atom_c[1], atom_h[1], atom_c[0], atom_h[0]] def_line = ( items[2] + "&" + items[3] + " " + key + " " + items[0] + " " + items[1] + "\n" ) if def_line != previous_line: def_file.write(def_line) previous_line = def_line def_file.close() # Add hydrogens to trajectory and calculate order parameters with buildH op_file = os.path.join( NMLDB_SIMU_PATH, path, key + "OrderParameters.dat" ) lipid_json_file = [ os.path.join( NMLDB_DATA_PATH, "lipid_json_buildH", system["UNITEDATOM_DICT"][key] + ".json", ) ] if not os.path.isfile(lipid_json_file[0]): lipid_json_file = None print(system["UNITEDATOM_DICT"][key]) buildh.launch( coord_file=frame0struc, def_file=def_fname, lipid_type=system["UNITEDATOM_DICT"][key], lipid_jsons=lipid_json_file, traj_file=xtcwhole, out_file=f"{op_file}.buildH", ignore_CH3s=True, ) outfile = open(op_file, "w") outfile.write("Atom Average OP OP stem\n") data = {} outfile2 = os.path.join( NMLDB_SIMU_PATH, path, key + "OrderParameters.json" ) with open(op_file + ".buildH") as op_file: lines = op_file.readlines() for line in lines: if "#" in line: continue line2 = ( line.split()[0].replace("&", " ") + " " + line.split()[4] + " " + line.split()[5] + " " + line.split()[6] + "\n" ) outfile.write(line2) op_name = line.split()[0].replace("&", " ") # -- line.split()[0] + " " + line.split()[1] op_values = [ float(line.split()[4]), float(line.split()[5]), float(line.split()[6]), ] data[str(op_name)] = [] data[str(op_name)].append(op_values) with open(outfile2, "w") as f: json.dump(data, f, cls=CompactJSONEncoder) outfile.close() # not united-atom cases else: if "gromacs" in software: gro = os.path.join(NMLDB_SIMU_PATH, path, "conf.gro") print("\n Making gro file") if g3switch: rcode = os.system(f"echo System | editconf -f {top_fname} -o {gro}") if rcode != 0: raise RuntimeError("editconf exited with error (see above)") else: rcode = os.system( f"echo System | gmx trjconv " f"-f {trj_fname} -s {top_fname} -dump 0 -o {gro}" ) if rcode != 0: raise RuntimeError("trjconv exited with error (see above)") for key in system["COMPOSITION"]: if ( "WARNINGS" in system.keys() and system["WARNINGS"] is not None and "AMBIGUOUS_ATOMNAMES" in system["WARNINGS"].keys() and key in system["WARNINGS"]["AMBIGUOUS_ATOMNAMES"] ): print(path, key) print( "Order parameters cannot be calculated if atom " "names are ambiguous." ) continue if key in lipids_set: print("Calculating ", key, " order parameters") resname = system["COMPOSITION"][key]["NAME"] outfilename = os.path.join( NMLDB_SIMU_PATH, path, key + "OrderParameters.dat" ) outfilename2 = os.path.join( NMLDB_SIMU_PATH, path, key + "OrderParameters.json" ) if os.path.isfile(outfilename2): print("Order parameter file already found") continue if "gromacs" in software: try: op_obj = find_OP( system.content[key].mapping_dict, top_fname, xtcwhole, resname, ) except Exception as e: logger.warning(f"We got this exception: \n {e}") logger.warning( "But we will try rebuild the Universe " "from GROM if using tpr did not work!" ) op_obj = find_OP( system.content[key].mapping_dict, gro, xtcwhole, resname ) if "openMM" in software or "NAMD" in software: op_obj = find_OP( system.content[key].mapping_dict, struc_fname, trj_fname, resname, ) data = {} with open(outfilename, "w") as outfile: outfile.write("Atom Average OP OP stem\n") for i, op in enumerate(op_obj): (op.avg, op.std, op.stem) = op.get_avg_std_stem_OP outfile.write(f"{op.name} {str(op.avg)} {str(op.stem)}\n") data[str(op.name)] = [] data[str(op.name)].append(op.get_avg_std_stem_OP) with open(outfilename2, "w") as f: json.dump(data, f, cls=CompactJSONEncoder) print("Order parameters calculated and saved to ", path) except Exception as e: print("Calculation failed for " + system["path"]) print(str(e)) print(traceback.format_exc()) return RCODE_ERROR return RCODE_COMPUTED
[docs] def computeFF( # noqa: N802 (API) system: System, logger: Logger, recompute: bool = False ) -> int: logger.info("System title: " + system["SYSTEM"]) logger.info("System path: " + system["path"]) software = system["SOFTWARE"] # download trajectory and gro files system_path = os.path.join(NMLDB_SIMU_PATH, system["path"]) doi = system.get("DOI") skip_downloading: bool = doi == "localhost" if skip_downloading: print("NOTE: The system with 'localhost' DOI should be downloaded by the user.") if os.path.isfile(system_path + os.sep + "FormFactor.json") and not recompute: return RCODE_SKIPPED if system["TYPEOFSYSTEM"] == "miscellaneous": return RCODE_SKIPPED print("Analyzing", system_path) try: if system["UNITEDATOM_DICT"]: print("United atom simulation") except KeyError: pass try: if system["WARNINGS"]["ORIENTATION"]: print( "Skipping due to ORIENTATION warning:", system["WARNINGS"]["ORIENTATION"], ) return RCODE_SKIPPED except (KeyError, TypeError): pass try: if system["WARNINGS"]["PBC"] == "hexagonal-box": print("Skipping due to PBC warning:", system["WARNINGS"]["PBC"]) return RCODE_SKIPPED except (KeyError, TypeError): pass try: if system["WARNINGS"]["NOWATER"]: print("Skipping because there is not water in the trajectory.") return RCODE_SKIPPED except (KeyError, TypeError): pass output_name = "" try: struc, top, trj = get_struc_top_traj_fnames(system) trj_name = os.path.join(system_path, trj) if struc is None: struc_name = None else: struc_name = os.path.join(system_path, struc) if top is None: tpr_name = None else: top_name = os.path.join(system_path, top) except Exception as e: logger.error("Error getting structure/topology/trajectory filenames.") logger.error(str(e)) return RCODE_ERROR socket.setdefaulttimeout(15) try: if skip_downloading: if not os.path.isfile(trj_name): raise FileNotFoundError( f"Trajectory should be downloaded [{trj_name}] by user" ) else: trj_url = resolve_download_file_url(system["DOI"], trj) if not os.path.isfile(trj_name): print( "Downloading trajectory with the size of ", system["TRAJECTORY_SIZE"], " to ", system["path"], ) _ = urllib.request.urlretrieve(trj_url, trj_name) # make a function like this # TODO TPR should not be obligatory for GROMACS if "gromacs" in software: tpr_name = top_name if skip_downloading: if not os.path.isfile(tpr_name): raise FileNotFoundError( f"TPR should be downloaded [{tpr_name}] by user" ) else: tpr_url = resolve_download_file_url(doi, top) if not os.path.isfile(tpr_name): _ = urllib.request.urlretrieve(tpr_url, tpr_name) if "openMM" in software or "NAMD" in software: if skip_downloading: if not os.path.isfile(struc_name): raise FileNotFoundError( f"Structure file should be downloaded [{struc_name}] by user" ) else: pdb_url = resolve_download_file_url(doi, struc) if not os.path.isfile(struc_name): _ = urllib.request.urlretrieve(pdb_url, struc_name) eq_time = float(system["TIMELEFTOUT"]) * 1000 # FIND LAST CARBON OF SN-1 TAIL AND G3 CARBON for molecule in system["COMPOSITION"]: if molecule in lipids_set: mapping = system.content[molecule].mapping_dict # TODO: rewrite via lipid dictionary! for nm in ["M_G3_M", "M_G13_M", "M_C32_M"]: try: g3_atom = mapping[nm]["ATOMNAME"] continue except (KeyError, TypeError): pass # TODO: rewrite via lipid dictionary if "M_G1C4_M" in mapping.keys(): for c_idx in range(4, 30): atom = "M_G1C" + str(c_idx) + "_M" try: last_atom = mapping[atom]["ATOMNAME"] except (KeyError, TypeError): continue elif "M_G11C4_M" in mapping.keys(): for c_idx in range(4, 30): atom = "M_G11C" + str(c_idx) + "_M" try: last_atom = mapping[atom]["ATOMNAME"] except (KeyError, TypeError): continue elif "M_CA4_M" in mapping.keys(): for c_idx in range(4, 30): atom = "M_CA" + str(c_idx) + "_M" try: last_atom = mapping[atom]["ATOMNAME"] except (KeyError, TypeError): continue print(last_atom, g3_atom) # Center around one lipid tail CH3 to guarantee all lipids in the same box if "gromacs" in system["SOFTWARE"]: if ( "WARNINGS" in system and system["WARNINGS"] is not None and "GROMACS_VERSION" in system["WARNINGS"] and system["WARNINGS"]["GROMACS_VERSION"] == "gromacs3" ): trjconv_cmd = "trjconv" makendx_cmd = "make_ndx" else: trjconv_cmd = "gmx trjconv" makendx_cmd = "gmx make_ndx" os.system("rm foo.ndx") os.system( f'echo "a {last_atom}\nq" | {makendx_cmd} -f {tpr_name} ' f"-o foo.ndx" ) os.system("tail -n1 foo.ndx | awk '{print $NF}'") os.system('echo "[ centralAtom ]" >> foo.ndx') os.system("tail -n2 foo.ndx | head -n1 | awk '{print $NF}' >> foo.ndx") xtcwhole = os.path.join(system_path, "whole.xtc") xtcfoo = os.path.join(system_path, "foo2.xtc") xtccentered = os.path.join(system_path, "centered.xtc") if not os.path.isfile(xtccentered): print("Make molecules whole in the trajectory") # if unitedAtom and system['TRAJECTORY_SIZE'] > 15000000000: # print("United atom trajectry larger than 15 Gb. " # "Using only every third frame to reduce memory usage.") # os.system('echo System | gmx trjconv -f ' + trj_name + ' -s ' + # tpr_name + ' -o ' + xtcwhole + ' -pbc mol -b ' + str(EQtime) + # ' -skip 3') # else: if not os.path.isfile(xtcwhole): os.system( f"echo System | {trjconv_cmd} -f {trj_name} " f"-s {tpr_name} -o {xtcwhole} -pbc mol -b {str(eq_time)}" ) if not os.path.isfile(xtcfoo): os.system( f'echo "centralAtom\nSystem" | {trjconv_cmd} -center' f" -pbc mol -n foo.ndx -f {xtcwhole} -s {tpr_name}" f" -o {xtcfoo}" ) os.system("rm foo.ndx") os.system("rm " + xtcwhole) # Center around the center of mass of all the g_3 carbons # if (not os.path.isfile(xtccentered)): os.system( f'echo "a {g3_atom}\nq" | {makendx_cmd}' f" -f {tpr_name} -o foo.ndx" ) os.system( f'echo "{g3_atom}\nSystem" | {trjconv_cmd} -center' f" -pbc mol -n foo.ndx -f {xtcfoo} -s {tpr_name}" f" -o {xtccentered}" ) os.system("rm " + xtcfoo) os.system("rm foo.ndx") else: print( "Centering for other than Gromacs may not work if there are" " jumps over periodic boundary conditions in z-direction." ) if not os.path.isfile(system_path + os.sep + "FormFactor.json"): try: if "gromacs" in system["SOFTWARE"]: FormFactor(tpr_name, xtccentered, 200, output_name, system) if "openMM" in system["SOFTWARE"] or "NAMD" in system["SOFTWARE"]: FormFactor(struc_name, trj_name, 200, output_name, system) except ValueError as e: # Here it was expected to have allow_pickle-type errors. # but I suppose, we cannot simply ignore them because it means that the # code breaks at this place, and the running user should be informed # about it! raise e # finall catch except Exception as e: print("Calculation failed for " + system["path"]) print(str(e)) print(traceback.format_exc()) return RCODE_ERROR else: return RCODE_COMPUTED