# This needs the service account to have the "Compute Admin" role.
from __future__ import annotations
__all__ = ['Instance', 'InstanceConfig']
import logging
import os
import string
import warnings
from typing import Literal
from google.cloud import compute_v1
from cloudly.util.logging import get_calling_file
from .auth import get_credentials, get_project_id
logger = logging.getLogger(__name__)
def validate_label_key(val: str) -> str:
    """
    Check that `val` is acceptable as a resource label key and return it unchanged.

    A key is accepted when it has 1 to 63 characters, every character is a
    lowercase ASCII letter, a digit, a dash or an underscore, and the first
    character is a lowercase ASCII letter.

    Raises `ValueError` (carrying `val`) when any of these rules is broken.
    """
    # TODO: use `re`.
    if not 1 <= len(val) <= 63:
        raise ValueError(val)

    permitted = string.ascii_lowercase + string.digits + '-_'
    if not all(ch in permitted for ch in val):
        raise ValueError(val)

    first = val[0]
    if first not in string.ascii_lowercase:
        raise ValueError(val)

    return val
def validate_label_value(val: str, *, fix: bool = False) -> str:
    """
    Validate a resource label value, optionally repairing it first.

    A valid value has at most 63 characters, each of which is a lowercase
    ASCII letter, a digit, a dash or an underscore. The empty string is valid.

    With `fix=True` the value is first normalized:

    - surrounding spaces and dashes are stripped and the text is lower-cased;
    - each of ``< > space _ . ,`` becomes ``-`` and each ``/`` becomes ``--``;
    - surrounding spaces and dashes are stripped again;
    - if the result is still longer than 63 characters, only its tail is kept,
      prefixed with ``__`` as a truncation marker, and a warning is issued.

    With `fix=False` the value is checked as is, including its length.

    Returns the (possibly repaired) value.
    Raises `ValueError` if the value, after any repair, is still invalid.

    See https://cloud.google.com/compute/docs/labeling-resources
    """
    val0 = val
    if fix:
        val = val.strip('- ').lower()
        # Single pass; none of the replacement text contains a character that
        # is itself replaced, so this equals the chained `str.replace` calls.
        val = val.translate(
            str.maketrans(
                {
                    '<': '-',
                    '>': '-',
                    ' ': '-',
                    '_': '-',
                    '.': '-',
                    ',': '-',
                    '/': '--',
                }
            )
        )
        val = val.strip(' -')
        if len(val) > 63:
            # Keep the tail (usually the most specific part, e.g. a file name).
            val = '__' + val[-61:].lstrip('-_')
            warnings.warn(
                f"long value was truncated; original value '{val0}' was changed to '{val}'"
            )
    elif len(val) > 63:
        # Strict mode must reject over-long values rather than let them through.
        raise ValueError(
            f"label value is longer than 63 characters ({len(val)}): '{val0}'"
        )

    allowed = string.ascii_lowercase + string.digits + '-_'
    if any(c not in allowed for c in val):
        raise ValueError(f"original: '{val0}'; after fixes: '{val}'")
    return val
def validate_local_ssd_size_gb(size_gb: int) -> int:
    """
    Validate a LocalSSD size and return the size that should actually be used.

    The size must be a positive multiple of 375 (GB).

    - An exact positive multiple is returned unchanged.
    - A value whose remainder is 300 or more (i.e. at most 75 short of the next
      multiple) is rounded up to that multiple, with a warning.
    - Anything else raises `ValueError`, including zero and negative values.

    Callers must use the returned value, because it may differ from the input.
    """
    if size_gb <= 0:
        # Without this guard `divmod` on a negative number yields a negative
        # quotient, and the checks below would return zero or a negative size.
        raise ValueError(
            f'`size_gb` for LocalSSD should be a positive multiple of 375; got {size_gb}'
        )

    a, b = divmod(size_gb, 375)
    if b == 0:
        return size_gb

    if b < 300:
        # Fail rather than round up a great deal, for visibility.
        raise ValueError(
            f'`size_gb` for LocalSSD should be a multiple of 375; got {size_gb}'
        )

    # Round up with a warning.
    rounded = 375 * (a + 1)
    warnings.warn(
        f'`size_gb` for LocalSSD is rounded up from {size_gb} to {rounded}'
    )
    return rounded
