# Source code for Scripts.DatabankLib.databankio

"""
:module databankio: Inut/Output auxilary module
:description:
    Input/Output auxilary module with some small usefull functions. It includes:
    - Network communication.
    - Downloading files.
    - Checking links.
    - Resolving DOIs.
    - Calculating file hashes.
"""

import logging
import math
import hashlib
import os
import time
import socket
from typing import Mapping
import urllib.error
from tqdm import tqdm
import urllib.request

logger = logging.getLogger(__name__)
MAX_DRYRUN_SIZE = 50 * 1024 * 1024  # 50 MB, max size for dry-run download


def download_resource_from_uri(
        uri: str, dest: str, override_if_exists: bool = False,
        dry_run_mode: bool = False
) -> int:
    """
    Download file resource [from uri] to given file destination using urllib.

    :param uri: (str) file URL
    :param dest: (str) file destination path
    :param override_if_exists: (bool, optional) Override dest. file if exists.
        Defaults to False.
    :param dry_run_mode: (bool, optional) Download at most ``MAX_DRYRUN_SIZE``
        bytes without a progress bar. Defaults to False.
    :raises HTTPException: An error occurred during download
    :raises Exception: Downloaded file size does not match the remote size
    :return: code (int) 0 - OK, 1 - skipped, 2 - redownloaded
    """
    # TODO verify file size before skipping already existing download!

    class RetrieveProgressBar(tqdm):
        # uses tqdm.update(), see docs https://github.com/tqdm/tqdm#hooks-and-callbacks
        def update_retrieve(self, b=1, bsize=1, tsize=None):
            if tsize is not None:
                self.total = tsize
            return self.update(b * bsize - self.n)

    fi_name = uri.split("/")[-1]
    # 0 = fresh download; becomes 2 when an existing file is being replaced
    return_code = 0

    # check if dest path already exists
    if not override_if_exists and os.path.isfile(dest):
        socket.setdefaulttimeout(10)  # seconds
        # compare filesize (close the probe response to avoid leaking a socket)
        with urllib.request.urlopen(uri) as response:
            fi_size = response.length  # download size
        if fi_size == os.path.getsize(dest):
            logger.info(f"{dest}: file already exists, skipping")
            return 1
        logger.warning(
            f"{fi_name} filesize mismatch of local "
            f"file '{fi_name}', redownloading ..."
        )
        # BUG FIX: the previous version returned 2 here WITHOUT actually
        # redownloading the file; now we fall through to the download below
        # and report 2 at the end, as the docstring promises.
        return_code = 2

    # download
    socket.setdefaulttimeout(10)  # seconds
    with urllib.request.urlopen(uri) as response:
        url_size = response.length  # download size

    if dry_run_mode:
        # Download only up to MAX_DRYRUN_SIZE bytes, no tqdm
        with urllib.request.urlopen(uri) as response, open(dest, "wb") as out_file:
            total = min(url_size, MAX_DRYRUN_SIZE) if url_size else MAX_DRYRUN_SIZE
            downloaded = 0
            chunk_size = 8192
            next_report = 10 * 1024 * 1024  # print every 10 MB
            logger.info(
                "Dry-run: Downloading up to"
                f" {total // (1024*1024)} MB of {fi_name} ...")
            while downloaded < total:
                to_read = min(chunk_size, total - downloaded)
                chunk = response.read(to_read)
                if not chunk:
                    break
                out_file.write(chunk)
                downloaded += len(chunk)
                if downloaded >= next_report:
                    print(f" Downloaded {downloaded // (1024*1024)} MB ...")
                    next_report += 10 * 1024 * 1024
            logger.info(
                "Dry-run: Finished, downloaded"
                f" {downloaded // (1024*1024)} MB of {fi_name}")
        return return_code

    with RetrieveProgressBar(
        unit="B", unit_scale=True, unit_divisor=1024, miniters=1, desc=fi_name
    ) as u:
        _ = urllib.request.urlretrieve(uri, dest, reporthook=u.update_retrieve)

    # check if the file is fully downloaded
    size = os.path.getsize(dest)
    if url_size != size:
        raise Exception(f"downloaded filesize mismatch ({size}/{url_size} B)")
    return return_code
def resolve_doi_url(doi: str, validate_uri: bool = True) -> str:
    """
    :meta private:
    Returns full doi link of given resource, also checks if URL is valid.

    Args:
        doi (str): [doi] part from config
        validate_uri (bool, optional): Check if URL is valid. Defaults to True.

    Returns:
        str: full doi link
    """
    res = "https://doi.org/" + doi

    if validate_uri:
        socket.setdefaulttimeout(10)  # seconds
        # close the validation response so the connection is not leaked;
        # urlopen raises if the DOI cannot be resolved
        with urllib.request.urlopen(res):
            pass
    return res


