Source code for cloudly.gcp.batch

"""
This module wraps a subset of the GCP Batch functionalities that are useful and adequate for typical applications.

User code runs within a Docker container. Running a standalone script is not supported by this wrapper module.
"""

# This module needs the service account to have the "Batch Job Editor" role.

from __future__ import annotations

# Public names exported by `from <this module> import *`.
# `TaskConfig` is not listed; within this file it is only used by `JobConfig.task_group`.
__all__ = ['Job', 'JobConfig']

from typing import Literal

from google.cloud import batch_v1
from google.protobuf.duration_pb2 import Duration

from .auth import get_credentials, get_project_id
from .compute import (
    basic_resource_labels,
    validate_label_key,
    validate_label_value,
    validate_local_ssd_size_gb,
)

# Using GPUs
#
# See https://www.googlecloudcommunity.com/gc/Infrastructure-Compute-Storage/GCP-Batch-use-NVDIA-GPU-to-train-models-what-installation-are/m-p/784063
#
# One scenario that seemed to work:
#
#    allocation_policy:
#        machine_type: 'n1-standard-16'
#    gpu: {'gpu_type': 'nvidia-tesla-t4', 'gpu_count': 1}
#    task_group:
#        task_spec:
#            container:
#                image_uri: <ubuntu 20.04 image>
#                commands: 'nvidia-smi'
#                options: '--rm --init'
#
# The test reported success, so at least the `nvidia-smi` command was available in the container.
#
# Another scenario that seemed to work:
#
#    allocation_policy:
#        machine_type: 'g2-standard-16'
#    task_group:
#        task_spec:
#            container:
#                image_uri: <ubuntu 20.04 image>
#                commands: 'nvidia-smi'
#                options: '--rm --init'
#
# The g2 machine types come with GPUs attached.
# Note the absence of `gpu: {...}` setting.
# Also note that `--runtime=nvidia` was not accepted, whereas `--gpus=all` was not necessary.
#
# This module fills in some GPU-related settings if you do not specify them explicitly.
# These defaults mainly concern `install_gpu_drivers` and `boot_disk`.
# See `JobConfig.allocation_policy` for details.


