Source code for cloudly.gcp.workflows

from __future__ import annotations

__all__ = ['Workflow', 'WorkflowConfig', 'Execution', 'Step', 'BatchStep']

import atexit
import datetime
import json
import string
from collections.abc import Sequence
from typing import Literal

from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1

from .auth import get_credentials, get_project_id
from .batch import JobConfig as BatchJobConfig
from .compute import validate_label_key

_workflow_client_ = None


def _cleanup():
    global _workflow_client_
    if _workflow_client_ is not None:
        _workflow_client_.__exit__(None, None, None)
        _workflow_client_ = None


atexit.register(_cleanup)


def validate_identifier_name(val: str) -> str:
    # TODO: use `re`.
    if val[0] not in string.ascii_lowercase:
        raise ValueError(val)
    for v in val[1:]:
        if v not in string.ascii_lowercase and v not in string.digits and v != '_':
            raise ValueError(val)
    return val


def _call_workflow_client(meth: str, *args, **kwargs):
    global _workflow_client_
    if _workflow_client_ is None:
        _workflow_client_ = workflows_v1.WorkflowsClient(
            credentials=get_credentials()
        ).__enter__()
    return getattr(_workflow_client_, meth)(*args, **kwargs)
    # Can not use context manager in each call, like with "execution client";
    # would be `ValueError: Cannot invoke RPC on closed channel!`.
    # Don't know why; probably due to interaction with "execution client".


def _call_execution_client(meth: str, *args, **kwargs):
    with executions_v1.ExecutionsClient(credentials=get_credentials()) as client:
        return getattr(client, meth)(*args, **kwargs)