def resolve_download_file_url(
        doi: str, fi_name: str, validate_uri: bool = True, sleep429=5) -> str:
    """
    :meta private:
    Resolve file URI from supported DOI with given filename

    Args:
        doi (str): DOI string
        fi_name (str): name of the file to resolve from source
        validate_uri (bool, optional): Check if URI exists. Defaults to True.
        sleep429 (int, optional): Sleep in seconds if 429 HTTP code returned

    Raises:
        NotImplementedError: Unsupported DOI repository
        HTTPError: HTTP Error Status Code
        URLError: Failed to reach the server

    Returns:
        str: file URI
    """
    if "zenodo" in doi.lower():
        # e.g. "10.5281/zenodo.8435138" -> entry number "8435138"
        zenodo_entry_number = doi.split(".")[2]
        uri = "https://zenodo.org/record/" + zenodo_entry_number + "/files/" + fi_name

        # check if resource exists, may throw exception
        if validate_uri:
            try:
                socket.setdefaulttimeout(10)  # seconds
                # close the validation response to avoid leaking a connection
                with urllib.request.urlopen(uri, timeout=10):
                    pass
            except TimeoutError as terr:
                raise RuntimeError(f"Cannot open {uri}. Timeout error.") from terr
            except urllib.error.HTTPError as hte:
                if hte.code == 429:
                    # exponential-ish backoff: give up after sleep429 > 50 s
                    if sleep429 / 5 > 10:
                        raise TimeoutError(
                            "Too many iteration of increasing waiting time!")
                    logger.warning(f"HTTP error returned from URI: {uri}")
                    logger.warning(f"Site returns 429 code."
                                   f" Try to sleep {sleep429} seconds and repeat!")
                    time.sleep(sleep429)
                    return resolve_download_file_url(
                        doi, fi_name, validate_uri, sleep429=sleep429 + 5)
                else:
                    raise hte
        return uri
    else:
        raise NotImplementedError(
            "Repository not validated. Please upload the data for example to zenodo.org"
        )
def calc_file_sha1_hash(fi: str, step: int = 67108864, one_block: bool = True) -> str:
    """
    Calculate the SHA1 hash of a file using hashlib.

    :param fi: (str) path to file
    :param step: (int, optional) file read bytes step. Defaults to 64MB.
    :param one_block: (bool, optional) read just a single block. Defaults to True.
    :returns str: sha1 filehash of 40 char length
    """
    digest = hashlib.sha1()
    total_blocks = math.ceil(os.path.getsize(fi) / step)

    with open(fi, "rb") as handle:
        if one_block:
            # fast path: hash only the first `step` bytes of the file
            digest.update(handle.read(step))
        else:
            # hash the whole file block by block, with a progress bar
            with tqdm(total=total_blocks) as progress:
                while chunk := handle.read(step):
                    digest.update(chunk)
                    progress.update(1)

    return digest.hexdigest()
def create_databank_directories(
        sim: Mapping, sim_hashes: Mapping, out: str, dry_run_mode: bool = False
) -> str:
    """
    Creates nested output directory structure to save results.

    :param sim: Processed simulation entries.
    :param sim_hashes: File hashes needed for directory structure.
    :param out: Output base path (str).
    :param dry_run_mode: If True, do not create directories, just return the path.
    :returns: Output directory (str).
    :raises NotImplementedError: If the simulation software is unsupported.
    :raises OSError: If an error occurs while creating the output directory.
    :raises FileExistsError: If the output directory already exists and is not empty.
    """
    # resolve output dir naming; sim_hashes values look like [(filename, sha1), ...]
    if sim["SOFTWARE"] == "gromacs":
        head_dir = sim_hashes.get("TPR")[0][1][0:3]
        sub_dir1 = sim_hashes.get("TPR")[0][1][3:6]
        sub_dir2 = sim_hashes.get("TPR")[0][1]
        sub_dir3 = sim_hashes.get("TRJ")[0][1]
    elif sim["SOFTWARE"] == "openMM" or sim["SOFTWARE"] == "NAMD":
        head_dir = sim_hashes.get("TRJ")[0][1][0:3]
        sub_dir1 = sim_hashes.get("TRJ")[0][1][3:6]
        sub_dir2 = sim_hashes.get("TRJ")[0][1]
        sub_dir3 = sim_hashes.get("TRJ")[0][1]
    else:
        raise NotImplementedError(f"sim software '{sim['SOFTWARE']}' not supported")

    directory_path = os.path.join(out, head_dir, sub_dir1, sub_dir2, sub_dir3)
    logger.debug(f"output_dir = {directory_path}")

    # destination directory is not empty
    # BUG FIX: the old check `os.listdir(...) != 0` compared a list with an
    # int and was therefore ALWAYS True, so even an empty existing directory
    # raised FileExistsError. Test the listing's truthiness instead.
    if os.path.exists(directory_path) and os.listdir(directory_path):
        raise FileExistsError(
            f"Output directory '{directory_path}' is not empty. Delete it if you wish."
        )

    # create directories
    if not dry_run_mode:
        os.makedirs(directory_path, exist_ok=True)

    return directory_path