"""Utility functions to:
1. download and unzip software releases from the USGS and other organizations
(triangle, MT3DMS).
2. download the latest MODFLOW-based applications and utilities for MacOS,
Linux, and Windows from https://github.com/MODFLOW-USGS/executables
3. determine the latest version (GitHub tag) of a GitHub repository and build a
dictionary containing the file name and the download link of each asset
contained in a GitHub repository release
4. compress all files in a list, or all files in a list of directories
"""
import os
import shutil
import sys
import tarfile
import timeit
from http.client import responses
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo
import requests
[docs]
class pymakeZipFile(ZipFile):
    """ZipFile subclass that hosts the zip helpers used by this module.

    The standard ZipFile does not preserve file attributes (for example the
    execute permission of binaries), see
    https://stackoverflow.com/questions/39296101/python-zipfile-removes-execute-permissions-from-binaries

    NOTE(review): no extraction override is visible in this class body, so
    attribute preservation is not implemented by the code shown here --
    confirm against the full source.
    """

    @staticmethod
    def compressall(
        path,
        file_pths=None,
        dir_pths=None,
        patterns=None,
        append=False,
    ):
        """Compress selected files or files in selected directories.

        Every file is stored under its base name only, so the directory
        structure is not reproduced inside the zip file.

        Parameters
        ----------
        path : str
            output zip file path
        file_pths : str, Path, or iterable of str/Path
            file paths to include in the output zip file (default is None).
            Entries that are not existing files are ignored. The object
            passed by the caller is never modified.
        dir_pths : str, Path, or iterable of str/Path
            directory paths that are walked recursively for files to include
            in the output zip file (default is None)
        patterns : str or list of str
            substrings to look for in the file base names; only files whose
            base name contains at least one of them are kept
            (default is None, which keeps every file)
        append : bool
            boolean indicating if file paths should be appended to an existing
            zip file

        Returns
        -------
        success : bool
            boolean indicating if the output zip file was created

        """
        # normalize file_pths into a NEW list so the caller's object is never
        # mutated by the directory walk below
        if file_pths is None:
            file_pths = []
        elif isinstance(file_pths, (str, Path)):
            file_pths = [file_pths]
        else:
            file_pths = list(file_pths)

        # remove directories (and anything that does not exist) from the list
        file_pths = [e for e in file_pths if os.path.isfile(e)]

        # convert dirs to a list if a single path (a tuple is allowed)
        if dir_pths is None:
            dir_pths = []
        elif isinstance(dir_pths, (str, Path)):
            dir_pths = [dir_pths]

        # convert patterns to a list if a str (a tuple is allowed)
        if isinstance(patterns, str):
            patterns = [patterns]

        # walk through dirs and add files that are not already in the list;
        # a set of string paths keeps the membership test O(1)
        seen = {str(e) for e in file_pths}
        for dir_pth in dir_pths:
            for dirname, _, files in os.walk(dir_pth):
                for filename in files:
                    fpth = os.path.join(dirname, filename)
                    key = str(fpth)
                    if key not in seen:
                        seen.add(key)
                        file_pths.append(fpth)

        # remove file paths that do not match any of the patterns
        if patterns is not None:
            file_pths = [
                file_pth
                for file_pth in file_pths
                if any(p in os.path.basename(file_pth) for p in patterns)
            ]

        if not file_pths:
            print("No files to add to the zip file")
            return False

        # only append when the target zip file actually exists
        mode = "a" if append and Path(path).exists() else "w"
        with ZipFile(path, mode=mode, compression=ZIP_DEFLATED) as zf:
            for file_pth in file_pths:
                zf.write(file_pth, arcname=os.path.basename(file_pth))
        return True