[docs] class Step:
[docs] def __init__(self, name: str, content: dict): """ `content` is a dict of the step's action, e.g.,:: { "call": "http.get", "args": { "url": "https://host.com/api1", }, "result": "api_response1", } With "nested steps", `content` will be like this:: { "steps": [ { "step_1": { "call": "http.get", "args": { "url": "https://host.com/api1", }, "result": "api_response1", }, }, { "step_2": { "assign": [ {"varA": "Monday"}, {"varB": "Tuesday"}, ] } }, ] } where each element of the list can be the output of `Step.definition` of some step. """ validate_label_key(name) self.name = name self._content = content
@property def definition(self) -> dict: return {self.name: self._content}
[docs] class BatchStep(Step): """ Running a Batch job using Workflows. See https://atamel.dev/posts/2023/05-30_workflows_batch_connector/ https://cloud.google.com/workflows/docs/reference/googleapis/batch/Overview https://cloud.google.com/workflows/docs/sleeping """
[docs] def __init__( self, name: str, config: BatchJobConfig, *, delete_batch_job: bool = True, ): validate_label_key(name) job_config = json.loads(type(config.job).to_json(config.job)) job_id = name.replace('_', '-') parent = f'projects/{get_project_id()}/locations/{config.region}' result_name = name.replace('-', '_') + '_result' validate_identifier_name(result_name) steps = [] steps.append( { 'log_create': { 'call': 'sys.log', 'args': {'data': f'creating and running the batch job {job_id}'}, } } ) steps.append( { 'create_job': { 'call': 'googleapis.batch.v1.projects.locations.jobs.create', 'args': { 'parent': parent, 'jobId': job_id, 'body': job_config, }, 'result': result_name, # This "result" seems to be the entire batch-job config, and not the "result" of the job's run. } } ) # This uses Workflow's batch "connector" to create and run the batch job, waiting for its completion. steps.append( { 'log_create_result': { 'call': 'sys.log', 'args': { 'data': f'${{"result of batch job {job_id}: \n" + str({result_name})}}' }, } } ) if delete_batch_job: # TODO: how to delete only on batch success? steps.append( { 'log_delete': { 'call': 'sys.log', 'args': {'data': f'deleting the batch job {job_id}'}, } } ) steps.append( { 'delete_job': { 'call': 'googleapis.batch.v1.projects.locations.jobs.delete', 'args': { 'name': f'{parent}/jobs/{job_id}', }, } } ) content = {'steps': steps} # `job_id`` requirement: ^[a-z]([a-z0-9-]{0,61}[a-z0-9])?$ Note in particular: doesn't allow underscore. # `result` name must be a valid variable (or identifier) name, e.g. it can't contain dash. # Experiments suggested that `job_id` and `result` name do not have fixed relation with the step `name`; # I made changes to both and it still worked. super().__init__(name, content) self.job_url = f'https://batch.googleapis.com/v1/{parent}/jobs/{job_id}' self.job_id = job_id # If you keep the batch job, then this might be the ID to use for tracking. self.result_name = result_name self.region = config.region
[docs] class WorkflowConfig:
[docs] def __init__(self, steps: Sequence[Step]): """ If your workflow requires command-line arguments, you should access individual arguments using `dot` notation, for example, "args.name", "args.age". Correspondingly in :meth:`Workflow.execute`, you need to pass a dict to `args`, e.g. `{'name': 'Tom', 'age': 38}`. The 'params' is provided whether your job needs it. If not needed, don't provide args in :meth:`Workflow.execute` and use use it in the workflow "steps". """ self._content = { 'params': ['args'], 'steps': [s.definition for s in steps], } self._workflow = workflows_v1.Workflow( source_contents=json.dumps(self.definition) )
@property def definition(self) -> dict: return {'main': self._content} @property def workflow(self) -> workflows_v1.Workflow: return self._workflow
[docs] class Workflow:
[docs] @classmethod def create( cls, name: str, config: WorkflowConfig, *, region: str, ) -> Workflow: """ `name` needs to be unique, hence it's recommended to construct it with some randomness. If you create a workflow for a, say, batch job, then you probably should get `region` from the batch job definition. I don't know whether it's allowed for a workflow to contain jobs spanning regions. """ validate_label_key(name) req = workflows_v1.CreateWorkflowRequest( parent=f'projects/{get_project_id()}/locations/{region}', workflow=config.workflow, workflow_id=name, ) op = _call_workflow_client('create_workflow', req) resp = op.result() return cls(resp)
[docs] @classmethod def list(cls, region: str) -> list[Workflow]: req = workflows_v1.ListWorkflowsRequest( parent=f'projects/{get_project_id()}/locations/{region}' ) resp = _call_workflow_client('list_workflows', req) return [cls(r) for r in resp]
[docs] def __init__(self, name_or_obj: str | workflows_v1.Workflow, /): """ `name` is like "projects/<project_id>/locations/<region>/workflows/<name>". """ if isinstance(name_or_obj, str): self.name = name_or_obj self.workflow = None self._refresh() else: self.name = name_or_obj.name self.workflow = name_or_obj
def __repr__(self): return f"{self.__class__.__name__}('{self.name}')" def __str__(self): return self.__repr__() def _refresh(self): req = workflows_v1.GetWorkflowRequest(name=self.name) self.workflow = _call_workflow_client('get_workflow', req) return self @property def region(self) -> str: return self.name.split('locations/')[1].split('/')[0] @property def definition(self) -> dict: return json.loads(self.workflow.source_contents) @property def create_time(self) -> datetime.datetime: return self.workflow.create_time @property def update_time(self) -> datetime.datetime: return self._refresh().workflow.update_time @property def revision_id(self) -> str: return self._refresh().workflow.revision_id
[docs] def state(self) -> Literal['STATE_UNSPECIFIED', 'ACTIVE', 'UNAVAILABLE']: return self._refresh().workflow.state.name
[docs] def update(self, config: WorkflowConfig): self.workflow.source_contents = json.dumps(config.definition) req = workflows_v1.UpdateWorkflowRequest(workflow=self.workflow) op = _call_workflow_client('update_workflow', req) self.workflow = op.result() return self
[docs] def execute(self, args: dict | None = None) -> Execution: if args: exe = executions_v1.Execution(argument=json.dumps(args)) else: exe = executions_v1.Execution() req = executions_v1.CreateExecutionRequest(parent=self.name, execution=exe) resp = _call_execution_client('create_execution', req) self._refresh() return Execution(resp)
[docs] def delete(self) -> None: req = workflows_v1.DeleteWorkflowRequest(name=self.name) _call_workflow_client('delete_workflow', req) self.workflow = None
[docs] def list_executions(self) -> list[Execution]: req = executions_v1.ListExecutionsRequest(parent=self.name) resp = _call_execution_client('list_executions', req) return [Execution(r) for r in resp]
[docs] class Execution:
[docs] def __init__(self, name_or_obj: str | executions_v1.Execution): if isinstance(name_or_obj, str): self.name = name_or_obj self.execution = None self._refresh() else: self.name = name_or_obj.name self.execution = name_or_obj
def __repr__(self): return f"{self.__class__.__name__}('{self.name}')" def __str__(self): return self.__repr__() def _refresh(self): req = executions_v1.GetExecutionRequest(name=self.name) self.execution = _call_execution_client('get_execution', req) return self @property def start_time(self) -> datetime.datetime: return self.execution.start_time @property def end_time(self) -> datetime.datetime: # If not finished (`state()` returns "ACTIVE"), this returns `None`. return self._refresh().execution.end_time @property def argument(self): return self.execution.argument
[docs] def result(self): return str(self._refresh().execution.result)
[docs] def status(self): # If still running, this returns which step is currently running. return self._refresh().execution.status
[docs] def state( self, ) -> Literal[ 'STATE_UNSPECIFIED', 'ACTIVE', 'SUCCEEDED', 'FAILED', 'CANCELLED', 'UNAVAILABLE', 'QUEUED', ]: return self._refresh().execution.state.name
[docs] def cancel(self): req = executions_v1.CancelExecutionRequest(name=self.name) _call_execution_client('cancel_execution', req) self._refresh()