def basic_resource_labels():
    """
    Build the provenance labels attached to every created resource.

    The labels are taken from the object returned by `get_calling_file()`
    (presumably describing the user code that triggered the call -- confirm in
    `cloudly.util.logging`): the absolute path of its file, its line number
    (as a string) and its function name.
    """
    origin = get_calling_file()
    labels = {}
    labels['created-by-file'] = os.path.abspath(origin.filename)
    labels['created-by-line'] = str(origin.lineno)
    labels['created-by-function'] = origin.function
    return labels
# Using GPUs
# https://cloud.google.com/nvidia?hl=en
# https://cloud.google.com/compute/docs/gpus/install-drivers-gpu
# https://cloud.google.com/deep-learning-vm/docs/images
# https://cloud.google.com/deep-learning-vm/docs/images#listing-versions
# https://cloud.google.com/deep-learning-vm/docs/create-vm-instance-gcloud
# https://www.googlecloudcommunity.com/gc/Infrastructure-Compute-Storage/Frequent-NVIDIA-drivers-reinstall-needed-on-boot/m-p/797413
#
# There are mainly two things to make GPU work:
# (1) specify a machine with GPU
# (2) install nvidia drivers
#
# The easy route seems to be:
# (1) use a machine-type that comes with GPUs
# (2) use a boot-image that has nvidia drivers installed
#
# Worked-out scenarios:
#
# machine_type: 'g2-standard-16'
# boot_disk: {'size_gb': 100, 'source_image': 'projects/deeplearning-platform-release/global/images/family/pytorch-latest-gpu'}
#
# This machine comes with a Nvidia L4, python 3.10 and pytorch. The OS is Debian 11 (bullseye).
#
# In place of 'pytorch-latest-gpu', these images also worked in tests: 'common-cu123', 'common-cu124'.
# The 'common-...' images do not have pytorch installed.
#
# In place of 'g2-standard-16', machine-type 'a2-highgpu-1g' also worked, coming with a Nvidia A100.
#
# You can also specify flexible machine_type and GPUs but use a Deeplearning boot image to ease the installation
# of Nvidia drivers. The following gets a machine with two T4 GPUs:
#
# machine_type: 'n1-standard-8'
# gpu: {'gpu_count': 2, 'gpu_type': 'nvidia-tesla-t4'}
# boot_disk: {'size_gb': 100, 'source_image': 'projects/deeplearning-platform-release/global/images/family/common-cu124'}
#
# Finally, you can also use the common boot image (which has no consideration for GPU) and install cuda drivers on startup
# (which is taken care of by this module). (The 'deeplearning' images above also need to run installation, only simpler,
# because the script is already on the disk.) The following worked in tests:
#
# machine_type: 'n1-standard-8'
# gpu: {'gpu_count': 2, 'gpu_type': 'nvidia-tesla-t4'}
# boot_disk: {'size_gb': 100}
# Taken from
# https://github.com/GoogleCloudPlatform/compute-gpu-installation
#
# This script will reboot the machine 2 or 3 times, so it may take
# a few minutes for the machine to be ready. Being able to `ssh` into
# the machine does not guarantee this script has all finished.
# Instead, type `nvidia-smi`. If it works, this script has finished.
# Shell fragment that downloads Google's `cuda_installer.pyz` into
# /opt/google/cuda-installer/ and runs its `install_cuda` command.
# `InstanceConfig.__init__` appends it to the generated startup script when the
# caller supplied no `startup_script`, the VM has GPUs, and the boot disk brings
# no driver-install script of its own.
# NOTE(review): `test -f` succeeds only for a regular file, while the same path
# is created as a directory by `mkdir -p` just below, so this guard presumably
# never exits early and the installer is invoked on every boot. That may be
# exactly what lets the installation resume after its reboots -- confirm
# against the upstream script before changing the guard.
cuda_installer = """
if test -f /opt/google/cuda-installer
then
exit
fi
sudo mkdir -p /opt/google/cuda-installer/
cd /opt/google/cuda-installer/ || exit
sudo curl -fSsL -O https://github.com/GoogleCloudPlatform/compute-gpu-installation/releases/download/cuda-installer-v1.2.0/cuda_installer.pyz
sudo python3 cuda_installer.pyz install_cuda
"""
class InstanceConfig:
    """
    Specification of a Compute Engine VM.

    The constructor validates the inputs and assembles a `compute_v1.Instance`
    resource (available as `instance`) that `Instance.create` submits.
    The nested classes `BootDisk`, `LocalSSD` and `GPU` describe the optional
    components; they are built from the dicts passed to the constructor.
    """

    class BootDisk:
        """Boot disk settings: size and source OS image."""

        def __init__(
            self,
            *,
            size_gb: int = 50,
            source_image: str = 'projects/debian-cloud/global/images/family/debian-11',
        ):
            """
            `size_gb`: disk size; if truthy it must be at least 30.

            `source_image`: either a full image path, or the short form
            ``'<project>/<family>'`` (exactly one slash), which is expanded to
            ``'projects/<project>/global/images/family/<family>'``.
            """
            # See a list of OS images: https://cloud.google.com/compute/docs/images/os-details
            # Another good image might be 'projects/ubuntu-os-cloud/global/images/family/ubuntu-2404-lts-amd64',
            if size_gb:
                assert size_gb >= 30, f'{size_gb} >= 30'
            self.size_gb = size_gb  # GCP default is 30, but a GPU machine would require at least 40
            if source_image.count('/') == 1:
                proj, fam = source_image.split('/')
                source_image = f'projects/{proj}/global/images/family/{fam}'
            self.source_image = source_image

        def disk(self) -> compute_v1.AttachedDisk:
            """Return the auto-deleted boot `AttachedDisk` for this configuration."""
            return compute_v1.AttachedDisk(
                boot=True,
                auto_delete=True,
                initialize_params=compute_v1.AttachedDiskInitializeParams(
                    source_image=self.source_image,
                ),
                disk_size_gb=self.size_gb,
            )

        @property
        def startup_script(self) -> str:
            """
            Driver-install command for 'deeplearning-platform-release' images whose
            path contains '-gpu' or '-cu'; the empty string for any other image.
            """
            if self.source_image.startswith('projects/deeplearning-platform-release'):
                if '-gpu' in self.source_image or '-cu' in self.source_image:
                    return 'sudo /opt/deeplearning/install-driver.sh'
            return ''

    class LocalSSD:
        """A single local SSD scratch disk, formatted and mounted at startup."""

        def __init__(self, *, size_gb: int, mount_path: str = '/mnt', mode: str = 'rw'):
            """
            `size_gb` should be a multiple of 375. It is passed through
            `validate_local_ssd_size_gb`, which rounds a value slightly below a
            multiple up to that multiple (with a warning) and raises
            `ValueError` for other values.

            `mount_path`: directory where the disk is mounted.

            `mode`: permission letters granted to all users on `mount_path`
            via ``chmod a+<mode>``; 'ro' is treated as 'r'.
            """
            self.size_gb = validate_local_ssd_size_gb(size_gb)
            self.mount_path = mount_path
            self.mode = mode

        def disk(self, zone: str) -> compute_v1.AttachedDisk:
            """Return the NVME scratch `AttachedDisk` of type 'local-ssd' in `zone`."""
            return compute_v1.AttachedDisk(
                type_=compute_v1.AttachedDisk.Type.SCRATCH.name,
                interface='NVME',
                disk_size_gb=self.size_gb,
                initialize_params=compute_v1.AttachedDiskInitializeParams(
                    disk_type=f'zones/{zone}/diskTypes/local-ssd',
                ),
                auto_delete=True,
            )

        @property
        def startup_script(self) -> str:
            """Shell commands that format the disk, mount it and set permissions."""
            # See https://cloud.google.com/compute/docs/disks/add-local-ssd#formatandmount
            # We support a single local SSD disk. Its name is always 'google-local-nvme-ssd-0'.
            mode = self.mode
            if mode == 'ro':
                mode = 'r'
            return '\n'.join(
                (
                    'sudo mkfs.ext4 -F /dev/disk/by-id/google-local-nvme-ssd-0',
                    f'sudo mkdir -p {self.mount_path}',
                    f'sudo mount /dev/disk/by-id/google-local-nvme-ssd-0 {self.mount_path}',
                    f'sudo chmod a+{mode} {self.mount_path}',
                )
            )

    class GPU:
        """GPUs attached to a machine type that does not come with its own."""

        def __init__(self, *, gpu_type: str, gpu_count: int = 1):
            """
            Use `gcloud compute accelerator-types list` to see valid values of `gpu_type`.
            Some examples: 'nvidia-tesla-t4', 'nvidia-l4', 'nvidia-tesla-a100', 'nvidia-tesla-v100'
            """
            assert gpu_type
            assert gpu_count
            self.gpu_type = gpu_type
            self.gpu_count = gpu_count

        def accelerator(self, zone: str) -> compute_v1.AcceleratorConfig:
            """Return the `AcceleratorConfig` for these GPUs in `zone` of the current project."""
            return compute_v1.AcceleratorConfig(
                accelerator_count=self.gpu_count,
                accelerator_type=f'projects/{get_project_id()}/zones/{zone}/acceleratorTypes/{self.gpu_type}',
            )

    @staticmethod
    def _validate_name(name: str) -> str:
        """
        Check `name` against the instance-name rule: 1-63 characters matching
        ``[a-z]([-a-z0-9]*[a-z0-9])?``. Return it unchanged or raise `ValueError`.
        """
        alnum = string.ascii_lowercase + string.digits
        valid = (
            isinstance(name, str)
            and 1 <= len(name) <= 63
            and name[0] in string.ascii_lowercase
            and name[-1] in alnum
            and all(c in alnum or c == '-' for c in name)
        )
        if not valid:
            raise ValueError(
                f"invalid instance name '{name}': must be 1-63 characters matching "
                '[a-z]([-a-z0-9]*[a-z0-9])?'
            )
        return name

    def __init__(
        self,
        *,
        name: str,
        zone: str,
        machine_type: str,
        labels: dict[str, str] | None = None,
        boot_disk: dict | None = None,
        local_ssd: dict | None = None,
        network_uri: str | None = None,
        subnet_uri: str | None = None,
        startup_script: str | None = None,
        gpu: dict | None = None,
    ):
        """
        `name` is a "display name", but also plays the role of an ID because it must be unique for the project
        in the specified region.
        `name` must be 1-63 characters long and match the regular expression ``[a-z]([-a-z0-9]*[a-z0-9])?``
        which means the first character must be a lowercase letter, and all following characters must be a dash,
        lowercase letter, or digit, except the last character, which cannot be a dash.
        A `ValueError` is raised if `name` breaks this rule.

        `zone` is like 'us-west1-a'.

        `machine_type`: cheap, low-end machines suitable for lightweight tests:

            't2a-standard-1' (1 CPU 4 GiB, $0.0385 / hour)
            't2d-standard-1' (1 CPU 4 GiB, $0.0422 / hour)
            'c4a-standard-1' (1 CPU 4 GiB, $0.0449 / hour)
            'n1-standard-1' (1 CPU 3.75 GiB, $0.0475 / hour)
            'e2-standard-2' (2 CPUs 8 GiB, $0.067 / hour)
            'n2d-standard-2' (2 CPUs 8 GiB, $0.084 / hour)
            'n4-standard-2' (2 CPUs 8 GiB, $0.0948 / hour)
            'n2-standard-2' (2 CPUs 8 GiB, $0.097 / hour)
            'e2-standard-4' (4 CPUs 16 GiB, $0.134 / hour)
            'e2-standard-8' (8 CPUs 32 GiB, $0.27 / hour)

        See https://cloud.google.com/compute/all-pricing?hl=en

        `labels`: extra labels; they are merged over the automatic 'created-by-...' labels.
        Keys must be valid label keys; values are repaired where possible and then validated.
        There are some restrictions to the label values.
        See https://cloud.google.com/batch/docs/organize-resources-using-labels

        `boot_disk`, `local_ssd`, `gpu`: keyword arguments for `BootDisk`, `LocalSSD` and `GPU`,
        respectively. `gpu` must not be given for machine types of the 'a3', 'a2' and 'g2' families,
        which come with GPUs.

        `network_uri` may look like "projects/shared-vpc-admin/global/networks/vpcnet-shared-prod-01".
        `subnet_uri` may look like "https://www.googleapis.com/compute/v1/projects/shared-vpc-admin/regions/<region>/subnetworks/prod-<region>-01".
        If `None`, the project's default network and subnet (for the specified region) will be used.
        See https://cloud.google.com/compute/docs/networking/network-overview

        `startup_script`: shell script that installs software and makes any other preps before the instance becomes operational.
        If provided, this must handle everything, as the script will not be augmented in this function.
        Common concerns include mounting local disks and installing cuda drivers (if you attach GPUs).
        If not provided, a script is generated from the local-SSD, boot-image and GPU settings as needed.
        """
        # Instance names are stricter than label keys: no underscore, no trailing dash.
        self._validate_name(name)

        labels = {**basic_resource_labels(), **(labels or {})}
        labels = {
            validate_label_key(k): validate_label_value(v, fix=True)
            for k, v in labels.items()
        }

        disks = []
        boot_disk = self.BootDisk(**(boot_disk or {}))
        disks.append(boot_disk.disk())
        if local_ssd:
            local_ssd = self.LocalSSD(**local_ssd)
            disks.append(local_ssd.disk(zone))

        guest_accelerators = None
        scheduling = None
        if machine_type.split('-')[0] in ('a3', 'a2', 'g2'):
            # machine types that come with GPUs
            if gpu is not None:
                raise ValueError(
                    f'machine_type {machine_type} comes with GPUs; you should not specify `gpu` again'
                )
            scheduling = compute_v1.Scheduling(on_host_maintenance='TERMINATE')
            has_gpu = True
        elif gpu:
            guest_accelerators = [self.GPU(**gpu).accelerator(zone)]
            scheduling = compute_v1.Scheduling(on_host_maintenance='TERMINATE')
            # See https://cloud.google.com/compute/docs/instances/setting-vm-host-options
            has_gpu = True
        else:
            has_gpu = False

        network = compute_v1.NetworkInterface(
            network=network_uri, subnetwork=subnet_uri
        )

        metadata = None
        if not startup_script:
            scripts = []
            if local_ssd:
                scripts.append(local_ssd.startup_script)
            if boot_disk.startup_script:
                scripts.append(boot_disk.startup_script)
            elif has_gpu:
                scripts.append(cuda_installer)
            # If boot_disk has script, it is installing cuda driver
            # (as that is the only scenario where BootDisk has script),
            # hence `cuda_installer` is not used.
            # NOTE: `cuda_installer` must be the last component of the startup script,
            # because `cuda_installer` will reboot the machine and continue afterwards.
            if scripts:
                startup_script = '\n'.join(['#!/bin/bash'] + scripts)
        if startup_script:
            metadata = compute_v1.Metadata(
                items=[compute_v1.Items(key='startup-script', value=startup_script)]
            )

        service_accounts = [
            compute_v1.ServiceAccount(
                scopes=['https://www.googleapis.com/auth/cloud-platform'],
            ),
        ]

        self._instance = compute_v1.Instance(
            name=name,
            machine_type=f'zones/{zone}/machineTypes/{machine_type}',
            labels=labels,
            disks=disks,
            network_interfaces=[network],
            metadata=metadata,
            service_accounts=service_accounts,
            guest_accelerators=guest_accelerators,
            scheduling=scheduling,
        )
        self.name = name
        self.zone = zone
        self.startup_script = startup_script

    @property
    def instance(self) -> compute_v1.Instance:
        """The assembled `compute_v1.Instance` resource; print it to inspect the settings."""
        return self._instance

    @property
    def definition(self) -> dict:
        """The assembled instance resource converted to a plain dict."""
        return type(self._instance).to_dict(self._instance)