def _request_get(url, verify=True, timeout=1, max_requests=10, verbose=False):
    """Make a streaming GET request, retrying until a 200 response is received.

    Parameters
    ----------
    url : str
        url address for the zip file
    verify : bool
        boolean indicating if the url request should be verified
        (default is True)
    timeout : int
        url request time out length (default is 1 seconds)
    max_requests : int
        number of url download request attempts (default is 10)
    verbose : bool
        boolean indicating if output will be printed to the terminal
        (default is False)

    Returns
    -------
    req : request object
        request object for url

    Raises
    ------
    ConnectionError
        if the last attempt did not produce a response, or if no attempt
        was made (max_requests < 1)
    requests.exceptions.HTTPError
        if the last response carries an HTTP error status

    """
    if verbose:
        print(f"request url '{url}'")

    # stays None when the last attempt failed or when no attempt is made
    req = None
    for idx in range(max_requests):
        if verbose:
            print(f" request attempt {idx + 1} of {max_requests}")
        try:
            req = requests.get(
                url,
                stream=True,
                verify=verify,
                timeout=timeout,
            )
        except requests.exceptions.RequestException:
            # forget any earlier response and try again
            req = None
            continue
        if verbose:
            # status codes missing from http.client.responses are printed
            # as the bare number instead of raising a KeyError
            status = responses.get(req.status_code, req.status_code)
            print(f" status: {status}")
        if req.status_code == 200:
            break

    # final test for success
    if req is None:
        raise ConnectionError(f"Could not get data from: {url}")
    req.raise_for_status()
    return req
def _request_header(url, max_requests=10, timeout=1, verbose=False, verify=True):
    """Make a HEAD request (following redirects), retrying until a 200
    response is received.

    Parameters
    ----------
    url : str
        url address for the zip file
    max_requests : int
        number of url download request attempts (default is 10)
    timeout : int
        url request time out length (default is 1 seconds)
    verbose : bool
        boolean indicating if output will be printed to the terminal
        (default is False)
    verify : bool
        boolean indicating if the url request should be verified
        (default is True)

    Returns
    -------
    header : request header object
        response object of the HEAD request for url

    Raises
    ------
    ConnectionError
        if the last attempt did not produce a response, or if no attempt
        was made (max_requests < 1)
    requests.exceptions.HTTPError
        if the last response carries an HTTP error status

    """
    if verbose:
        print(f"request url: '{url}'")

    # stays None when the last attempt failed or when no attempt is made
    header = None
    for idx in range(max_requests):
        if verbose:
            print(f" request attempt {idx + 1} of {max_requests}")
        try:
            header = requests.head(
                url,
                allow_redirects=True,
                timeout=timeout,
                verify=verify,
            )
        except requests.exceptions.RequestException:
            # forget any earlier response and try again
            header = None
            continue
        if verbose:
            # status codes missing from http.client.responses are printed
            # as the bare number instead of raising a KeyError
            status = responses.get(header.status_code, header.status_code)
            print(f" status: {status}")
        if header.status_code == 200:
            break

    # final test for success
    if header is None:
        raise ConnectionError(f"Could not get header from: {url}")
    header.raise_for_status()
    return header
