# -*- coding: utf-8 -*-
"""Tools for interacting with GCS infrastructure."""

import os
import re
import shlex
import subprocess
from datetime import datetime as dt
from os.path import basename, exists, isdir, join
from pathlib import Path

from google.cloud import storage
from google.oauth2 import service_account
from tqdm.auto import tqdm


def authenticated_client(credentials=None, **client_kwargs):
    """Convenience function to create an authenticated GCS client.

    Parameters
    ----------
    credentials : str or None, optional
        Path to a storage credentials authentication file. If None
        is passed (default), a Client object is created using the
        authorization credentials for the current environment. See the
        [google cloud storage docs](
        https://googleapis.dev/python/google-api-core/latest/auth.html)
        for an overview of the authorization options.
    client_kwargs : optional
        kwargs to pass to the :py:class:`google.cloud.storage.client.Client`
        constructor

    Returns
    -------
    google.cloud.storage.Client
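
    Examples
    --------
    A minimal sketch; the service-account path is a placeholder:

    >>> client = authenticated_client()  # doctest: +SKIP
    >>> client = authenticated_client(  # doctest: +SKIP
    ...     credentials="/path/to/service-account.json"
    ... )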
"""
if credentials is None:
client = storage.Client()
else:
creds = service_account.Credentials.from_service_account_file(str(credentials))
client = storage.Client(credentials=creds, **client_kwargs)
return client


def get_bucket(
    credentials=None, bucket_name="rhg-data", return_client=False, **client_kwargs
):
    """Return a bucket object from Rhg's GCS system.

    Parameters
    ----------
    credentials : str or None, optional
        Path to a storage credentials authentication file. If None
        is passed (default), a Client object is created using the
        authorization credentials for the current environment. See the
        [google cloud storage docs](
        https://googleapis.dev/python/google-api-core/latest/auth.html)
        for an overview of the authorization options.
    bucket_name : str, optional
        Name of bucket. Typically, we work with ``rhg-data`` (default).
    return_client : bool, optional
        If True, also return the Client object as a second element.
    client_kwargs : optional
        kwargs to pass to the :py:class:`google.cloud.storage.client.Client`
        constructor

    Returns
    -------
    bucket : :py:class:`google.cloud.storage.bucket.Bucket`
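
    Examples
    --------
    A minimal sketch with the default bucket:

    >>> bucket = get_bucket()  # doctest: +SKIP
    >>> bucket, client = get_bucket(return_client=True)  # doctest: +SKIP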
"""
client = authenticated_client(credentials=credentials, **client_kwargs)
result = client.bucket(bucket_name)
if return_client:
result = (result, client)
return result


def _remove_prefix(text, prefix="/gcs/rhg-data/"):
    """Strip ``prefix`` from ``text`` if present."""
    return text[len(prefix):] if text.startswith(prefix) else text
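
# Illustrative behavior of the helper above (hypothetical paths):
# >>> _remove_prefix("/gcs/rhg-data/some/file.nc")
# 'some/file.nc'
# >>> _remove_prefix("other/path")
# 'other/path'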


def _get_path_types(src, dest):
    """Return the ``gs://`` and ``/gcs/`` variants of ``src`` and ``dest``."""
    src_gs = src
    src_gcs = src
    dest_gs = dest
    dest_gcs = dest
    if src_gs.startswith("/gcs/"):
        src_gs = src.replace("/gcs/", "gs://")
    if src_gcs.startswith("gs://"):
        src_gcs = src.replace("gs://", "/gcs/")
    if dest_gs.startswith("/gcs/"):
        dest_gs = dest.replace("/gcs/", "gs://")
    if dest_gcs.startswith("gs://"):
        dest_gcs = dest.replace("gs://", "/gcs/")
    return src_gs, src_gcs, dest_gs, dest_gcs
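
# Illustrative behavior of the helper above (hypothetical paths):
# >>> _get_path_types("/gcs/rhg-data/a", "gs://rhg-data/b")
# ('gs://rhg-data/a', '/gcs/rhg-data/a', 'gs://rhg-data/b', '/gcs/rhg-data/b')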


def rm(path, flags=[]):
    """Remove a file, or recursively remove a directory, on GCS.

    Must have already authenticated to use. Notebook servers are
    automatically authenticated, but workers need to pass the path to
    the authentication json file to the GOOGLE_APPLICATION_CREDENTIALS
    env var. This is done automatically for rhg-data.json when using
    the get_worker wrapper.

    Parameters
    ----------
    path : str or :class:`pathlib.Path`
        The path to the file or directory to remove. Either the
        `/gcs/` or `gs://` prefix will work.
    flags : list of str, optional
        List of flags to add to the gsutil rm command, e.g.
        `flags=['r']` will run the command `gsutil -m rm -r...`
        (recursive remove).

    Returns
    -------
    str
        stdout from gsutil call
    str
        stderr from gsutil call
    :py:class:`datetime.timedelta`
        Time it took to remove file(s).
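
    Examples
    --------
    A hedged sketch; the bucket name is a placeholder:

    >>> stdout, stderr, duration = rm(  # doctest: +SKIP
    ...     "gs://my-bucket/path/to/dir", flags=["r"]
    ... )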
"""
st_time = dt.now()
path = str(path).replace("/gcs/", "gs://")
cmd = "gsutil -m rm " + " ".join(["-" + f for f in flags]) + f" {path}"
print(f"Running cmd: {cmd}")
cmd = shlex.split(cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
end_time = dt.now()
return stdout, stderr, end_time - st_time


def replicate_directory_structure_on_gcs(src, dst, client):
    """Replicate a local directory structure on google cloud storage.

    Parameters
    ----------
    src : str
        Path to the root directory on the source machine. The directory
        structure within this directory will be reproduced within `dst`,
        e.g. `/Users/myusername/my/data`
    dst : str
        A url for the root directory of the destination, starting with
        `gs://[bucket_name]/`, e.g. `gs://my_bucket/path/to/my/data`
    client : google.cloud.storage.client.Client
        An authenticated :py:class:`google.cloud.storage.client.Client` object.
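
    Examples
    --------
    A minimal sketch with placeholder paths:

    >>> client = authenticated_client()  # doctest: +SKIP
    >>> replicate_directory_structure_on_gcs(  # doctest: +SKIP
    ...     "/Users/myusername/my/data",
    ...     "gs://my_bucket/path/to/my/data",
    ...     client,
    ... )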
"""
if dst.startswith("gs://"):
dst = dst[5:]
elif dst.startswith("gcs://"):
dst = dst[6:]
else:
raise ValueError("dst must begin with `gs://` or `gcs://`")
bucket_name = dst.split("/")[0]
blob_path = "/".join(dst.split("/")[1:])
bucket = client.bucket(bucket_name)
for d, dirnames, files in os.walk(src):
dest_path = os.path.join(blob_path, os.path.relpath(d, src))
# make sure there is exactly one trailing slash:
dest_path = dest_path.rstrip("/") + "/"
# ignore "." directory
if dest_path == "./":
continue
blob = bucket.blob(dest_path)
blob.upload_from_string("")


def cp(src, dest, flags=[]):
    """Copy a file, or recursively copy a directory, from local
    path to GCS or vice versa.

    Must have already authenticated to use. Notebook servers are
    automatically authenticated, but workers need to pass the path to
    the authentication json file to the GOOGLE_APPLICATION_CREDENTIALS
    env var. This is done automatically for rhg-data.json when using
    the get_worker wrapper.

    Parameters
    ----------
    src, dest : str
        The paths to the source and destination file or directory.
        If on GCS, either the `/gcs/` or `gs://` prefix will work.
    flags : list of str, optional
        List of flags to add to the gsutil cp command, e.g.
        `flags=['r']` will run the command `gsutil -m cp -r...`
        (recursive copy).

    Returns
    -------
    str
        stdout from gsutil call
    str
        stderr from gsutil call
    :py:class:`datetime.timedelta`
        Time it took to copy file(s).
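
    Examples
    --------
    A hedged sketch; paths are placeholders:

    >>> stdout, stderr, duration = cp(  # doctest: +SKIP
    ...     "/home/me/data", "gs://my-bucket/data", flags=["r"]
    ... )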
"""
st_time = dt.now()
src = str(src)
dest = str(dest)
# make sure we're using URL
# if /gcs or gs:/ are not in src or not in dest
# then these won't change anything
src_gs, src_gcs, dest_gs, dest_gcs = _get_path_types(src, dest)
# if directory already existed cp would put src into dest_gcs
if exists(dest_gcs):
dest_base = join(dest_gcs, basename(src))
# else cp would have put the contents of src into the new directory
else:
dest_base = dest_gcs
cmd = (
"gsutil -m cp "
+ " ".join(["-" + f for f in flags])
+ " {} {}".format(src_gs, dest_gs)
)
print(f"Running cmd: {cmd}")
cmd = shlex.split(cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
raise ValueError(stderr)
# need to add directories if you were recursively copying a directory
if isdir(src_gcs) and dest_gcs.startswith("/gcs/"):
# now make directory blobs on gcs so that gcsfuse recognizes it
dirs_to_make = [x[0].replace(src, dest_base) for x in os.walk(src)]
for d in dirs_to_make:
os.makedirs(d, exist_ok=True)
end_time = dt.now()
return stdout, stderr, end_time - st_time


def sync(src, dest, flags=["r", "d"]):
    """Sync a directory from local to GCS or vice versa. Uses `gsutil rsync`.

    Must have already authenticated to use. Notebook servers are
    automatically authenticated, but workers need to pass the path to
    the authentication json file to the GCLOUD_DEFAULT_TOKEN_FILE env
    var. This is done automatically for rhg-data.json when using the
    get_worker wrapper.

    Parameters
    ----------
    src, dest : str
        The paths to the source and destination file or directory.
        If on GCS, either the `/gcs/` or `gs://` prefix will work.
    flags : list of str, optional
        List of flags to add to the gsutil rsync command, e.g.
        `flags=['r','d']` will run the command `gsutil -m rsync -r -d...`
        (recursive sync, deleting any files on dest that are not on src).
        This is the default set of flags.

    Returns
    -------
    str
        stdout from gsutil call
    str
        stderr from gsutil call
    :py:class:`datetime.timedelta`
        Time it took to sync file(s).
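
    Examples
    --------
    A hedged sketch; paths are placeholders:

    >>> stdout, stderr, duration = sync(  # doctest: +SKIP
    ...     "/home/me/data", "gs://my-bucket/data"
    ... )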
"""
st_time = dt.now()
src = str(src)
dest = str(dest)
# remove trailing /'s
src = src.rstrip("/")
dest = dest.rstrip("/")
# make sure we're using URL
# if /gcs or gs:/ are not in src or not in dest
# then these won't change anything
src_gs, src_gcs, dest_gs, dest_gcs = _get_path_types(src, dest)
cmd = (
"gsutil -m rsync "
+ " ".join(["-" + f for f in flags])
+ " {} {}".format(src_gs, dest_gs)
)
print(f"Running cmd: {cmd}")
cmd = shlex.split(cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
# need to add directories if you were recursively copying a directory TO
# gcs now make directory blobs on gcs so that gcsfuse recognizes it
if dest_gcs.startswith("/gcs/"):
dirs_to_make = [x[0].replace(src_gcs, dest_gcs) for x in os.walk(src_gcs)]
for d in dirs_to_make:
os.makedirs(d, exist_ok=True)
end_time = dt.now()
return stdout, stderr, end_time - st_time


def ls(dir_path):
    """List a directory quickly using `gsutil`.

    Parameters
    ----------
    dir_path : str or :class:`pathlib.Path`
        Path to the directory to list. Either the `/gcs/` or `gs://`
        prefix will work.

    Returns
    -------
    list of str
        Names of the entries directly under `dir_path`.
    dir_url = str(dir_path).replace("/gcs/", "gs://")
    cmd = f"gsutil ls {dir_url}"
    print(f"Running cmd: {cmd}")
    cmd = shlex.split(cmd)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    # decode stdout and strip the directory prefix from each entry,
    # dropping the directory marker blob itself
    lines = stdout.decode().splitlines()
    res = [l[len(dir_url):].strip("/") for l in lines if l.startswith(dir_url)]
    return [name for name in res if name]


def _fetch_dirs(bucket, prefix=None):
    """Return the set of all directory paths implied by a bucket's blob names.

    Parameters
    ----------
    bucket : google.cloud.storage.bucket.Bucket
    prefix : str or None, optional
        Only consider blobs whose names begin with this prefix.

    Returns
    -------
    all_dirs : set
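
    Examples
    --------
    Illustrative only; given a bucket whose blobs are named
    ``a/b/c.txt`` and ``a/d.txt``:

    >>> sorted(_fetch_dirs(bucket))  # doctest: +SKIP
    ['a/', 'a/b/']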
"""
all_dirs = set([])
for blob in bucket.list_blobs(prefix=prefix):
p = Path(blob.name).parent
# Skip root dir.
if p == Path("."):
continue
# Parse parent dirs in blob name.
for i in range(1, len(p.parts) + 1):
parentdir = str(Path(*p.parts[:i])) + "/"
all_dirs.add(parentdir)
return all_dirs


def create_directory_markers(bucket_name, project=None, client=None, prefix=None):
    """Add directory markers in-place on a google cloud storage bucket.

    Parameters
    ----------
    bucket_name : str
        name of the Google Cloud Storage bucket
    project : str or None, optional
        name of the Google Cloud Platform project. If None, inferred
        from the default project as determined by the client.
    client : google.cloud.storage.client.Client or None, optional
        Optionally pass a google.cloud.storage Client object to set
        auth and settings
    prefix : str or None, optional
        Prefix (relative to bucket root) below which to create markers
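
    Examples
    --------
    A hedged sketch with placeholder names:

    >>> create_directory_markers(  # doctest: +SKIP
    ...     "my-bucket", prefix="path/to/data"
    ... )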
"""
if client is None:
client = storage.Client(project=project)
bucket = client.bucket(bucket_name)
# Create empty blob for any non-exist directories
dirs = _fetch_dirs(bucket, prefix=prefix)
if len(dirs) == 0:
return
for target_dir in tqdm(dirs, desc='creating markers'):
dir_blob = bucket.blob(target_dir)
if dir_blob.exists():
continue
# Create empty blob as dir placeholder.
dir_blob.upload_from_string(
"",
content_type="application/x-www-form-urlencoded;charset=UTF-8"
)


def create_directories_under_blob(blob, project=None, client=None):
    """Create directory markers below a `gs://bucket/prefix` url.

    Thin wrapper around :py:func:`create_directory_markers` that parses
    the bucket name and prefix out of a `gs://` url.
    """
    groups = re.match("gs://(?P<bucket>[^/]+)(?P<prefix>.*)$", blob)
    bucket = groups.group("bucket")
    prefix = groups.group("prefix").strip("/")
    if len(prefix) == 0:
        prefix = None
    create_directory_markers(
        bucket_name=bucket, project=project, client=client, prefix=prefix
    )
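
# Illustrative usage of the wrapper above (bucket name is a placeholder):
# >>> create_directories_under_blob("gs://my-bucket/path/to/data")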