Source code for cloudly.gcp.scheduler

__all__ = ['Job']

import json
from typing import Literal

from google.cloud import scheduler_v1

from .auth import get_credentials, get_project_id
from .compute import validate_label_key
from .workflows import Workflow


def _call_client(method: str, *args, **kwargs):
    with scheduler_v1.CloudSchedulerClient(credentials=get_credentials()) as client:
        return getattr(client, method)(*args, **kwargs)


[docs] class Job:
[docs] @classmethod def create( cls, name: str, *, cron_schedule: str, workflow: Workflow, workflow_args: dict | None = None, timezone: str, ): """ Parameters ---------- name You often want to add some randomness to the name to guarantee its uniqueness. cron_schedule See https://cloud.google.com/scheduler/docs/configuring/cron-job-schedules """ validate_label_key(name) parent = f'projects/{get_project_id()}/locations/{workflow.region}' if workflow_args: body = json.dumps(workflow_args).encode('utf-8') else: body = None job = scheduler_v1.Job( name=f'{parent}/jobs/{name}', schedule=cron_schedule, time_zone=timezone, http_target=scheduler_v1.HttpTarget( uri=f'https://workflowexecutions.googleapis.com/v1/{workflow.name}/executions', http_method=scheduler_v1.HttpMethod(scheduler_v1.HttpMethod.POST), oauth_token=scheduler_v1.OAuthToken( # service_account_email=get_service_account_email() ), body=body, ), ) req = scheduler_v1.CreateJobRequest(parent=parent, job=job) resp = _call_client('create_job', req) return cls(resp)
[docs] def __init__(self, name_or_obj: str | scheduler_v1.Job, /): if isinstance(name_or_obj, str): self.name = name_or_obj self.job = None self._refresh() else: self.name = name_or_obj.name self.job = name_or_obj
def __repr__(self): return f"{self.__class__.__name__}('{self.name}')" def __str__(self): return self.__repr__() def _refresh(self): req = scheduler_v1.GetJobRequest(name=self.name) self.job = _call_client('get_job', req) return self
[docs] def delete(self): req = scheduler_v1.DeleteJobRequest(name=self.name) _call_client('delete_job', req) self.job = None
[docs] def state( self, ) -> Literal['ACTIVE', 'ENABLED', 'PAUSED', 'DISABLED', 'STATE_UNSPECIFIED']: return self._refresh().job.state.name