[docs]
def download_and_unzip(
    url,
    pth="./",
    delete_zip=True,
    verify=True,
    timeout=30,
    max_requests=10,
    chunk_size=2048000,
    verbose=False,
):
    """Download a file from a url and uncompress it if it is an archive.

    The file is saved in pth under the last component of the url. Files whose
    name contains "zip" or "exe" are extracted as zip archives and files whose
    name contains "tar" are extracted as tar archives; anything else is only
    downloaded.

    Parameters
    ----------
    url : str
        url address for the zip file
    pth : str
        path where the zip file will be saved (default is the current path)
    delete_zip : bool
        boolean indicating if the downloaded file should be deleted after it
        is unzipped (default is True)
    verify : bool
        boolean indicating if the url request should be verified
    timeout : int
        url request time out length (default is 30 seconds)
    max_requests : int
        number of url download request attempts (default is 10)
    chunk_size : int
        maximum url download request chunk size (default is 2048000 bytes)
    verbose : bool
        boolean indicating if output will be printed to the terminal

    Returns
    -------
    success : bool
        True when the file was downloaded (failures raise instead)

    Raises
    ------
    ConnectionError
        if the file could not be downloaded in max_requests attempts
    Exception
        if the zip file could not be extracted

    """
    # create download directory
    if not os.path.exists(pth):
        if verbose:
            print(f"Creating the directory:\n {pth}")
        os.makedirs(pth, exist_ok=True)

    if verbose:
        print(f"Attempting to download the file:\n {url}")

    # define the filename
    file_name = os.path.join(pth, url.split("/")[-1])

    # download the file
    success = False
    tic = timeit.default_timer()

    # open request
    req = _request_get(
        url,
        verify=verify,
        timeout=timeout,
        max_requests=max_requests,
        verbose=verbose,
    )

    # get content length, if available
    tag = "Content-length"
    if tag in req.headers:
        file_size = req.headers[tag]
        len_file_size = len(file_size)
        file_size = int(file_size)
        bfmt = "{:" + f"{len_file_size}" + ",d}"
        sbfmt = "{:>" + f"{len(bfmt.format(file_size))}" + "s} bytes"
        if verbose:
            print(f" file size: {sbfmt.format(bfmt.format(file_size))}")
    else:
        # the size is unknown; the progress formats must still be defined
        # because they are used while downloading in verbose mode
        file_size = 0
        bfmt = "{:,d}"
        sbfmt = "{:>s} bytes"

    # download data from url
    for idx in range(max_requests):
        # print download attempt message
        if verbose:
            print(f" download attempt: {idx + 1}")

        # connection established - download the file
        download_size = 0
        try:
            with open(file_name, "wb") as f:
                for chunk in req.iter_content(chunk_size=chunk_size):
                    if not chunk:
                        continue

                    # increment the counter and write the chunk
                    download_size += len(chunk)
                    f.write(chunk)

                    # write information to the screen
                    if verbose:
                        msg = " downloaded " + sbfmt.format(
                            bfmt.format(download_size)
                        )
                        if file_size > 0:
                            download_percent = float(download_size) / float(
                                file_size
                            )
                            msg += (
                                " of "
                                + bfmt.format(file_size)
                                + " bytes"
                                + f" ({download_percent:10.4%})"
                            )
                        print(msg)
                    else:
                        sys.stdout.write(".")
                        sys.stdout.flush()
        except (requests.exceptions.RequestException, OSError):
            # the transfer (or the write) failed - reestablish the request
            # and try to download the data again
            req = _request_get(
                url,
                verify=verify,
                timeout=timeout,
                max_requests=max_requests,
                verbose=verbose,
            )
            continue

        # terminate the download attempt loop
        success = True
        break

    # write the total download time
    toc = timeit.default_timer()
    tsec = toc - tic
    if verbose:
        print(f"\ntotal download time: {tsec} seconds")

    if not success:
        raise ConnectionError(f"could not download...{url}")
    if file_size > 0 and verbose:
        print(f"download speed: {file_size / (1e6 * tsec)} MB/s")

    # Unzip the file, and delete zip file if successful.
    base_name = os.path.basename(file_name)
    if "zip" in base_name or "exe" in base_name:
        # the context manager closes the archive even if extraction fails
        with pymakeZipFile(file_name) as z:
            # write a message (the newline ends the row of progress dots)
            if not verbose:
                sys.stdout.write("\n")
            print(f"uncompressing...'{file_name}'")

            # extract the files
            try:
                z.extractall(pth)
            except Exception as err:
                raise Exception("Could not unzip the file. Stopping.") from err
    elif "tar" in base_name:
        # NOTE(review): extractall() trusts the member paths stored in the
        # archive; consider an extraction filter for untrusted downloads
        with tarfile.open(file_name) as ar:
            ar.extractall(path=pth)

    # delete the zipfile
    if delete_zip:
        if verbose:
            print("Deleting the zipfile...")
        os.remove(file_name)

    if verbose:
        print("Done downloading and extracting...\n")

    return success
[docs]
def zip_all(
    path,
    file_pths=None,
    dir_pths=None,
    patterns=None,
    append=False,
):
    """Compress the files in the user-provided file paths and directory paths
    that match the provided file patterns.

    This is a thin module-level front end for pymakeZipFile.compressall.

    Parameters
    ----------
    path : str
        path of the zip file that will be created
    file_pths : str or list
        file path or list of file paths to be compressed
    dir_pths : str or list
        directory path or list of directory paths to search for files that
        will be compressed
    patterns : str or list
        file pattern or list of file patterns to match when creating the
        list of files that will be compressed
    append : bool
        boolean indicating if file paths should be appended to an existing
        zip file

    Returns
    -------
    success : bool
        value returned by pymakeZipFile.compressall

    """
    # forward every selection option unchanged
    options = {
        "file_pths": file_pths,
        "dir_pths": dir_pths,
        "patterns": patterns,
        "append": append,
    }
    return pymakeZipFile.compressall(path, **options)