def _call_client(method: str, *args, **kwargs):
    """
    Invoke `method` of a freshly created `compute_v1.InstancesClient`.

    A new client (with credentials from `get_credentials()`) is opened for this
    one call and closed on return; positional and keyword arguments are passed
    through, and the method's result is returned as is.
    """
    creds = get_credentials()
    with compute_v1.InstancesClient(credentials=creds) as client:
        bound_method = getattr(client, method)
        return bound_method(*args, **kwargs)
class Instance:
    """
    Handle on an existing Compute Engine VM, identified by name and zone.

    The latest fetched `compute_v1.Instance` resource is kept in `self.instance`.
    Some accessors read that cached resource; others re-fetch it first, as noted
    on each member.
    """

    @classmethod
    def create(cls, config: InstanceConfig | dict) -> Instance:
        """
        Create the VM described by `config` and return a handle on it.

        `config` may be an `InstanceConfig` or a dict of its keyword arguments.
        The call waits on the insert operation's result before returning.
        """
        if not isinstance(config, InstanceConfig):
            config = InstanceConfig(**config)
        req = compute_v1.InsertInstanceRequest(
            project=get_project_id(),
            zone=config.zone,
            instance_resource=config.instance,
        )
        op = _call_client('insert', req)
        op.result()
        # This could raise `google.api_core.exceptions.Forbidden` with message "... QUOTA_EXCEEDED ..."
        return cls(config.name, config.zone)

    @classmethod
    def list(cls, zone: str) -> list[Instance]:
        """Return handles on all instances of the current project in `zone`."""
        req = compute_v1.ListInstancesRequest(project=get_project_id(), zone=zone)
        resp = _call_client('list', req)
        # Reuse the resources returned by the list call instead of issuing one
        # additional `get` request per instance.
        return [cls._from_resource(r, zone) for r in resp]

    @classmethod
    def _from_resource(cls, resource, zone: str) -> Instance:
        """Build a handle around an already-fetched instance resource, without any API call."""
        obj = cls.__new__(cls)
        obj.name = resource.name
        obj.zone = zone
        obj.instance = resource
        return obj

    def __init__(self, name: str, zone: str):
        """
        `name` is either the "Name" or "Instance Id" shown on GCP dashboard.

        The instance resource is fetched right away, hence this could raise
        `google.api_core.exceptions.NotFound`.
        """
        self.name = name
        self.zone = zone
        self.instance = None
        self._refresh()

    def __repr__(self):
        return f"{self.__class__.__name__}('{self.name}', '{self.zone}')"

    def __str__(self):
        return self.__repr__()

    def _refresh(self):
        """Re-fetch the instance resource into `self.instance`; return `self` for chaining."""
        req = compute_v1.GetInstanceRequest(
            instance=self.name, project=get_project_id(), zone=self.zone
        )
        self.instance = _call_client('get', req)
        # This could raise `google.api_core.exceptions.NotFound`
        return self

    @property
    def id(self) -> int:
        """Instance ID, from the cached resource."""
        return self.instance.id

    @property
    def machine_type(self) -> str:
        """Short machine type (last path segment), from the cached resource."""
        return self.instance.machine_type.split('/')[-1]

    @property
    def gpu(self) -> dict:
        """The first guest accelerator as a dict, or `{}` if there is none (cached resource)."""
        z = self.instance.guest_accelerators
        if len(z):
            return type(z[0]).to_dict(z[0])
        return {}

    @property
    def disks(self) -> list[dict]:
        """Summary dicts of the attached disks, from the cached resource."""
        return [
            {
                'boot': disk.boot,
                'name': disk.device_name,
                'size_gb': disk.disk_size_gb,
                'index': disk.index,
                'mode': disk.mode,
            }
            for disk in self.instance.disks
        ]  # first is boot disk

    @property
    def creation_timestamp(self) -> str:
        """Creation timestamp, from the cached resource."""
        return self.instance.creation_timestamp

    @property
    def last_start_timestamp(self) -> str:
        """Last start timestamp; the resource is re-fetched first."""
        return self._refresh().instance.last_start_timestamp

    @property
    def last_stop_timestamp(self) -> str:
        """Last stop timestamp; the resource is re-fetched first."""
        # Can be ''.
        return self._refresh().instance.last_stop_timestamp

    @property
    def ip(self) -> str:
        """IP address of the first network interface, from the cached resource."""
        # If you try to `ssh` into this IP address right after seeing success of `Instance.create(...)`,
        # you may get "connection refused". Just wait a few more seconds for the machine to be ready.
        return self.instance.network_interfaces[0].network_i_p

    def delete(self) -> None:
        """Delete the VM, wait on the operation's result, and drop the cached resource."""
        req = compute_v1.DeleteInstanceRequest(
            instance=self.name, project=get_project_id(), zone=self.zone
        )
        logger.info('deleting %s', self)
        op = _call_client('delete', req)
        op.result()
        self.instance = None

    def state(
        self,
    ) -> Literal[
        'PROVISIONING',
        'STAGING',
        'RUNNING',
        'STOPPING',
        'SUSPENDING',
        'SUSPENDED',
        'REPAIRING',
        'TERMINATED',
    ]:
        """Current status of the VM; the resource is re-fetched first."""
        return self._refresh().instance.status