class TaskConfig:
    """
    Build a `batch_v1.TaskSpec` whose single runnable is a Docker container.

    `JobConfig.task_group` constructs this from the plain `task_spec` dict of the
    `task_group` config; user code normally does not instantiate it directly.
    """

    class Container:
        """Wrap `batch_v1.Runnable.Container`, running `commands` through `/bin/bash -c`."""

        def __init__(
            self,
            *,
            image_uri: str,
            commands: str,
            options: str | None = None,
            local_ssd_disk: JobConfig.LocalSSD | None = None,
            local_ssd_mount_path: str | None = None,
            **kwargs,
        ):
            """
            Parameters
            ----------
            image_uri
                The full tag of the image.
            commands
                The commands to be run within the Docker container, such as 'python -m mypackage.mymodule --arg1 x --arg2 y'.
                This is a single string that is run as a shell script. Inside the container, the command that is executed is

                    /bin/bash -c "<commands>"

                This is the command you would type verbatim in the console inside the container.
            options
                The option string to be applied to `docker run`, such as '-e NAME=abc --network host'. As this example shows,
                environment variables that you want to be passed into the container are also handled by `options`.

                Usually `options` should contain '--rm --init --log-driver=gcplogs', among others.
            local_ssd_disk
                This is provided by `TaskConfig.__init__`; user should not provide this directly.
            local_ssd_mount_path
                Where the local SSD is mounted inside the container (the second part of the
                Docker volume spec 'host_path:container_path:mode'). Defaults to
                `local_ssd_disk.mount_path`, i.e. the same path as on the worker node.
                Ignored when `local_ssd_disk` is `None`.
            **kwargs
                Passed on to `batch_v1.Runnable.Container`.
            """

            if local_ssd_disk is not None:
                # Docker volume spec: '<path on node>:<path in container>:<mode>'.
                volumes = [
                    f'{local_ssd_disk.mount_path}:{local_ssd_mount_path or local_ssd_disk.mount_path}:{local_ssd_disk.mode}'
                ]
            else:
                volumes = None

            self._container = batch_v1.Runnable.Container(
                image_uri=image_uri,
                # With the bash entrypoint, this becomes `/bin/bash -c "<commands>"`.
                commands=['-c', commands],
                options=options,
                entrypoint='/bin/bash',
                volumes=volumes,
                **kwargs,
            )

        @property
        def container(self) -> batch_v1.Runnable.Container:
            """The underlying `batch_v1.Runnable.Container` message."""
            return self._container

    def __init__(
        self,
        *,
        container: dict,
        max_run_duration_seconds: int | None = None,
        max_retry_count: int = 0,
        ignore_exit_status: bool = False,
        always_run: bool = False,
        local_ssd_disk: JobConfig.LocalSSD | None = None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        container
            Keyword arguments for `TaskConfig.Container`.
        max_run_duration_seconds
            Maximum run duration of a task. If `None` or 0, no `max_run_duration` is set.
        max_retry_count
            Passed to `batch_v1.TaskSpec`.
        ignore_exit_status, always_run
            Passed to the `batch_v1.Runnable` that wraps the container.
        local_ssd_disk
            This is provided by `JobConfig.__init__`. User should not provide this directly.
            When given, its volume is added to the task spec and mounted into the container.
        **kwargs
            Passed on to `batch_v1.TaskSpec`.
        """
        container = self.Container(**container, local_ssd_disk=local_ssd_disk)
        runnable = batch_v1.Runnable(
            container=container.container,
            ignore_exit_status=ignore_exit_status,
            always_run=always_run,
        )

        if max_run_duration_seconds:
            max_run_duration = Duration(seconds=max_run_duration_seconds)
        else:
            max_run_duration = None

        self._task_spec = batch_v1.TaskSpec(
            runnables=[runnable],
            max_run_duration=max_run_duration,
            max_retry_count=max_retry_count,
            volumes=None if local_ssd_disk is None else [local_ssd_disk.volume],
            **kwargs,
        )

    @property
    def task_spec(self) -> batch_v1.TaskSpec:
        """The built `batch_v1.TaskSpec` message."""
        return self._task_spec


class JobConfig:
    """
    Build a `batch_v1.Job` definition from plain-dict configuration.

    The dicts passed in are not modified, so the same configuration can be
    reused to create several jobs.
    """

    class BootDisk:
        def __init__(
            self,
            *,
            size_gb: int,
            disk_type: Literal[
                'pd-balanced', 'pd-extreme', 'pd-ssd', 'pd-standard'
            ] = 'pd-balanced',
            image: str | None = None,
        ):
            """
            `image`: 'batch-debian' seems to be a good value for GPUs; otherwise `batch-cos` may also work well.
            Leave it at `None` until needed.
            See https://cloud.google.com/batch/docs/vm-os-environment-overview

            Raises
            ------
            ValueError
                If `size_gb` is less than 30.
            """
            # Raise rather than `assert` so the check survives `python -O`.
            if size_gb < 30:
                raise ValueError(f'`size_gb` must be at least 30; got {size_gb}')
            self.size_gb = size_gb
            self.type_ = disk_type
            self.image = image

        @property
        def disk(self) -> batch_v1.AllocationPolicy.Disk:
            """The boot disk as a `batch_v1.AllocationPolicy.Disk` message."""
            return batch_v1.AllocationPolicy.Disk(
                image=self.image,
                type_=self.type_,
                size_gb=self.size_gb,
            )

    class LocalSSD:
        """
        Local SSDs are attached to each worker node for use during the lifetime of the tasks.
        They are not "persistent" storage that lives beyond the batch job.
        """

        def __init__(
            self,
            *,
            size_gb: int,
            device_name: str = 'local-ssd',
            mount_path: str = '/mnt',
            mode: Literal['ro', 'rw'] = 'rw',
        ):
            """
            `size_gb` should be a multiple of 375. If not, the next greater multiple of 375 will be used.
            """
            self.size_gb = validate_local_ssd_size_gb(size_gb)
            self.device_name = device_name
            self.mount_path = mount_path
            self.mode = mode

        @property
        def disk(self) -> batch_v1.AllocationPolicy.AttachedDisk:
            """The SSD as an attached disk of the worker instances."""
            return batch_v1.AllocationPolicy.AttachedDisk(
                new_disk=batch_v1.AllocationPolicy.Disk(
                    type_='local-ssd', size_gb=self.size_gb
                ),
                device_name=self.device_name,
            )

        @property
        def volume(self) -> batch_v1.Volume:
            """The task-level volume that mounts the SSD at `mount_path` on the worker node."""
            opts = ['async', self.mode]
            if self.mode == 'ro':
                opts.append('noload')
            return batch_v1.Volume(
                device_name=self.device_name,
                mount_path=self.mount_path,
                mount_options=opts,
            )

    class GPU:
        def __init__(self, *, gpu_type: str, gpu_count: int):
            """
            Use `gcloud compute accelerator-types list` to see valid values of `gpu_type`.
            Some examples: 'nvidia-tesla-t4', 'nvidia-l4', 'nvidia-tesla-a100', 'nvidia-tesla-v100'

            Raises
            ------
            ValueError
                If `gpu_type` is empty or `gpu_count` is not a positive number.
            """
            if not gpu_type:
                raise ValueError('`gpu_type` must be a non-empty string')
            if not gpu_count or gpu_count < 1:
                raise ValueError(f'`gpu_count` must be a positive integer; got {gpu_count}')
            self.gpu_type = gpu_type
            self.gpu_count = gpu_count

        @property
        def accelerator(self) -> batch_v1.AllocationPolicy.Accelerator:
            """The GPU spec as a `batch_v1.AllocationPolicy.Accelerator` message."""
            return batch_v1.AllocationPolicy.Accelerator(
                type_=self.gpu_type, count=self.gpu_count
            )

    @classmethod
    def task_group(
        cls,
        *,
        task_spec: dict,
        task_count: int = 1,
        task_count_per_node: int = 1,
        parallelism: int | None = None,
        permissive_ssh: bool = True,
        **kwargs,
    ) -> batch_v1.TaskGroup:
        """
        Parameters
        ----------
        task_spec
            Keyword arguments for `TaskConfig`.
        task_count
            Number of tasks to be created.
        task_count_per_node
            Number of tasks that can be running on a work node at any time.
        parallelism
            Number of tasks that can be running across all nodes at any time.
        **kwargs
            Passed on to `batch_v1.TaskGroup`.
        """
        task_spec = TaskConfig(**task_spec).task_spec
        return batch_v1.TaskGroup(
            task_spec=task_spec,
            task_count=task_count,
            parallelism=parallelism,
            task_count_per_node=task_count_per_node,
            permissive_ssh=permissive_ssh,
            **kwargs,
        )

    @classmethod
    def allocation_policy(
        cls,
        *,
        region: str,
        labels: dict,
        network_uri: str | None = None,
        subnet_uri: str | None = None,
        machine_type: str,
        no_external_ip_address: bool = False,
        provisioning_model: Literal['standard', 'spot', 'preemptible'] = 'standard',
        boot_disk: dict | None = None,
        gpu: GPU | None = None,
        local_ssd_disk: LocalSSD | None = None,
        install_gpu_drivers: bool | None = None,
        **kwargs,
    ) -> batch_v1.AllocationPolicy:
        """
        Parameters
        ----------
        region
            Like 'us-central1', 'us-west1', etc.
        network_uri, subnet_uri
            If missing and `no_external_ip_address` is `False`, the default for the project in the specified region
            will be used. If `no_external_ip_address` is `True`, then both must be provided.
            See https://cloud.google.com/compute/docs/networking/network-overview
            and `google.cloud.batch_v1.types.job.AllocationPolicy.NetworkInterface`.
        machine_type
            Like 'n1-standard-16' or 'g2-standard-16'.
        provisioning_model
            Mapped to the upper-cased member of `batch_v1.AllocationPolicy.ProvisioningModel`.
        boot_disk
            Keyword arguments for `JobConfig.BootDisk`. This dict is not modified.
            `None` or an empty dict means no explicit boot disk (unless GPU defaults apply).
        install_gpu_drivers
            Passed to `batch_v1.AllocationPolicy.InstancePolicyOrTemplate`.
        labels, gpu, local_ssd_disk
            These are provided by `JobConfig.__init__`. User should not provide them directly.
        **kwargs
            Passed on to `batch_v1.AllocationPolicy`.

        GPU defaults
        ------------
        If `gpu` is given, or `machine_type` is in the 'a2', 'a3' or 'g2' family,
        missing boot-disk `image` and `size_gb` default to 'batch-debian' and 50,
        and `install_gpu_drivers` defaults to `True` if left at `None`.
        """
        network = batch_v1.AllocationPolicy.NetworkInterface(
            network=network_uri,
            subnetwork=subnet_uri,
            no_external_ip_address=no_external_ip_address,
        )
        provisioning_model = getattr(
            batch_v1.AllocationPolicy.ProvisioningModel, provisioning_model.upper()
        )

        if gpu or machine_type.split('-')[0] in ('a2', 'a3', 'g2'):
            # Work on a copy so the caller's dict is not modified.
            boot_disk = dict(boot_disk or {})
            if boot_disk.get('image') is None:
                boot_disk['image'] = 'batch-debian'
            if boot_disk.get('size_gb') is None:
                boot_disk['size_gb'] = 50
            if install_gpu_drivers is None:
                install_gpu_drivers = True

        # An empty dict is treated like `None` (previously it crashed on `{}.disk`).
        disk = cls.BootDisk(**boot_disk).disk if boot_disk else None

        instance_policy = batch_v1.AllocationPolicy.InstancePolicy(
            machine_type=machine_type,
            accelerators=[gpu.accelerator] if gpu else None,
            provisioning_model=provisioning_model,
            boot_disk=disk,
            disks=None if local_ssd_disk is None else [local_ssd_disk.disk],
        )
        instance_policy_template = batch_v1.AllocationPolicy.InstancePolicyOrTemplate(
            policy=instance_policy,
            install_gpu_drivers=install_gpu_drivers,
        )
        return batch_v1.AllocationPolicy(
            location=batch_v1.AllocationPolicy.LocationPolicy(
                allowed_locations=[f'regions/{region}'],
            ),
            instances=[instance_policy_template],
            labels=labels,
            network=batch_v1.AllocationPolicy.NetworkPolicy(
                network_interfaces=[network],
            ),
            # NOTE(review): no `email` is set on the service account here;
            # presumably GCP Batch then uses its default -- confirm.
            service_account=batch_v1.ServiceAccount(),
            **kwargs,
        )

    @classmethod
    def labels(cls, **kwargs) -> dict[str, str]:
        """
        Merge `kwargs` over `basic_resource_labels()` and validate every key and value.

        Values are passed through `validate_label_value(..., fix=True)`.
        """
        # There are some restrictions to the label values.
        # See https://cloud.google.com/batch/docs/organize-resources-using-labels
        zz = {**basic_resource_labels(), **kwargs}
        zz = {
            validate_label_key(k): validate_label_value(v, fix=True)
            for k, v in zz.items()
        }
        return zz

    def __init__(
        self,
        *,
        task_group: dict,
        allocation_policy: dict,
        labels: dict[str, str] | None = None,
        logs_policy: batch_v1.LogsPolicy | None = None,
        gpu: dict | None = None,
        local_ssd: dict | None = None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        task_group
            Keyword arguments for :meth:`task_group`; its 'task_spec' entry holds
            keyword arguments for `TaskConfig`.
        allocation_policy
            Keyword arguments for :meth:`allocation_policy`, excluding
            `labels`, `gpu` and `local_ssd_disk`, which are filled in here.
        labels
            Extra labels, combined with the basic resource labels by :meth:`labels`.
            The result is applied to both the job and its allocation policy.
        logs_policy
            Defaults to sending logs to Cloud Logging.
        gpu
            Keyword arguments for `JobConfig.GPU`.
        local_ssd
            Keyword arguments for `JobConfig.LocalSSD`. The SSD is attached to the
            worker nodes and mounted into the task container.
        **kwargs
            Passed on to `batch_v1.Job`.

        None of the input dicts is modified, so the same config can be reused.

        Raises
        ------
        ValueError
            If `local_ssd` is given while `task_group['task_spec']` already
            contains 'local_ssd_disk'.
        """
        gpu = self.GPU(**gpu) if gpu else None

        if local_ssd:
            local_ssd_disk = self.LocalSSD(**local_ssd)
            task_spec = task_group['task_spec']
            if 'local_ssd_disk' in task_spec:
                raise ValueError(
                    "`task_group['task_spec']` must not contain 'local_ssd_disk'; "
                    'use the `local_ssd` argument instead'
                )
            # Build new dicts instead of writing into the caller's, which
            # previously made a reused config fail on its second use.
            task_group = {
                **task_group,
                'task_spec': {**task_spec, 'local_ssd_disk': local_ssd_disk},
            }
        else:
            local_ssd_disk = None

        labels = self.labels(**(labels or {}))
        task_group = self.task_group(**task_group)
        allocation_policy = self.allocation_policy(
            gpu=gpu,
            local_ssd_disk=local_ssd_disk,
            labels=labels,
            **allocation_policy,
        )
        if logs_policy is None:
            logs_policy = batch_v1.LogsPolicy(
                destination=batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
            )
        self._job = batch_v1.Job(
            task_groups=[task_group],
            allocation_policy=allocation_policy,
            logs_policy=logs_policy,
            labels=labels,
            **kwargs,
        )

    @property
    def job(self) -> batch_v1.Job:
        """The built `batch_v1.Job` message."""
        return self._job

    @property
    def definition(self) -> dict:
        # `self._job.__str__` actually is formatted nicely,
        # but we choose to return the built-in dict type.
        # If you want a nice printout, just print `self.job`.
        return type(self._job).to_dict(self._job)

    @property
    def region(self) -> str:
        """The region, taken from the first allowed location ('regions/<region>')."""
        return self._job.allocation_policy.location.allowed_locations[0].split('/')[1]
def _call_client(method: str, *args, **kwargs):
    """
    Invoke the `BatchServiceClient` method named `method` and return its result.

    A fresh client is created for every call and used as a context manager, so it
    is exited as soon as the call returns. Results that would lazily issue further
    requests through the same client (e.g. later pages of `list_jobs`) presumably
    cannot do so after this returns.
    """
    credentials = get_credentials()
    with batch_v1.BatchServiceClient(credentials=credentials) as client:
        bound_method = getattr(client, method)
        return bound_method(*args, **kwargs)
class Job:
    """A handle to a GCP Batch job, identified by its full resource name."""

    @classmethod
    def create(cls, name: str, config: JobConfig | dict) -> Job:
        """
        There are some restrictions on the form of `name`; see GCP doc for details or `cloudly.gcp.compute`.
        In addition, the batch name must be unique in the project and the region.

        `config` is either a `JobConfig` or a dict of keyword arguments for it.
        The job is created in the region of `config`.
        """
        if not isinstance(config, JobConfig):
            config = JobConfig(**config)
        validate_label_key(name)
        req = batch_v1.CreateJobRequest(
            parent=f'projects/{get_project_id()}/locations/{config.region}',
            job_id=name,
            job=config.job,
        )
        job = _call_client('create_job', req)
        return cls(job)

    @classmethod
    def list(cls, *, region: str) -> list[Job]:
        """
        Return all jobs of the current project in `region`.

        The pager returned by `list_jobs` fetches later pages lazily through the
        client, so it is fully consumed while the client is still open; returning
        it out of the `with` block (as `_call_client` would) breaks multi-page results.
        """
        req = batch_v1.ListJobsRequest(
            parent=f'projects/{get_project_id()}/locations/{region}'
        )
        with batch_v1.BatchServiceClient(credentials=get_credentials()) as client:
            return [cls(j) for j in client.list_jobs(req)]

    def __init__(self, name_or_obj: str | batch_v1.Job, /):
        """
        Parameters
        ----------
        name_or_obj
            Either the job name, like 'projects/<project-id>/locations/<location>/jobs/<name>'
            (this can also be considered the job "ID" or "URI"), in which case the job
            object is fetched from the service right away; or a `batch_v1.Job` object
            you already have (e.g. returned by the API), in which case no request is made.

        See :meth:`create`, :meth:`list`.
        """
        if isinstance(name_or_obj, str):
            self.name = name_or_obj
            self.job = None
            self._refresh()
        else:
            self.name = name_or_obj.name
            self.job = name_or_obj

    def __repr__(self):
        return f"{self.__class__.__name__}('{self.name}')"

    def __str__(self):
        return self.__repr__()

    def _refresh(self):
        """Re-fetch the job object from the service; return `self` for chaining."""
        req = batch_v1.GetJobRequest(name=self.name)
        self.job = _call_client('get_job', req)
        return self

    @property
    def create_time(self):
        """Creation time, read from the cached job object (no request is made)."""
        return self.job.create_time

    @property
    def update_time(self):
        """Last update time; the job is re-fetched first."""
        return self._refresh().job.update_time

    @property
    def definition(self) -> dict:
        """The cached job object as a plain dict."""
        return type(self.job).to_dict(self.job)

    def status(self) -> batch_v1.JobStatus:
        """
        The returned `JobStatus` object has some useful attributes you can access;
        see :meth:`state` for an example.

        The job is re-fetched on every call.
        """
        return self._refresh().job.status

    def state(
        self,
    ) -> Literal[
        'STATE_UNSPECIFIED',
        'QUEUED',
        'SCHEDULED',
        'RUNNING',
        'SUCCEEDED',
        'FAILED',
        'DELETION_IN_PROGRESS',
    ]:
        """Name of the job's current state, re-fetched from the service."""
        return self.status().state.name

    def delete(self) -> None:
        """
        Request deletion of the job and drop the cached job object.

        The value returned by `delete_job` is discarded; the job may still be
        reported as 'DELETION_IN_PROGRESS' for a while afterwards.
        """
        req = batch_v1.DeleteJobRequest(name=self.name)
        _call_client('delete_job', req)
        self.job = None