def _get_zipname(platform):
    """Determine zipfile name for platform.

    Parameters
    ----------
    platform : str
        Platform that will run the executables. Valid values include mac,
        linux, win32 and win64. If platform is None, the platform is
        determined from sys.platform (and from sys.maxsize to tell win32
        and win64 apart).

    Returns
    -------
    zipfile : str
        Name of zipfile for platform

    Raises
    ------
    ValueError
        if platform is given and is not one of the valid values
    Exception
        if platform is None and sys.platform is not recognized

    """
    # a user-supplied platform only needs to be validated
    if platform is not None:
        if platform not in ("mac", "linux", "win32", "win64"):
            raise ValueError(f"unknown platform detected ({platform})")
        return f"{platform}.zip"

    # otherwise detect the platform of the running interpreter
    host = sys.platform.lower()
    if host == "darwin":
        platform = "mac"
    elif host.startswith("linux"):
        platform = "linux"
    elif "win" in host:
        # pointer-sized integers distinguish 64-bit from 32-bit builds
        platform = "win64" if sys.maxsize > 2**32 else "win32"
    else:
        raise Exception(
            f"Could not determine platform. sys.platform is {sys.platform}"
        )
    return f"{platform}.zip"
def _get_default_repo():
    """Return the name of the default GitHub repository.

    Returns
    -------
    default_repo : str
        default github repository name, in "owner/repository" form

    """
    default_repo = "MODFLOW-USGS/executables"
    return default_repo
def _get_default_url():
    """Return the download url prefix of the latest default-repository release.

    Returns
    -------
    default_url : str
        url (ending with a slash) to which an asset name of the latest
        release of the default executables repository can be appended

    """
    repo = _get_default_repo()
    return f"https://github.com/{repo}/releases/latest/download/"
def _get_default_json(tag_name=None):
    """Return a default github api json for the provided release tag_name in
    the default github repository.

    The result is assembled locally, without querying the GitHub API.

    Parameters
    ----------
    tag_name : str, optional
        github repository release tag. If it is None (or empty) the asset
        links point to the latest release.

    Returns
    -------
    json_obj : dict
        json object (dictionary) with a tag_name and assets including
        file names and download links

    """
    # create appropriate url: assets of a tagged release live under
    # releases/download/<tag>/, assets of the latest release under
    # releases/latest/download/
    repo_url = f"https://github.com/{_get_default_repo()}"
    if tag_name:
        url = f"{repo_url}/releases/download/{tag_name}/"
    else:
        url = f"{repo_url}/releases/latest/download/"

    # define asset names and download links for assets
    names = ("mac.zip", "linux.zip", "win32.zip", "win64.zip")
    return {
        "tag_name": tag_name,
        "assets": [
            {"name": name, "browser_download_url": url + name}
            for name in names
        ],
    }
def _get_request_json(request_url, verbose=False, verify=True):
    """Process a url request and return the decoded json if successful.

    Parameters
    ----------
    request_url : str
        url for request
    verbose : bool
        boolean indicating if output will be printed to the terminal
        default is false
    verify : bool
        boolean indicating if the url request should be verified

    Returns
    -------
    success : bool
        boolean indicating if the request succeeded
    req : request object
        response object returned by the request
    json_obj : dict
        decoded json object, or None if the response is not ok

    """
    import json

    # open request (up to 10 attempts)
    req = _request_get(
        request_url, max_requests=10, verbose=verbose, verify=verify
    )

    # no usable response - report the failure without a json object
    if not req.ok:
        return req.status_code == requests.codes.ok, req, None

    # connection established - retrieve the json
    return True, req, json.loads(req.text or req.content)
def _repo_json(
    github_repo, tag_name=None, error_return=False, verbose=False, verify=True
):
    """Return the github api json for a release in a github repository.

    The latest release is requested when tag_name is None. Otherwise the
    release catalog of the repository is searched for tag_name and the
    matching release is requested.

    Parameters
    ----------
    github_repo : str
        Repository name, such as MODFLOW-USGS/modflow6
    tag_name : str
        github repository release tag
    error_return : bool
        boolean indicating if None will be returned if there are GitHub API
        issues
    verbose : bool
        boolean indicating if output will be printed to the terminal
    verify : bool
        boolean indicating if the url request should be verified

    Returns
    -------
    json_obj : dict
        json object (dictionary) with a tag_name and assets including
        file names and download links. If the release request fails,
        default values are returned for the default repository and None
        is returned for other repositories when error_return is True.

    Raises
    ------
    Exception
        if error_return is False and tag_name is not in the release catalog
    requests.exceptions.RequestException, ConnectionError
        if error_return is False and a request fails (except for the
        release request of the default repository, see Returns)

    """
    # errors that escape the request helper once all of its attempts failed
    request_errors = (requests.exceptions.RequestException, ConnectionError)

    repo_url = f"https://api.github.com/repos/{github_repo}"
    if tag_name is None:
        request_url = f"{repo_url}/releases/latest"
    else:
        catalog_url = f"{repo_url}/releases"
        try:
            success, _, json_cat = _get_request_json(
                catalog_url, verbose=verbose, verify=verify
            )
        except request_errors:
            # only swallow the error when the caller asked for None
            if not error_return:
                raise
            success, json_cat = False, None

        if not success:
            msg = "Could not get release catalog from " + catalog_url
            if error_return:
                if verbose:
                    print(msg)
                return None
            raise Exception(msg)

        # find the api url of the release with the requested tag
        request_url = next(
            (
                release["url"]
                for release in json_cat
                if release["tag_name"] == tag_name
            ),
            None,
        )
        if request_url is None:
            msg = f"Could not find tag_name ('{tag_name}') in release catalog"
            if error_return:
                print(msg)
                return None
            raise Exception(msg)

    msg = "Requesting asset data "
    if tag_name is not None:
        msg += f"for tag_name '{tag_name}' "
    msg += f"from: {request_url}"
    if verbose:
        print(msg)

    # process the request; a raised request error is treated like an
    # unsuccessful request so the fallbacks below are honored
    request_error = None
    try:
        success, req, json_obj = _get_request_json(
            request_url, verbose=verbose, verify=verify
        )
    except request_errors as err:
        success, req, json_obj = False, None, None
        request_error = err

    # evaluate request errors
    if not success:
        if github_repo == _get_default_repo():
            msg = f"will use default values for {github_repo}"
            if verbose:
                print(msg)
            json_obj = _get_default_json(tag_name)
        else:
            msg = "Could not find json from " + request_url
            if verbose:
                print(msg)
            if error_return:
                json_obj = None
            elif request_error is not None:
                # surface the original failure to the caller
                raise request_error
            else:
                req.raise_for_status()

    # return json object
    return json_obj
[docs]
def get_repo_assets(github_repo=None, version=None, error_return=False, verify=True):
    """Return a dictionary keyed by the names of the assets contained in a
    github repository release.

    Parameters
    ----------
    github_repo : str
        Repository name, such as MODFLOW-USGS/modflow6. If github_repo is
        None set to 'MODFLOW-USGS/executables'
    version : str
        github repository release tag
    error_return : bool
        boolean indicating if None will be returned if there are GitHub API
        issues
    verify : bool
        boolean indicating if the url request should be verified

    Returns
    -------
    result_dict : dict
        dictionary keyed by asset file name, or None if no release json is
        available. The values are the asset download links when version is
        given and "<github_repo>/<file name>" strings when version is None.

    """
    if github_repo is None:
        github_repo = _get_default_repo()

    # get json for the release
    json_obj = _repo_json(
        github_repo, tag_name=version, error_return=error_return, verify=verify
    )
    if json_obj is None:
        return None

    # build simple assets dictionary
    use_repo_path = version is None
    return {
        asset["name"]: (
            github_repo + f"/{asset['name']}"
            if use_repo_path
            else asset["browser_download_url"]
        )
        for asset in json_obj["assets"]
    }
[docs]
def repo_latest_version(github_repo=None, verify=True):
    """Return a string of the latest version number (tag) contained in a github
    repository release.

    Parameters
    ----------
    github_repo : str
        Repository name, such as MODFLOW-USGS/modflow6. If github_repo is
        None set to 'MODFLOW-USGS/executables'
    verify : bool
        boolean indicating if the url request should be verified

    Returns
    -------
    version : str
        string with the latest version/tag number

    """
    repo = _get_default_repo() if github_repo is None else github_repo

    # the tag is read from the json of the latest release
    return _repo_json(repo, verify=verify)["tag_name"]
[docs]
def getmfexes(
    pth=".",
    version=None,
    platform=None,
    exes=None,
    verbose=False,
    verify=True,
):
    """Get the latest MODFLOW binary executables from a github site
    (https://github.com/MODFLOW-USGS/executables) for the specified operating
    system and put them in the specified path.

    Parameters
    ----------
    pth : str
        Location to put the executables (default is current working directory)
    version : str
        Version of the MODFLOW-USGS/executables release to use. If version is
        None the assets of the latest release are downloaded.
    platform : str
        Platform that will run the executables. Valid values include mac,
        linux, win32 and win64. If platform is None, the platform is
        determined from the running interpreter.
    exes : str or list of strings
        executable or list of executables to retain. A downloaded file is
        retained when its name contains one of these strings.
    verbose : bool
        boolean indicating if output will be printed to the terminal
    verify : bool
        boolean indicating if the url request should be verified

    Raises
    ------
    TypeError
        if exes is a number

    """
    # set download directory to path in case a selection of files
    download_dir = pth

    # Determine the platform in order to construct the zip file name
    zipname = _get_zipname(platform)

    # Evaluate exes keyword
    if exes is not None:
        # a selection is unpacked in a scratch directory first
        download_dir = os.path.join(".", "download_dir")
        if isinstance(exes, str):
            # wrap the name; tuple(exes) would split it into characters
            exes = (exes,)
        elif isinstance(exes, (int, float)):
            msg = "exes keyword must be a string or a list/tuple of strings"
            raise TypeError(msg)

    # Determine path for file download and then download and unzip
    if version is None:
        download_url = _get_default_url() + zipname
    else:
        assets = get_repo_assets(
            github_repo=_get_default_repo(), version=version, verify=verify
        )
        download_url = assets[zipname]

    try:
        download_and_unzip(
            download_url,
            download_dir,
            verbose=verbose,
            verify=verify,
        )

        if exes is not None:
            # make sure pth exists
            if not os.path.exists(pth):
                if verbose:
                    print(f"Creating the directory:\n {pth}")
                os.makedirs(pth, exist_ok=True)

            # move select files to pth
            for f in os.listdir(download_dir):
                if any(exe in f for exe in exes):
                    shutil.move(
                        os.path.join(download_dir, f), os.path.join(pth, f)
                    )
    finally:
        # remove the scratch download directory, also after a failure
        if exes is not None and os.path.isdir(download_dir):
            if verbose:
                print("Removing folder " + download_dir)
            shutil.rmtree(download_dir)

    return
[docs]
def getmfnightly(
    pth=".",
    platform=None,
    exes=None,
    verbose=False,
    verify=True,
):
    """Get the latest MODFLOW 6 binary nightly-build executables from github
    (https://github.com/MODFLOW-USGS/modflow6-nightly-build/) for the specified
    operating system and put them in the specified path.

    Parameters
    ----------
    pth : str
        Location to put the executables (default is current working directory)
    platform : str
        Platform that will run the executables. Valid values include mac,
        linux, win32 and win64. If platform is None, the platform is
        determined from the running interpreter.
    exes : str or list of strings
        executable or list of executables to retain. A downloaded file is
        retained when its name contains one of these strings.
    verbose : bool
        boolean indicating if output will be printed to the terminal
    verify : bool
        boolean indicating if the url request should be verified

    Raises
    ------
    TypeError
        if exes is a number

    """
    # set download directory to path in case a selection of files
    download_dir = pth

    # Determine the platform in order to construct the zip file name
    zipname = _get_zipname(platform)

    # Evaluate exes keyword
    if exes is not None:
        # a selection is unpacked in a scratch directory first
        download_dir = os.path.join(".", "download_dir")
        if isinstance(exes, str):
            # wrap the name; tuple(exes) would split it into characters
            exes = (exes,)
        elif isinstance(exes, (int, float)):
            msg = "exes keyword must be a string or a list/tuple of strings"
            raise TypeError(msg)

    # Determine path for file download and then download and unzip
    # https://github.com/MODFLOW-USGS/modflow6-nightly-build/releases/latest/download/
    download_url = (
        "https://github.com/MODFLOW-USGS/modflow6-nightly-build/releases/latest/download/"
        + zipname
    )

    try:
        download_and_unzip(
            download_url,
            download_dir,
            verbose=verbose,
            verify=verify,
        )

        if exes is not None:
            # make sure pth exists
            if not os.path.exists(pth):
                if verbose:
                    print(f"Creating the directory:\n {pth}")
                os.makedirs(pth, exist_ok=True)

            # move select files to pth
            for f in os.listdir(download_dir):
                if any(exe in f for exe in exes):
                    shutil.move(
                        os.path.join(download_dir, f), os.path.join(pth, f)
                    )
    finally:
        # remove the scratch download directory, also after a failure
        if exes is not None and os.path.isdir(download_dir):
            if verbose:
                print("Removing folder " + download_dir)
            shutil.rmtree(download_dir)

    return